JavaJava 程序员

ReentrantLock 源码分析

2022-05-27  本文已影响0人  马小莫QAQ

ReentrantLock 特征

特点:

  1. 可重入
  2. 公平/非公平
  3. 可中断
  4. 支持条件等待
  5. 可设置锁超时

常用 API

使用例子:

public class ReentrantLockTest {

    static ReentrantLock lock = new ReentrantLock(true);

    static class ClientThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread() + "开始尝试获取锁");
            lock.lock();
            try {
                System.out.println(Thread.currentThread() + "成功获取锁");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread() + "完成释放锁");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ClientThread t1 = new ClientThread();
        ClientThread t2 = new ClientThread();
        ClientThread t3 = new ClientThread();
        t1.start();
        t2.start();
        t3.start();

        TimeUnit.SECONDS.sleep(10);
    }
}

源码分析

获取锁

如果我使用下面的代码进行获取锁:

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

ReentrantLock 默认调用的就是非公平锁调用栈:

final void lock() {
    // 直接尝试加锁
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 如果获取锁失败进入 AQS acquire 逻辑
        acquire(1);
}

如果 compareAndSetState(0, 1)能够直接执行成功,那么将直接结束方法的执行。如果失败,那么就会调用acquire 方法如下:

public final void acquire(int arg) {
    // tryAcquire(arg) 尝试获取锁
    // acquireQueued 获取锁失败进行等待队列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我们先看 tryAcquire方法:

他会直接调用到 nonfairTryAcquire非公平锁的加锁逻辑里面有两个逻辑:

// 非公平锁的逻辑
// 如何理解插队, 这里的插队是当前队列中被唤醒的线程, 和当前加入的线程都可以被执行
// 如果当前加入线程比队列中唤醒的线程先获取到锁, 就是插队现象
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 无锁状态, 尝试竞争
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { //是否获取到锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 当前线程持有锁, state 计数 +1
    else if (current == getExclusiveOwnerThread()) { //判断是否是重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

如果 tryAcquire 调用完成后是获取锁成功 ****acquire方法执行结束,最后代表 lock 方法执行结束。

获取锁失败进入同步队列

如果获取锁失败,那么就会执行 acquire代码后面段 if 逻辑的执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
这里其实可以分为两个方法来看

按照执行顺序,我们先看 addWaiter(Node.EXCLUSIVE) 这里主要是入队的逻辑。
addWaiter: java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

private Node addWaiter(Node mode) {
    // 将当前线程转换为 AQS Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 尝试直接加入到尾节点
    Node pred = tail;
    if (pred != null) {
        // 当前节点的前驱节点指向尾节点
        node.prev = pred;
        // cas 修改 tail 节点,如果成功返回 node 
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果失败,调用 enq
    enq(node);
    return node;
}

enq是将当前节点插入队列,必要的时候会进行初始化

//将节点插入队列,必要时进行初始化。
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果没有尾节点,那么需要进行初始化
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } 
        // 如果有尾节点/其实就是有头节点/已经被初始化,通过 CAS 入队
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

前面我们看看完了,当前获取锁的线程当获取锁失败的时候,成功进入 AQS 队列,接下来我们继续看
acquireQueued又做了什么呢?

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 是否中断
        boolean interrupted = false;
        for (;;) {
            // 获取 node 的前驱节点
            final Node p = node.predecessor();
            // 如果是头节点,再次尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 将 node 设置为 头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断是否需要进行阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 阻塞线程
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 是否失败
        if (failed)
            // 如果失败,取消获取锁
            cancelAcquire(node);
    }
}

上面我们可以看到,for (;;)中有两个判断

在调用 acquireQueued这个过程中可能调用多次 shouldParkAfterFailedAcquire 方法。shouldParkAfterFailedAcquire 会执行一下几个操作。

/** 
 * 当获取锁失败后, 检查更新新节点状态如果是需要阻塞返回, true
 * <p>
 * 一个前继节点 waitStatus = 0, 第一次将继续设置为 SIGNAL, 告诉当前线程准备进入阻塞, 此时依旧获取不到, 当前线程进入阻塞
 *
 * @param pred 前继节点
 * @param node 当前节点
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 前继节点的状态, 第一次进入的话, 一定是 0
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            // 出队, 剔除无效的节点
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 第一次进来, pred.waitStatus = 0 执行这个分支
        // 将前继节点的状态修改为 SIGNAL, 表示 pred.next 节点需要被唤醒(此时准备进入阻塞, 但是还未被阻塞, 再次获取锁失败之后才会被阻塞)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

当 Node 被修改 Node.SIGNAL状态后,第一个 if 返回 true , 我们再次回到 acquireQueued 方法,就会执行 parkAndCheckInterrupt 方法,就是将当前的线程 park 然后返回当前线程的中断状态。

private final boolean parkAndCheckInterrupt() {
    // 阻塞线程
    LockSupport.park(this);
    // 返回线程中断状态
    return Thread.interrupted();
}

注意:这里线程 park 过后,其实获取锁就结束了前半段的操作,完成同步队列的入队,并且进入等待。我们就需要等待解锁唤醒。

释放锁

释放锁的代码如下:

lock.unlock();

释放锁做了什么呢?

调用栈如下:
java.util.concurrent.locks.ReentrantLock#unlock

我们可以从 release方法开始

// 解锁
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 判断是否有需要唤醒的线程
        if (h != null && h.waitStatus != 0) //waitStatus 的值为 0, 只有当后继存在节点才会被设置为该值不为 0, 此时需要唤醒后继线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁,主要是调用 tryRelease, 首先就是考虑之前的重入问题,直接对 state 进行 -1 ,然后如果 c == 0表示当前线程不再持有锁,我们就可以修改 ownerThread == null . 这个时候,最后修改 state 为新值。

// tryRelease 
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 判断是否是当前线程持有锁
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 如果 state == 0 表示当前线程不在占有该锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

释放锁成功后,再次回到 release方法,会再次判断,如果 AQS 队列不为空,那么就进行排队线程唤醒。
主要是调用 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

// 唤醒队列中的线程
private void unparkSuccessor(Node node) {
    // 将当前节点状态修改为 0  
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 反向查找
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒队列中的节点
        LockSupport.unpark(s.thread);
}

其实这里最关键的就是 LockSupport.unpark(s.thread); 这里就会回到 acquireQueued,执行唤醒后强锁的逻辑,依然在 acquireQueued里面。

释放锁后唤醒等待节点

当前节点被唤醒逻辑,首先会在 shouldParkAfterFailedAcquire 方法中出队,然后尝试加锁如果加锁成功就返回 true.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
        * This node has already set status asking a release
        * to signal it, so it can safely park.
        */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // 出队的逻辑
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

再次竞争锁,主要是在acquireQueued方法中调用 tryAcquire方法进行获取锁。如果获取锁失败,就又再次获取锁,如果获取锁成功返回。

测试和实践

支持锁中断

如果通过 lock.lockInterruptibly();_ 方式加锁,如果当前线程出现中断过后,会抛出 _java.lang.InterruptedException线程中断异常,所以 ReentrantLock支持可中断。
相关源码:

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // LockSupport.park 会清除中断信号
    LockSupport.park(this);
    return Thread.interrupted();
}


// 
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 抛出中断异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

实验代码:

public class ReentrantLockTest {

    static ReentrantLock lock = new ReentrantLock(true);

    static class ClientThread implements Runnable {

        @SneakyThrows
        @Override
        public void run() {
            System.out.println(Thread.currentThread() + "开始尝试获取锁");
            lock.lockInterruptibly();
            try {
                System.out.println(Thread.currentThread() + "成功获取锁");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread() + "完成释放锁");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new ClientThread(), "t1");
        Thread t2 = new Thread(new ClientThread(), "t2");
        Thread t3 = new Thread(new ClientThread(), "t3");
        t1.start();
        t2.start();
        // 锁中断
        //lock.lockInterruptibly();
        TimeUnit.SECONDS.sleep(1);
        t3.start();
        TimeUnit.SECONDS.sleep(1);
        t3.interrupt();



        TimeUnit.SECONDS.sleep(10);
    }
}

输出结果:

Thread[t1,5,main]开始尝试获取锁
Thread[t2,5,main]开始尝试获取锁
Thread[t1,5,main]成功获取锁
Thread[t3,5,main]开始尝试获取锁
Exception in thread "t3" java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at io.zhengsh.juc._1lock.reentrantlock.ReentrantLockTest$ClientThread.run(ReentrantLockTest.java:18)
    at java.lang.Thread.run(Thread.java:748)
Thread[t1,5,main]完成释放锁
Thread[t2,5,main]成功获取锁
Thread[t2,5,main]完成释放锁

获取锁设置超时

lock.tryLock(2, TimeUnit.SECONDS)可以支持设置获取锁的超时时间,可以有效的避免线程饥饿问题

测试代码:

public class ReentrantLockTryTest {
    static ReentrantLock lock = new ReentrantLock(true);

    static class ClientThread implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t开始尝试获取锁");
            try {
                if (lock.tryLock(2, TimeUnit.SECONDS)) {
                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁成功");
                    TimeUnit.SECONDS.sleep(5);
                } else {
                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁失败");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock.isHeldByCurrentThread() && lock.isLocked()) {
                    lock.unlock();
                    System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t完成释放锁");
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new ClientThread(), "t1");
        Thread t2 = new Thread(new ClientThread(), "t2");
        Thread t3 = new Thread(new ClientThread(), "t3");
        t1.start();
        t2.start();
        t3.start();
        //t1.interrupt();

        TimeUnit.SECONDS.sleep(20);
    }
}

输出结果:

Thread[t1,5,main]   1653540581  开始尝试获取锁
Thread[t3,5,main]   1653540581  开始尝试获取锁
Thread[t2,5,main]   1653540581  开始尝试获取锁
Thread[t1,5,main]   1653540581  获取锁成功
Thread[t3,5,main]   1653540583  获取锁失败
Thread[t2,5,main]   1653540583  获取锁失败
Thread[t1,5,main]   1653540586  完成释放锁

条件等待队列使用

Condition 是在 java 1.5 中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。
Condition 是个接口,基本的方法就是 await() 和 signal() 方法;
Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()
调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才可以使用:

测试场景:下面一个场景,需要ABC3个线程,A线程打印1次,然后是B线程打印2次,再是C线程打印3次,线程交替打印。
ABC线程需要交替执行,我们需要控制,线程的执行先后顺序
我们可以使用多条件Condition来控制,每一个线程拥有一个condition对象,调用各种的await方法,可以使线程等待,然后让别的线程调用这个condition对象的signal方法,唤醒线程。
代码如下:

public class ReentrantLockConditionTest {

    private int data = 1;
    private Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();


    public void printA() {
        lock.lock();
        try {
            while (data != 1) {
                condition1.await();
            }
            // 打印5次
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + " ->" + data);
            }
            data = 2;
            // 通知B线程
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printB() {
        lock.lock();
        try {
            while (data != 2) {
                condition2.await();
            }
            // 打印10次
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " ->" + data);
            }
            data = 3;
            // 通知C
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC() {
        lock.lock();
        try {
            while (data != 3) {
                condition3.await();
            }
            // 打印15次
            for (int i = 0; i < 15; i++) {
                System.out.println(Thread.currentThread().getName() + " ->" + data);
            }
            data = 1;
            // 通知A
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockConditionTest conditionTest = new ReentrantLockConditionTest();
        // A,B,C 交替执行
        new Thread(conditionTest::printA, "A").start();
        new Thread(conditionTest::printB, "B").start();
        new Thread(conditionTest::printC, "C").start();
    }

}

输出结果如下:

A ->1
B ->2
B ->2
C ->3
C ->3
C ->3

作者:心城以北
链接:https://juejin.cn/post/7101956094137729031
来源:稀土掘金

上一篇下一篇

猜你喜欢

热点阅读