我爱编程程序员

[Java源码][并发J.U.C]---用代码一步步实现AQS(

2018-08-10  本文已影响3人  nicktming

前言

此文接着上文 [Java源码][并发J.U.C]---用代码一步步实现AQS(1)---独占锁的获取和释放[Java源码][并发J.U.C]---用代码一步步实现AQS(2)---独占锁中断式获取, 本文将针对共享锁的获取和释放进行分析, 先会以一个小例子看看共享锁的简单使用,再以一个流程图来整体了解一下共享锁是如何获得和释放的, 最后从代码角度分析每个方法的作用.

本文源代码: 源代码下载

例子1: 共享锁的获取和释放

与前面两篇文章不一样的是本文将以一个例子开头先看看共享锁是怎么样一回事.
如下所示, TwinsLock 是一个共享锁, 可以最多允许两个线程获得锁,

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);
    private static final class Sync extends AbstractQueuedSynchronizer {
        public Sync (int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must larger than zero.");
            }
            super.setState(count);
        }
        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = super.getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || super.compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }
        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = super.getState();
                int newCount = current + returnCount;
                if (super.compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }
    public void printWaitingNode() {
        sync.printQueue();
    }
    @Override
    public void lock() {
        sync.acquireShared(1);
    }
    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
    }
    @Override
    public boolean tryLock() {
        // TODO Auto-generated method stub
        return false;
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }
    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }
}

接下来看一下测试类TestTwinsLock: 启动5个线程去获取锁,线程获取锁后sleep6秒钟, 并且主线程会打印出锁的等待队列的情况.

import java.util.concurrent.TimeUnit;

public class TestTwinsLock {

    public static void main(String[] args) {
        TwinsLock m = new TwinsLock();
        for (int i = 0; i < 5; i++) {
            new Thread(new Runner(m), "thread-" + i).start();;
        }
        for (int i = 0; i < 5; i++) {
            m.printWaitingNode();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    static class Runner implements Runnable {
        TwinsLock m;
        public Runner(TwinsLock m) {
            this.m = m;
        }
        @Override
        public void run() {
            m.lock();
            System.out.println(Thread.currentThread().getName() + " runs");
            try {
                TimeUnit.SECONDS.sleep(6);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            m.unlock();
        }
    }
}

输出结果: thread-0thread-1先获得锁, 然后thread-2thread-3再获得锁,最后thread-4获得锁.

thread-0 runs
thread-1 runs
[NULL,-1]->[thread-2,-1]->[thread-3,-1]->
[NULL,-1]->[thread-2,-1]->[thread-3,-1]->[thread-4,0]->
thread-3 runs
thread-2 runs
[NULL,-1]->[thread-4,0]->
thread-4 runs
[NULL,0]->
[NULL,0]->

这个例子可以很简单地了解一下共享锁的简单使用. 接下来再以一个流程图来具体看看是怎么样的一个流程.

流程图

共享锁的获取

shared lock(1).png

共享锁的释放

shared lock release(2).png

源码分析

接着上面提及的两篇文章继续在AbstractQueuedSynchronizer.java文件中添加源代码.

共享锁的获取

首先添加acquireShared(int arg)方法

acquireShared(int arg) 方法

    protected int tryAcquireShared(int arg) {  // 留作子类实现
        throw new UnsupportedOperationException();
    }
    
    public final void acquireShared(int arg) {
        /**
         * 如果没有获得锁,则放入到等待队列中
         */
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

作用 : 获取锁. 如果获取锁成功, 直接返回继续执行该线程后续的操作, 如果获取锁失败, 则加入到等待队列中并且阻塞该线程(也就是doAcquireShared(arg)).

doAcquireShared(arg) 方法

此方法可以与获得独占锁中的acquireQueued(final Node node, int arg)方法对照着看 [Java源码][并发J.U.C]---用代码一步步实现AQS(1)---独占锁的获取和释放.

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 区别点 1
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

作用 :

1. 将该线程包装成共享型节点并且加入到等待队列中, 并且在该线程阻塞前会保证有前面的节点会在必要的时刻(其实就是前面有节点取消或者释放)会去唤醒它.这个操作是通过shouldParkAfterFailedAcquire(p, node)完成, 会把前驱节点的状态值设置为-1. 阻塞该线程的方法是由parkAndCheckInterrupt()完成. 这两个方法在[Java源码][并发J.U.C]---用代码一步步实现AQS(1)---独占锁的获取和释放 有详细分析.

2. 该线程在阻塞的过程中有两种情况会从parkAndCheckInterrupt()方法中返回, 被前驱节点唤醒或者被中断唤醒, 被中断唤醒的时候在成功获得锁的时候需要重新通过selfInterrupt()方法设置一下中断状态. 如果有不明白的可以看我的另外一篇博客 [并发J.U.C] 用例子理解线程中断.
这里有个疑问: 为什么在获得独占锁的时候是把中断操作放到acquire(int arg)中设置中断状态,而在这里只有在获得了锁之后才会设置中断状态?(待解决)

3. 在获取锁失败的情况下会去等待队列中取消该线程所对应的节点.cancelAcquire(node), 该方法在[Java源码][并发J.U.C]---用代码一步步实现AQS(1)---独占锁的获取和释放 也有详细分析.

4. 获取共享锁与获取独占锁在代码中的区别具体体现在代码中的setHeadAndPropagate(node, r) 在独占锁中是setHead(Node node).那接下来主要看看setHeadAndPropagate(node, r)具体做了什么事情.

setHeadAndPropagate(node, r) 方法

参数node是当前线程对应的节点对象, r表明还剩下几个许可, 也可以理解锁还可以被r个线程拿到.

在看这个方法之前得想清楚一件事情, 该线程进入到该方法时,说明了一个情况起码是这样的, 该线程在第一次获取锁的时候(在acquireShared(int arg)方法中的tryAcquireShared(arg)返回的值是小于0的,才会进入到doAcquireShared(arg)方法中)没有获取成功, 表明那个时候共享的锁被全部获取完了, 所以如果进入到了setHeadAndPropagate(node,r)表明有线程已经释放了锁, 这个时候才有机会来获取锁.

所以在此之前先看一下锁的释放后回头再看该方法.

共享锁的释放

releaseShared(int arg) 释放

    protected boolean tryReleaseShared(int arg) {  //留作子类实现
        throw new UnsupportedOperationException();
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {  // 如果设置状态成功
            doReleaseShared();
            return true;
        }
        return false;
    }

作用 : 如果设置AQS的状态值成功,则去唤醒等待队列中的线程回来竞争锁

请注意一点: 是先设置好状态值之后才去唤醒等待队列中的线程, 如果此时刚刚好有一个刚刚运行的线程来请求获取锁, 那会看被唤醒的线程和这个刚刚运行的线程来竞争锁

接着看核心方法doReleaseShared()

doReleaseShared() 方法

其实该方法的流程图就是上面流程图部分的第二张图.这里再次引入一下:

shared lock release(2).png
  private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                    /**
                     * 进入到这里表明队列不为空并且有后继节点
                     */
                int ws = h.waitStatus;
                // 只有当节点状态是SIGNAL的时候才会去唤醒后继节点 
                // 并且把节点状态改为0
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 进入for循环重试
                    unparkSuccessor(h);
                }
                // 如果状态是0 则更新为PROPAGATE状态
                // 因为只有状态是-1的时候才要去唤醒
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // 进入for循环重试
            }
            /**
             *  为什么还要判断 h == head 呢?
             *  就说明在执行该方法的时候, head有可能会发生改变
             *  这是因为在执行上面的unparkSuccessor(h)的时候会去唤醒后驱节点
             *  现在设置后驱节点对应的线程为thread-B
             *  此方法所在的线程是thread-A
             *  如果thread-A在执行完unparkSuccessor(h)失去控制权,这个时候thread-B
             *  刚刚好从parkAndCheckInterrupt()方法的阻塞状态中返回(因为被唤醒了)并且
             *  获得了锁,此时thread-B便会执行setHeadAndPropagate方法,head就会发生改变
             * 
             */
            if (h == head)    
                break;
        }
    }

作用:

1. 当头节点的状态是-1的时候,改变自身状态后需要去唤醒后驱线程, 如果状态是0,则改为PROPAGATE状态(不清楚为什么).
2. 在设置的过程中head确实有可能会发生改变, 改变的情况在文中已经说明, 但是为什么需要设置head == h 才可以退出循环?我个人理解是直到没有线程可以获得共享锁为止, 因为head发生改变的时候其实是表示有后驱的节点拿到锁了.

关于unparkSuccessor(h)的作用就是去唤醒h的后驱节点所对应的线程, 唤醒后的线程会在parkAndCheckInterrupt()方法中返回, 具体的细节在 [Java源码][并发J.U.C]---用代码一步步实现AQS(1)---独占锁的获取和释放 有详细分析.

setHeadAndPropagate(node, r) 方法

在明白了锁的释放后, 回头再来看看该方法, 如果进入该setHeadAndPropagate(node, r)方法, 表明被唤醒的线程获得了锁.

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;  // 记录一下旧的头节点
        setHead(node);  // 将当前节点设置为头节点
        /** 
         * 如果propagate > 0 说明锁还没有被别的线程拿到
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

在设置了头节点后, 如果propagate大于0的时候表明还可以有线程有机会获得锁, 如果后面一个节点是null或者是共享状态的时候去唤醒后面的节点所对应的线程.

疑问: 对于那些判断条件目前还没有弄清楚

响应中断的获取和超时获取

在理解了共享锁的获取和释放(1) 和 [Java源码][并发J.U.C]---用代码一步步实现AQS(2)---独占锁中断式获取 (2), 共享锁响应中断和超时获取的逻辑与就是1和 2 的整合, 所以这里就不多做分析了.

参考

1. Java并发编程的艺术
2. Java1.8 java.util.concurrent.locks包的源代码

上一篇下一篇

猜你喜欢

热点阅读