java语言之二- Concurrent Lock源码解读

2018-11-10  本文已影响0人  Wu杰语

Lock所处的位置

见上一节《java并发知识体系简要指南》知识体系图


image.png

在unsafe中提供了两个关键机制:

由于安全机制,unsafe不能被直接调用,但是可以被虚拟机代码调用,而LockSupport则封装了这两个机制,可以供程序员调用,而为了实现各种场景的ECF(异常控制流),Concurrence的作者 设计了AbstractQueueSychronizer这个中间类,继承这个中间类,就可以轻松实现各种场景的ECF语义。

而这个中间类的代码是设计巧妙的,比较难懂的,最好的方法是懂得原理的情况下,使用各种ECF调试。

理解AbstractQueueSychronizer的关键:

各种异常流

ReetrantLock

需要理解:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 添加节点在addwaiter
            selfInterrupt();
    }
acquireQueued的关键代码
for (;;) {    // 恢复后代码又在这里运行,这里是种递归的运行方式, node就是每一个新节点,这里有点玄妙, release后唤醒的必然是head的后继节点, 这时会在这里更新头节点,或者说进行节点 的删除操作,然后lock的代码退出,进入业务逻辑代码
                
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {   # 2. 有可能需要重新加锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())    # 1. park后代码该线程运行就在这里停止了
                    interrupted = true;
            }

如上,当线程调用lock的时候,该节点会被加入到queue中,如果lock已经被占用,则调用tryAcquire不成功,会调用parkAndCheckInterrupt,操作系统会将该线程挂起。

当前线程unlock的时候, 会首先将后继节点unpark

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);   // release操作的时候会释放后继节点,让后继节点运行起来
       return true;
   }
   return false;
}

后继节点unpark后,会从上述“# 1. park后代码该线程运行就在这里停止了”标记处恢复运行,跳出死循环,跳出Lock()函数,执行线程本身的逻辑代码

image.png

如上图,当有两个线程都同时调用lock()的时候,ReetrantLock的队列如上图,头结点的thread是null,这是已经在执行的当前线程。

如上代码解释,当前线程执行unlock后,会执行unparkSuccessor, 其逻辑是当前节点的waitstate == -1,就找到后继节点,如上后继节点中thread = thread1,就执行unpark(thread1),恢复thread1的运行,queue如下:


image.png

如此往复执行,直到Node3的节点执行完毕,此时队列中就只剩下头结点了。

ReadWriteReetrantLock

关键代码:

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); # 在这里加入到队列
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())   # 1. 执行了park
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    
    private void setHeadAndPropagate(Node node, long propagate) {
        Node h = head; // Record old head for check below
        setHead(node);   #删除头节点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

例如说一堆线程运行,加read锁或者加write锁,执行lock后,queue如下:


image.png

如上,thread为Exclusive表示是写锁,是独占的,thread是Shared表示是读锁,共享的。运行过程如下:

  1. 写锁运行时,后续的线程都被阻塞了。
  2. 当写锁完成,发现waitstate=-1,解锁后续线程,后续线程是shared的,它会一直解锁线程,直到所有连续的shared线程都解锁。
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

这段代码的关键是,Node2、Node3 unpark后执行跳出lock时会增加state计数,执行完成是执行unlock则会减少state计数,只有当state计数减少为0的时候,才会引发Node4的解锁

  1. Node4是独占锁,和1执行相同,如此往复。

semaphore

假定semaphore被初始化为2,有5个线程启动加semaphore


image.png

注意看这里的关键点state,state就是表示有几个可解锁的park。
关键代码

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

当state 不为0的时候,获取锁总是能成功,此时有2个线程获取锁成功,其它线程获取不到锁会加入到queue中。
正在运行某个线程运行完毕,则会将state加1,并且删除queue头结点,唤醒后续结点,如此往复,一直保持有2个线程可以运行
由于只有2个线程在运行,因此state最大只能还原为2.

Cond

Cond需要有一把锁,用于保护condition对wait queue转移到queue。


image.png

如图,当有两个线程调用了wait()后,均被阻塞,在wait queue中有两个结点。
当调用了signal后,讲产生结点转移



Node1从wait queue转移到queue中,当锁unlock后,会去掉头结点,并去头结点的waitstate=-1,将会唤醒后续结点node1.
如此往复,直至所有的节点执行完毕

CountDownLanch

这是数据运行最简单的一个队列了,假定初始化为3


image.png

如图,主线程调用await(),被park()加入到queue中。每次有线程调用countDown()时,会讲state减去1,并且检查state是否为0,如果为0,则会讲头结点删除,解锁后续结点,即将主线程解锁。

上一篇下一篇

猜你喜欢

热点阅读