程序员Java 并发

【Java 并发笔记】AbstractQueuedSynchro

2018-12-13  本文已影响0人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 框架整理

AQS 框架结构

表 1

状态值 状态 说明
1 CANCELLED 取消状态
-1 SIGNAL 等待触发状态
-2 CONDITION 等待条件状态
-3 PROPAGATE 状态需要向后传播
private static final Unsafe unsafe = Unsafe.getUnsafe();

protected final int getState() {
     return this.state;
}
protected final void setState(int newState) {
     this.state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
方法名称 说明
isHeldExclusively() 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
tryAcquire(int) 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
tryRelease(int) 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
tryAcquireShared(int) 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int) 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。

ReentrantLock

CountDownLatch

1.1 Node 结构

结点结构
Node {
    int waitStatus;
    Node prev;
    Node next;
    Node nextWaiter;
    Thread thread;
}
属性名称 描述
int waitStatus 表示结点的状态。其中包含的状态见表 1。
Node prev 前驱结点,比如当前节点被取消,那就需要前驱结点和后继结点来完成连接。
Node next 后继结点。
Node nextWaiter 存储 condition 队列中的后继结点。
Thread thread 入队列时的当前线程。

2. 实现分析整理

2.1 独占模式

2.1.1 acquire(int)(线程获取锁的过程)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
acquire 流程

tryAcquire

protected boolean tryAcquire(int arg) {
     throw new UnsupportedOperationException();
}

addWaiter(Node)

private Node addWaiter(Node mode) {
        //以给定模式构造结点。mode 有两种:EXCLUSIVE(独占)和 SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        //尝试快速方式直接放到队尾。
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //上一步失败则通过 enq 入队。
        enq(node);
        return node;
}
......
private Node enq(final Node node) {
        //CAS " 自旋 ",直到成功加入队尾。
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize //队列为空,创建一个空的标志结点作为 head 结点,并将 tail 也指向它。
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { //正常流程,放入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

acquireQueued(Node, int)

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; //标记是否成功拿到资源
        try {
            boolean interrupted = false; //标记等待过程中是否被中断过
            //自旋
            for (;;) {
                final Node p = node.predecessor(); //获得前驱结点
                //如果前驱结点 p 是 head,即该结点 node 为第二结点,那么便有资格去尝试获取资源(可能是 p 释放完资源后唤醒,也可能被 interrupt)。
                if (p == head && tryAcquire(arg)) {
                    setHead(node); //获取资源后,将 head 指向 node。
                    p.next = null; // setHead 中 node.prev 已置为 null,此处再将 p.next 置为 null,就是为了方便 GC 回收以前的 head 结点 p,也就意味着之前拿完资源的结点 p 出队。
                    failed = false;
                    return interrupted; //返回等待过程中是否被中断过
                }
                //如果自己可以休息了,就进入 waiting 状态,直到被 unpark()。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //如果等待过程中被中断过,哪怕只有那么一次,就将 interrupted 标记为 true。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

shouldParkAfterFailedAcquire(Node, Node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;  //获得前驱结点状态。
        if (ws == Node.SIGNAL)
            //如果前驱结点状态为等待触发,则进入安全休息点。
            return true;
        if (ws > 0) {
            //如果前驱为取消状态,就一直往前找,直到找到最近一个正常等待的状态,并排在它的后面。
            //那些取消状态的结点,由于被当前结点 " 加塞 " 到它们前边,它们相当于形成一个无引用链,稍后会被 GC 回收。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果前驱结点正常,将前驱状态设置成 SIGNAL 等待触发
            //下一次循环进入 shouldParkAfterFailedAcquire 因为前驱状态已经设置为 SIGNAL,因此直接返回 true,执行 parkAndCheckInterrupt,对当前线程 park。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //调用 park() 使线程进入 waiting 状态。
        return Thread.interrupted(); //如果被唤醒,查看自己是不是被中断的。
}
线程获取锁流程

selfInterrupt()

static void selfInterrupt() {
     Thread.currentThread().interrupt();
}

cancelAcquire(Node)

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        //设置该结点不再关联任何线程。
        node.thread = null;

        // 通过前继结点跳过取消状态的 node。
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        
        // 获取过滤后的前继结点的后继结点。
        Node predNext = pred.next;
        // 设置状态为取消状态。
        node.waitStatus = Node.CANCELLED;

        // 1.如果当前结点是 tail,尝试更新 tail 结点,设置 tail 为 pred。更新失败则返回,成功则设置 tail 的后继结点为 null。
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 2.如果当前结点不是 head 的后继结点,判断当前结点的前继结点的状态是否为 SIGNAL,如果不是则尝试设置前继结点的状态为 SIGNAL。
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 3.如果是 head 的后继结点或者状态判断设置失败,则唤醒当前结点的后继结点。
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
}
当前结点是 tail 当前结点不是 head 的后继结点,也不是 tail 当前节点是 head 的后继结点
do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;

2.1.2 release(int)(线程释放锁的过程)

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head; //获得头结点。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //唤醒等待队列里的下一个线程。
            return true;
        }
        return false;
}

tryRelease(int)

protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
}

unparkSuccessor(Node)

private void unparkSuccessor(Node node) {
        //node 为当前线程所在结点。
        int ws = node.waitStatus;
        if (ws < 0) //置零当前线程所在的结点状态,允许失败。
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next; //找到下一个需要唤醒的结点 s。
        if (s == null || s.waitStatus > 0) { //如果为空或取消状态
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) // <=0 的结点,都是有效结点。
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒
}

2.2 共享模式

2.2.1 acquireShared(int)(线程获取锁的过程)

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

tryAcquireShared(int)

protected int tryAcquireShared(int arg) {
     throw new UnsupportedOperationException();
}

doAcquireShared(int)

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); //加入队列尾部,SHARED 模式。
        boolean failed = true; //是否成功标志
        try {
            boolean interrupted = false; //等待过程中是否被中断过的标志
            for (;;) {
                final Node p = node.predecessor(); //获得前驱结点
                if (p == head) { //如果结点为 head 结点的下一个,因为 head 是拿到资源的线程,此时 node 被唤醒,很可能是 head 用完资源来唤醒自己的。
                    int r = tryAcquireShared(arg); //尝试获取资源
                    if (r >= 0) { //获取资源成功
                        setHeadAndPropagate(node, r); //将 head 指向自己,如果还有剩余资源可以再唤醒之后的线程。
                        p.next = null; // help GC
                        if (interrupted) //如果等待过程中被打断过,此时将中断补上。
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //判断状态,寻找安全点,进入 waiting 状态,等着被 unpark() 或 interrupt()。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

setHeadAndPropagate(Node, int)

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //获得 head 结点
        setHead(node); //head 指向当前结点
        // 如果还有剩余量,继续唤醒下一个结点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
}

2.2.2 releaseShared()(线程释放锁的过程)

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
}

tryReleaseShared(int)

protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
}

doReleaseShared()

private void doReleaseShared() {
       
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            
                    unparkSuccessor(h); //唤醒后继
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; 
            }
            if (h == head)  // head 发生变化
                break;
        }
}

acquireInterruptibly(int)

public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
}

acquireSharedInterruptibly(int)

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

tryAcquireNanos(int, long)

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos(int, long)

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //计算时间,最后期限减去当前时间,得到了还应该睡眠的时间。
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

2.3 等待队列

同步队列 等待队列

2.3.1 Condition 接口

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}
方法 说明
await() 调用此方法后,会使当前线程在接收到唤醒信号(signal)之前或被中断之前一直处于等待休眠状态。调用此方法后,当前线程会释放持有的锁。如果当前等待线程从该方法返回(被唤醒),那么在返回之前会重新获取锁(获取到锁才能继续执行)。
await(long time,TimeUnit unit) 调用此方法后,会使当前线程在接收到唤醒信号之前、被中断之前或到达指定等待时间之前一直处于等待状态。如果在从此方法返回前检测到等待时间超时,则返回 false,否则返回 true。
awaitNanos(long nanosTimeout) 该方法等效于 await(long time,TimeUnit unit) 方法,只是等待的时间是 nanosTimeout 指定的以毫微秒数为单位的等待时间。该方法返回值是所剩毫微秒数的一个估计值,如果超时,则返回一个小于等于 0 的值。可以根据该返回值来确定是否要再次等待,以及再次等待的时间。
awaitUninterruptibly() 当前线程进入等待状态直到被通知,该方法对中断忽略。
awaitUntil(Date deadline) 当前线程进入等待状态直到被通知,中断或者到某个时间,如果没有到指定时间就被通知,返回 true,否则表示到了指定时间,返回 false。
signal() 唤醒一个等待线程,如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
signalAll() 唤醒所有等待线程,如果所有的线程都在等待此条件,则唤醒所有线程。 在从 await 返回之前,每个线程必须重新获取锁。

2.3.2 Condition 的实现

public class TestQueue<T> {

    //队列大小
    private int size;
    //list 充当队列
    private List<T> queue;
    //锁
    private Lock lock = new ReentrantLock();
    //保证队列大小不 <0 的 condition
    private Condition notEmpty = lock.newCondition();
    //保证队列大小不 >size 的 condition
    private Condition notFull = lock.newCondition();

    public TestQueue(int size) {
        this.size = size;
        queue = new ArrayList<T>();
    }

    public void product(T t) throws Exception {
        lock.lock();
        try {
            //如果队列满,则不能生产,等待消费者消费数据。
            while (size == queue.size()) {
                notFull.await();
            }
            //队列已经有空位置,放入一个数据。
            queue.add(t);
            //通知消费者可以继续消费。
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T consume() throws Exception {
        lock.lock();
        try {
            //队列为空,则不能消费,等待生产者生产数据。
            while (queue.size() == 0) {
                notEmpty.await();
            }
            //队列已经有数据,拿掉一个数据
            T t = queue.remove(0);
            //通知生产者可以继续生产。
            notFull.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
}
  1. 假设存在线程 a 和线程 b。
    • 线程 a 调用 product() 方法,执行了 lock.lock(),线程 a 加入到 AQS 同步队列中,构建结点 A。
    • 线程 b 调用 consume() 方法,执行了 lock.lock(),线程 b 加入到 AQS 同步队列中,构建结点 B。
    • 结点 B 是结点 A 的后继结点,结点 A 是结点 B 的前驱结点。
    • 同步队列初始状态为下图。
同步队列的初始状态
  1. 假设自定义队列 queue 已满,线程 a(结点 A)调用 notFull.await() 方法。
    • 线程 a(结点 A)从 AQS 同步队列中被移除,对应操作是锁的释放。
    • 线程 a(结点 A)被加入到 Condition 等待队列,线程 a 需要等待 singal 信号。
  2. 线程 b(结点 B)由于线程 a(结点 A)释放锁被唤醒,成为同步队列的头结点且同步状态为 0 可以获取锁。
    • 线程 b(结点 B)获取锁。
结点 A 进入等待队列
  1. 假设线程 b(结点 B)调用 notFull.singal() 方法,Condition 等待队列中只有结点 A,把它取出来加入到 AQS 同步队列中。
    • 这时候线程 a(结点 A)并没有被唤醒
结点 A 重新进入 AQS 队列
  1. 线程 b(结点 B)notFull.signal() 方法执行完毕,调用 lock.unlock() 方法释放锁。线程 a(结点 A)成为 AQS 首结点并且同步状态可获取,线程 a(结点 A)被唤醒,继续执行。

  2. AQS 按从头到尾的顺序唤醒线程,直到等待队列中的线程被执行完毕结束。


await(等待)

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

signal(通知)

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
}
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

2.4 问题

2.4.1 插入节点的代码顺序

源码

Node pred = tail;
if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
     }
}

修改

Node pred = tail;
if (pred != null) {
      //node.prev = pred;  // del
      if (compareAndSetTail(pred, node)) {
            node.prev = pred; // add
            pred.next = node;
            return node;
     }
}

分析

2.4.2 唤醒节点从 tail 向前遍历

源码

Node s = node.next; //找到下一个需要唤醒的结点 s。
if (s == null || s.waitStatus > 0) { //如果为空或取消状态
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0) // <=0 的结点,都是有效结点。
                s = t;
}

分析

2.4.3 PROPAGATE 状态存在的意义

源码

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) { //Node.PROPAGATE 状态就是为了此处可以读取到 h.waitStatus < 0(PROPAGATE 值为 -3)。
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
}

修复前版本

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
}

分析

2.4.4 AQS 如何防止内存泄露

参考资料

http://www.cnblogs.com/waterystone/p/4920797.html
https://www.jianshu.com/p/d8eeb31bee5c
https://www.cnblogs.com/micrari/p/6937995.html
https://blog.csdn.net/u014634338/article/details/77428108
http://www.importnew.com/26300.html

上一篇下一篇

猜你喜欢

热点阅读