并发十二:CountDownLatch、CyclicBarrie
J.U.C中提供了三个同步工具CountDownLatch、CyclicBarrier、Semaphore,都是共享锁的特殊应用,用来进行线程的任务协调。
CountDownLatch
一个小栗子:
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":二级表生成");
Thread.sleep(10000);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":二级表生成");
Thread.sleep(10000);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread() {
public void run() {
try {
System.out.println("等待二级表生成完成");
latch.await();
System.out.println(Thread.currentThread().getName() + ":汇总统计");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
}
}
输出:"Thread-0:二级表生成、Thread-1:二级表生成、等待二级表生成完成",然后开始等待,直到Thread-0、Thread-1执行完成,然后"Thread-2:汇总统计"。
CountDownLatch是一个倒计时式的计数器,允许线程等待其他N个线程先执行完毕,再开始执行。
CountDownLatch基于AQS,是一个共享锁,await()使当前线程阻塞等待,countDown()计数器递减。
// CountDownLatch aqs源码:
‘’private static final class Sync
extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//加锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//解锁
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
count是倒计时的初始数。
await()调用tryAcquireShared(1)方法获取锁,根据共享锁的实现返回值小于0时线程会被阻塞等待,也就是只有当state==0,才能成功获取锁。
countDown()调用tryReleaseShared(1)方法进行解锁,当state值为0时,共享锁才算完全释放,会唤醒队列里等待的线程。
CountDownLatch没有复位操作,当state的值为0时再调用await()就不会阻塞线程了,所以CountDownLatch只能使用一次。
CyclicBarrier
一个小栗子:
public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(3);
new Thread() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":计算完成");
barrier.await();
System.out.println(Thread.currentThread().getName() + ":入库");
} catch (Exception e) {
e.printStackTrace();
}
};
}.start();
new Thread() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":计算完成");
barrier.await();
System.out.println(Thread.currentThread().getName() + ":入库");
} catch (Exception e) {
e.printStackTrace();
}
};
}.start();
try {
Thread.sleep(10000);// 等待
} catch (InterruptedException e1) {
e1.printStackTrace();
}
new Thread() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":计算完成");
barrier.await();
System.out.println(Thread.currentThread().getName() + ":入库");
} catch (Exception e) {
e.printStackTrace();
}
};
}.start();
}
}
输出结果:"Thread-0:计算完成、Thread-1:计算完成",等待,直到"Thread-2:计算完成",然后"Thread-0:入库、Thread-1:入库、Thread-2:入库"。
CyclicBarrier是可循环的同步屏障,将N个线程进行阻塞,直到阻塞线程的数量到达屏障点时屏障被打破,这N个线程才会继续执行。
CyclicBarrier使用一个重入锁实现,初始化时传入屏障点parties,即要阻塞的线程数量,还可以传入一个Runnable的实现barrierAction,它会在屏障打破时执行。在屏障未打破前调用await()方法的线程都会被阻塞。
public class CyclicBarrier {
/** 屏障状态 */
private static class Generation {
boolean broken = false;
}
/** 重入锁 */
private final ReentrantLock lock = new ReentrantLock();
/** condition */
private final Condition trip = lock.newCondition();
/** 屏障点 */
private final int parties;
/** 到达屏障点执行的类 */
private final Runnable barrierCommand;
/** 当前状态 */
private Generation generation = new Generation();
/** 计数 */
private int count;
... ...
}
阻塞放行流程:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//当前状态
final Generation g = generation;
if (g.broken)//被打破 s1
throw new BrokenBarrierException();
//中断
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//计数
int index = --count;
//到达屏障点 s3
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//先执行barrierCommand
if (command != null)
command.run();
ranAction = true;
//唤醒所以阻塞线程
//重新实例化generation
//复位操作
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {// s2
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation) {
System.out.println("退出");
return index;
}
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
s1: g.broken==true说明屏障被打破,这时再调用await()会抛出异常。
如果线程中断则打破屏障并抛出异常,
计数器count递减并赋给index,index==0进入s2,否则进入s3
s2:index==0说明被拦截的线程数量已经达到屏障点,如果barrierAction不为空,则直接调用run方法,让其先执行。
nextGeneration()会唤醒所有在trip上等待的线程、重新赋值count为初始值parties,new 一个Generation赋给generation,这样一来CyclicBarrier就恢复如初了,可以被重新使用,返回0。
s3: count>0,说明还没有到达屏障点,进入for(;;)循环体,会让线程在条件队列trip上等待,直到屏障被打破。屏障打破时会重新赋值generation,被唤醒的线程会在(g != generation)这个点正常退出循环。
CyclicBarrier屏障正常打破后进行了复位操作,所以CyclicBarrier可以重复使用。
Semaphore
一个小栗子:
public class SemaphoreTest {
private static final int tokenCount = 3;
public static void main(String[] args) {
final Semaphore tokens = new Semaphore(tokenCount); // 令牌发放者
for(int i=0;i<10;i++)
new Request(tokens).start();
}
static class Request extends Thread {
private Semaphore tokens;
public Request(Semaphore tokens) {
this.tokens = tokens;
}
@Override
public void run() {
try {
tokens.acquire();// 申请访问令牌
System.out.println(Thread.currentThread().getName()+":访问资源...");
Thread.sleep(3000);
tokens.release();// 访问完毕归还令牌
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
输出: "Thread-0:访问资源、Thread-1:访问资源、Thread-2:访问资源",等待,然后"Thread-3:访问资源、Thread-5:访问资源、Thread-4:访问资源",不同的电脑顺序可能不一样,能发现都是3个组的访问。
Semaphore信号,可以限定访问受限资源的线程数量,用来协调访问资源的线程数量,使其处在一个恒定的值。
网络应用中为了保护服务器不被流量洪峰冲夸,会进行限流,限流会使用令牌桶算法,Semaphore就可以实现令牌桶:访问线程先拿到令牌才能访问,访问完后把令牌归还到桶中以便供其他线程使用,就保证了访问资源的线程数量和令牌数量一至。
Semaphore是一个共享锁,内部代码布局和ReentrantLock类似,支持公平性设置,如果设置为公平锁,能够使等待最久的线程先获取信号,默认为非公平性的。
public class Semaphore {
/** 同步器实例 */
private final Sync sync;
/** 父类同步器*/
abstract static class Sync extends AbstractQueuedSynchronizer {... ...}
/** 非公平同步器*/
static final class NonfairSync extends Sync {}
/** 公平同步器*/
static final class FairSync extends Sync {}
/** 构造*/
public MySemaphore(int permits) {
sync = new NonfairSync(permits);
}
/** 构造*/
public MySemaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
... ...
}
加锁解锁流程和前面的写的共享锁一致。
不同的是信号获取的逻辑中没有对重入的处理,一个线程可以多次获取信号,每次获取都会让总量减1。
信号的释放时没有对信号的总量进行控制,比如初始的信号是5个,已经获取了5个,释放来了7个,这时可用的信号是7个,也就是说在释放的可以对信号数量进行扩容,如果在使用中需要保持信号数量恒定,一定要保证acquire和release成对出现。
小结
- CountDownLatch和CyclicBarrier都是以计数器的的形式来协调线程同步的,一个显著的区别是CyclicBarrier可重用,CountDownLatch是一次性的。
- CountDownLatch和CyclicBarrier还有一个语义层面上的区别是,Count DownLatch是线程等待另外N个线程执行完毕。CyclicBarrier是N个线程相互等待,直到都执行毕。
- CountDownLatch强调依赖,CyclicBarrier强调协作。典型的应用场景就是大任务拆解为小任务,然后合并计算结果,比如多线程下载大文件,多个下载线程将自己分配的文件段下载完毕后,合并线程才开始进行文件合并操作。
- Semaphore用来限定访问受限资源的线程数量,典型的应用场景是流量控制,比如并发操作数据库,数据库连接池只有10个,必须保证只能有10个线程去获取连接,否则会报错。
- Semaphore可以进行简单的服务端限流,比如一个RPC服务器只能支撑200QPS,就可以用Semaphore去限制请求RPC的线程数量。当然对于复杂的服务端限流还得使用更高效令牌桶(Token Bucket)或者漏桶(Leaky Bucket)算法。
码字不易,转载请保留原文连接https://www.jianshu.com/p/9e0ecc8b1358