一起读源码

Java中的“锁”事之ReentrantLock

2020-08-18  本文已影响0人  MR丿VINCENT
keyboard

谈谈“锁”

说起Java的锁,脑袋里第一反应就是关键字synchronized.这是Java提供的基于语言级别的锁,底层是通过cup指令来实现的。对于使用者来说非常简单,容易上手。然而也有一些小缺陷。在早期的jvm中synchronized性能不是太好,而且加锁和释放锁不是很灵活,比如只能在程序正常执行完成和抛出异常时释放锁,对锁的持有很“执着”,获取锁的时候没法设置超时时间等。

除了jvm层面实现的锁之外,JDK中也提供了另外的锁实现。下面从一个例子说起。

ReentrantLock

public void test0() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            boolean b = lock.hasQueuedThreads();
            System.out.println("t1"+ b);
            lock.lock();
            System.out.println("t1 start working...");
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("t1 do working...");
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            boolean b = lock.hasQueuedThreads();
            System.out.println("t2"+ b);
            lock.lock();
            System.out.println("t2 start working...");
            try {
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("t2 do working... ");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });
        

        Thread t3 = new Thread(() -> {
            boolean b = lock.hasQueuedThreads();
            System.out.println("t3"+ b);
            lock.lock();
            System.out.println("t3 start working...");
            try {
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("t3 do working... ");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
        System.out.println("++++finished++++");
    }

很容易看出,demo中使用了ReentrantLock来作为锁来对三个线程进行协调,确保三个线程顺序执行。使用方式也很简单:在需要保护的代码前后使用lockunlock即可。

既然ReentrantLock能提供和synchronized一样的锁机制,那必须得看看到底这个“锁”有什么黑魔法。

ReentrantLock和AbstractQueuedSynchronizer之加锁

加锁其实是一个很容易理解的过程,其中我认为有点绕的是node结点之间链的摘除和建立,毕竟数据结构的基础还是比较弱,稍微多绕几圈就被整蒙圈了。
在研究AQS锁实现之前得聊一下什么是“公平”和“非公平”锁。所谓公平锁遵循先来的先获得锁,翻译成白话就是大家都是在排队的;而非公平锁则反之,只要有获取锁的机会,那就不顾一切去抢,不排队。

ReentrantLock默认的实现为非公平锁。理论上来说非公平锁比公平锁效率更高。当然也可以通过指定参数来区分是否使用公平锁。

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
// ReentrantLock中的lock方法其实
public void lock() {
    sync.lock();
}

static final class NonfairSync extends Sync {
    //...
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();
    //...
}

NonfairSync作为内部类继承自Sync,而Sync继承自AbstractQueuedSynchronizer
说白了其实就是个模版方法,AQS提供基础实现,子类根据自己需要去自定义不同的逻辑。

接下来根据demo中的几个关于锁的基本操作(lock)来看看其实现细节。

首先lock方法中的compareAndSetState(0, 1)语义是如果当前的值为0,那就更新为1.这是一个基于cpu指令的原子操作。

    /**
     * The synchronization state.
     */
    private volatile int state;
    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;
    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

如果更新成功,那就返回true。而这个原子更新的字段为AQS的state。这个字段简单理解为获取锁的标志,整个锁的核心都是围绕着这个字段来完成的。
如果更新成功,那么将当前线程置为exclusiveOwnerThread。这个变量表示当前持有锁的线程。
完整的语义即:当某个线程中的逻辑调用lock方法后,lock对象中的state字段由0更新为1,当前线程持有锁。
那这个线程没执行完操作,还没释放掉锁,后续的线程怎么办?

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// AQS中的实现 必须得由子类重写
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// NonfairSync中的重写
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

如果在某个线程获取到了锁之后还没释放,其他线程也执行到lock方法,这时候由于lock对象中state为1,因此没办法更新,所以执行acquire逻辑。而acquire调用nonfairTryAcquire方法。
首先获取state的值,在我们的demo中由于之前的线程没有释放掉锁,这里的c的值为1,而当前线程和lock对象中持有的线程不一样(getExclusiveOwnerThread返回之前持有锁的线程对象)因此这里直接返回false。
当线程中执行的任务很短的时候,短到几纳秒,获取到锁的线程马上释放掉了。这个state值从1变成了0,这里其他线程就有机会再次去“争夺”一次锁,同样使用cas操作将state值从0到1,同时将当前线程置为lock对象的exclusiveOwnerThread字段。最后返回true,表示获取到了锁。
还有一种情形,一个线程多次去lock,这里lock对象中持有的线程锁同一个线程,因此进入到current==getExclusiveOwnerThread()逻辑。做法也很简单,将state再加1即可,这个线程依旧能获取到锁。这就是所谓的可重入(Reentrant),即可以多次获取一个锁。

tryAcquire方法返回为真时,表示当前线程成功获取到了锁,整个lock逻辑已经完成,后面的acquireQueued方法就直接忽略掉。
这里小结一下:

接下来看看acquireQueued方法的实现。上面说道,当尝试获取锁成功的时候,lock方法就结束了,如果尝试获取锁失败呢?如果失败就进入到acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

其实这里是两个方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先调用addWaiter


private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

看到这里,出现了新的数据结构Node,为了更加方便理解,现在不得不对这个数据结构进行说明。

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        volatile int waitStatus;
        
        volatile Node prev;

        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

首先是俩静态变量,这个变量仅仅是一个标记,并没有实际用途:

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

因为AQS有两种模式:独占和共享。独占模式例如demo中的ReentrantLock,而共享模式如并发工具包中的CountDownLatch。接着就是waitStatus变量,同时指定了几个枚举。然后就是thread当前线程,以及前驱后继结点。不难看出这是一个双端链表结构。nextWaiter字段暂时按下不表。

继续看addWaiter方法,由于传入的mode为Node.EXCLUSIVE,因此这里创建的node的nextWaiter字段的值为null,将当前要获取锁的线程也放进node里,然后尝试去“操作”这个node。实际上就是看这个AQS中node队列除了当前创建的还有没有别的:

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

这里的tailhead都是AQS中的变量,用于操控node链表。他们的更新都是使用cas实现的,保证原子性。如果AQS中没有node链表(没有形成),head和tail都是null,直接走enq逻辑,然后将新创建的这个node返回,如果AQS中有结点存在呢,那就直接将创建的node变成tail。compareAndSetTail(pred, node)的语义为,如果当前tail的值为pred,那么将其更新为node。然后修改后继指针,返回node结点。

再看看AQS中的结点为空的时候:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

逻辑很清晰,这里进一步判断了一下tail是否为空,如果是真为空,那就新建一个node结点作为头结点,同时将tail指向头结点。这时候头结点就是尾结点,且结点内没有数据,只是作为一个标志而已。然而并没有返回,因为是个死循环,头尾结点初始化成功之后,继续走else逻辑,同理将新创建的结点的前驱指向刚才新建的空结点,然后把tail指向自己(Node node = new Node(Thread.currentThread(), mode);)的结点。最后修改后继指针并返回。这个逻辑看起来比较绕,尤其是指针的操作让人眼花缭乱,通过画图会更容易理解。总结一句话就是:创建node链表,初始化tail和head指针,且head指针指向的是一个空node(仅仅有意义的事waitStatus=0,因为没给值,默认就是0)。而返回的新创建的node作为acquireQueued参数:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

核心逻辑又是一个死循环,首先获取刚才创建的node结点的前驱结点,如果前驱结点为head结点(空的结点),可以再给这个线程一次机会,尝试获取锁。tryAcquire之前说过,这里不赘述。如果运气好,获取到了(state从0到1)返回true,将当前结点设置为head,同时摘除链表关系,也就是那个空结点被释放了,这个时候head结点可不是空结点了,而是Node node = new Node(Thread.currentThread(), mode);创建出来的。最后返回false,获取锁成功。为什么要来这一出呢?因为如果之前获取锁的线程执行任务的时候,其他线程在尝试着排队的时候还是有机会去抢一下的,说不定哪一瞬间任务结束释放了锁其他线程刚好抢到了呢?当然这也是有前提的,当线程决定去排队,且是排第一个的时候才能有多一次机会去抢锁。这里有疑问了,这个不是非公平的么?为啥还得排第一个才能抢?其实并不矛盾,因为每个线程都至少有一次机会去抢锁,通过tryAcquire。只有没抢到的,打算排队的,排到第一个的线程有第二次机会。当然,就算某个线程排第一,多一次抢锁机会,也不一定必然抢到呀,因为别的线程依旧和这个线程一样,同样是通过tryAcquire来抢的,因此是公平的,严格来说不公平,因为排第一的线程多了一次机会。

如果这个排第一的倒霉鬼还是没获取到锁,那就很难受了。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这里我们先考虑前驱结点为空结点的情况,之前提到,空结点的waitStatus没有赋值,默认为0,因此这里直接走compareAndSetWaitStatus(pred, ws, Node.SIGNAL);逻辑,将其置为-1;最后返回false,然而for死循环的逻辑还没结束,还会继续尝试获取一下锁,如果还是没获取到,那就再次进入到shouldParkAfterFailedAcquire中,因为第一次循环中将其waitStatus从0设置为了-1,因此这里直接返回true,所以,当在ASQ内部中“排队”的线程数第一个,是有两次次额外的获取锁的机会的。
接着就是parkAndCheckInterrupt逻辑了:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

park为停车的意思,这里理解为挂起也没太大毛病。说白了就等着呗,等到什么时候为止呢?那就得从unlock说起了。

ReentrantLock和AbstractQueuedSynchronizer之释放锁

demo中的解锁方法unlock对应的实现逻辑为release

/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

类似地,先尝试去释放一下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

先把state值减回去,判断一下当前的线程是不是AQS中锁持有的线程,如果不是那就说明有问题。如果当state还原为0了说明锁被释放掉了,同时将当前AQS持有的线程置为空。最后将当前state值更新(更新为减1后的,这里并不一定是0)。正如上文提到过,如果加多次锁,那么也得释放多次。如果没获释放掉,那就说明当前锁依旧被持有。

如果更新state成功,那么还需要做的一件事就是处理node结点。如果AQS中的头结点不为空,且状态不是默认的初始化的0,那么就去唤醒后继结点:

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

这里逻辑也十分清晰,先更新结点的waitStatus,将其置为0.然后找到后面的结点,如果不是空,那就将其唤醒,和之前是park一一对应。这里还多了一段判断锁被取消的情况,注释中也写得很清晰,意思就是从node链表的尾部开始找,一直找到符合要求的结点将其唤醒。

唤醒了还没完,因为等待锁的线程被park了后还得继续执行后续的逻辑。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 锁释放之后 这里会继续执行
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

parkAndCheckInterrupt返回的值为当前线程的中断状态,如果当前(获取锁的)线程被设置了中断标记,那么这个方法就直接返回true。即interrupted=true
由于是死循环,同样的当前线程(被唤醒的)获取一下锁,因为AQS中的state已经还原了,所以这里能拿到,将当前结点设置为头结点,获取锁完成。由于可能存在当前获取锁的线程由于某种情况被设置了中断标记,那么就将其中断(也只是设置中断标记)。

相比获取锁的操作,释放锁容易很多。

小结

本文基于ReentrantLock锁的基础实现,对AQS的大致原理进行了比较粗略的分析。如AQS的底层结构,核心的API等。通过锁的基础操作,如加锁和释放锁背后的逻辑进行了详细解读。当然还有很多没有涉及到的地方,如条件队列,共享模式的实现,公平和非公平的体现等。当知道了AQS的原理之后,去理解这些主题也是非常轻松的。总的来说,AQS的代码量不算太多,读起来不是很吃力。

PS:在云笔记中发现18年的时候也写过一篇AQS的文章,现在居然一点印象都没有了,时间啊,是真残酷。2年前的笔记

参考资料

Java并发锁框架AQS(AbstractQueuedSynchronizer)原理从理论到源码透彻解析

Java里一个线程调用了Thread.interrupt()到底意味着什么?

上一篇下一篇

猜你喜欢

热点阅读