同步器.三驾马车之CountDownLatch
前言
本来想自己写一篇CountDownLatch、CyclicBarrier和Semaphore。但有点懒,有点没有time,调整下原文的排版,方便阅读。
白话文语意:
- CountDownLatch:老大等待几个小弟干完后再自己动手
- CyclicBarrier:几个小弟集合完毕后再动手。
- Semaphore:最多几个小弟同时动手
开篇
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务。
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
CountDownLatch的用法
- CountDownLatch典型用法1:某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为n new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- CountDownLatch典型用法2:实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计数器初始化为1,多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
总结
CountDownLatch的工作原理,总结起来就两点(基于AQS实现):
- 初始化锁状态的值为需要等待的线程数。
- 判断锁状态是否已经释放,如果锁未释放所有等待锁的线程就会进入等待的CLH队列。
- 如果锁状态已经释放,那么就会通过传播性唤醒所有的等待线程。
原文和源码
首先CountDownLatch的作用在这里就不多说了,就是一个闭锁,其实现基于AQS, 想要理解它的实现原理就请有耐心的看完
本篇博文,看了之后基本就掌握的七七八八了,下面我们就来分析吧
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
首先从构造函数出发 初始化状态变量 ,其中sync是一个AQS的子类,构造函数如下
Sync(int count) {
setState(count);
}
设置状态变量state,其中state是个volatile 用于保证可见性和不可指令重排
接着我们看await函数 ,
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
这里调用的是其内部类sync的函数,具体实现如下
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
首先判断是否被中断,中断就抛出异常,零与tryAcquireShared(arg)的返回值相比较,具体实现如下
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
首先明白state状态变量,state的值代表着待达到条件的线程数,比如初始化为五,表示待达到条件的线程数为5,每次调用countDown()函数都会减一,所以当没有达到条件也就是state不等于0将会返回-1,进入if语句 ,接着往下看 这方法看起来很长
但不要慌我们慢慢看 当然首先你要明白,线程进入await方法阻塞后,会用一个一个的节点将线程串起来 等达到条件后再一个一个的唤醒,该链表是一个双向链表
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //该函数用于将当前线程相关的节点将入链表尾部
boolean failed = true;
try {
for (;;) { //进入无限for循环
final Node p = node.predecessor(); //获得它的前节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { //唯一的退出条件,也就是await()方法返回的条件很重要!!
setHeadAndPropagate(node, r); //该方法很关键具体下面分析
p.next = null; // help GC
failed = false;
return; //到这里返回
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 先知道线程由该函数来阻塞的,具体有Usafe类的park方法实现阻塞当前线程
throw new InterruptedException();
}
} finally {
if (failed) //如果失败或出现异常,失败 取消该节点,以便唤醒后续节点
cancelAcquire(node);
}
}
我们先对addWaiter进行具体的剖析,看看线程如何变成节点被添加进去的 实现如下
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //首先new一个节点,该节点维持一个线程引用
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //获取tail节点,tail是volatile型的
if (pred != null) { //不为空
node.prev = pred;
if (compareAndSetTail(pred, node)) { //利用CAS设置,允许失败,后面有补救措施
pred.next = node;
return node;
}
}
enq(node); //设置失败,表明是第一个创建节点,或者是已经被别的线程修改过了会进入这里
return node;
}
其实enq也没什么玄机,熟悉原子类实现原理的肯定可以看懂下面的操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 第一个创建时为节点为空
if (compareAndSetHead(new Node()))
tail = head; //初始化时 头尾节点相等
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //注意这里,只有设置成功才会退出,所以该节点一定会被添加
t.next = node;
return t;
}
}
}
}
好了到这里addWaiter就分析完成了,我们继续下面的分析工作,添加完节点之后,没有达到条件就会进入这个判断
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
让我们首先看看shouldParkAfterFailedAcquire(p, node)方法,首先明白p是node的前节点
//这里在剩余节点被唤醒时,若当前节点不是当前头节点,此方法还会被调用,只不过由于waitStatus>0,返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前节点的状态
if (ws == Node.SIGNAL) //表明前节点可以运行
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //如果前节点状态大于0表明已经中断,
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev; //链表的基本操作,相信你可以看懂
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //等于0进入这里
}
return false; 只有前节点状态为NodeSIGNAL才返回真
}
我们对shouldParkAfterFailedAcquire来进行一个整体的概述,首先应该明白节点的状态,节点的状态是为了表明当前线程的良好度,如果当前线程被打断了,在唤醒的过程中是不是应该忽略该线程,这个状态标志就是用来做这个的具体有如下几种
/** waitStatus value to indicate thread has cancelled */ 线程已经被取消
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */ 线程需要去被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */ 线程正在唤醒等待条件
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should //线程的共享锁应该被无条件传播
* unconditionally propagate
*/
static final int PROPAGATE = -3;
目前你只需知道大于0时表明该线程已近被取消,已近是无效节点,不应该被唤醒,注意:初始化链头节点时头节点状态值为0。
shouldParkAfterFailedAcquire是位于无限for循环内的,这一点需要注意一般每个节点都会经历两次循环后然后被阻塞。建议读者试着走一遍,以加深理解 ,当该函数返回true时,线程调用parkAndCheckInterrupt这个阻塞自身。到这里基本每个调用await函数都阻塞在这里 (很关键哦,应为下次唤醒,从这里开始执行哦)
接着让我们来看看countDown这个函数的玄机吧,应为线程就是通过这个来函数来触发唤醒条件的
public void countDown() {
sync.releaseShared(1);
}
该函数也是委托其内部类完成,具体实现如下 这里arg为1 哦
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
看一下判断条件tryReleaseShared函数
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) { //自旋减一
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0; //nextc 为零时才返回真
}
}
也就是说当state减1后为0时才会返回为真 执行后面的唤醒条件,否则都一概忽略,假设达到唤醒条件 具体来看如何唤醒 ,函数如下
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head; //获取头节点,
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //头结点的状态为Node.SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //这里唤醒 很重要哦
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;// loop on failed CAS
}
if (h == head) // loop if head changed
break; //这里是否有疑问明明都有这个 Node h = head为啥还要在判断一次?多次一举别着急后面有原因
}
}
//这里执行唤醒添加
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { //s节点是否为空或者已经被取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //唤醒线程
}
对于这个唤醒操作很好理解的,首先取该节点的后节点就行唤醒,如果后节点已被取消,则从最后一个开始往前找,找一个满足添加的节点进行唤醒。
有人肯能会有疑问,要是如果有多个节点只在这进行一次唤醒工作吗?难道只唤醒一个线程就可以了?哈哈别急还记得线程是在哪阻塞的吗 让我们回来前面去看线程被阻塞的地方 (忘记了可以往前看看)
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);//关键就在这个函数哦
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //我们知道线程是在这里被阻塞的
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
关键来了,线程在这里被阻塞,唤醒后继续执行,由于满足条件,state的状态值为0,函数返回值为1 ,大于0会进入其中我们继续往下看 这一小段
setHeadAndPropagate(node, r);
具体实现如下
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //这里重新设置头节点 (与上面第一次释放锁 h== head 的重复判断相对应)
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); //注意这里 会进入这里
}
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
这个函数相信你不陌生吧,就是第一个释放锁所调用的,在这里,被唤醒的线程在调用一次,依赖唤醒后续线程
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; //明白这里为什么要加一次判断了吧!!!被唤醒的线程会在执行该函数
}
}
现在明白其唤醒机制了吧 先唤醒一个线程(第一个阻塞的线程) 然后被唤醒的线程又会执行到这里唤醒线程,如此重复下去 最终所有线程都会被唤醒,其实这也是AQS共享锁的唤醒原理,自此完成了对CountDownLatch阻塞和唤醒原理的基本分析
转载:https://blog.csdn.net/qq_32459653/article/details/81486757
转载:https://www.jianshu.com/p/446e9112f302