线程阻塞(一),CountDownLatch、CyclicBar
遇到一个笔试题:5个线程内部打印hello和word,hello在前,要求提供一种方法使得5个线程先全部打印出hello后再打印5个word
首先想到的是CountDownLatch、CyclicBarrier这类线程阻塞工具,趁此机会,总结下这几个工具的用法吧。
1、CountDownLatch
先看下源码吧,这是构造方法
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {//count为计数值
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
还有几个主要的方法
public void await() throws InterruptedException {}; //调用await()方法后线程会被挂起,等待直到count值为0才继续执行,除非线程Intercept
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {}; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() {}; //将count值减1
先写一个简单的例子,比如5个线程都打印完HelloWorld 后,打印一条语句
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new PrintThread(latch, i).start();
}
try {
latch.await();//等待,直到此CountDownLatch里面的count=0
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("******** Print Over ***********");
}
static class PrintThread extends Thread {
CountDownLatch latch;
int id;
public PrintThread(CountDownLatch latch, int id) {
this.latch = latch;
this.id = id;
}
@Override
public void run() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Hello World " + id);
this.latch.countDown();//在做完工作后,countDown减1
}
}
结果如下
Hello World 0
Hello World 3
Hello World 2
Hello World 1
Hello World 4
******** Print Over ***********
工作线程每次执行完后,countDown()减1,await()挂起,先在那等着,直到count=0的时候,才继续向下执行。
好了,回到开始的题目,稍微改下就可以了
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new PrintThread(latch, i).start();
}
}
static class PrintThread extends Thread {
CountDownLatch latch;
int id;
public PrintThread(CountDownLatch latch, int id) {
this.latch = latch;
this.id = id;
}
@Override
public void run() {
System.out.println("Hello " + id);
this.latch.countDown();
try {
this.latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("World " + id);
}
}
结果如下
Hello 0
Hello 3
Hello 4
Hello 2
Hello 1
World 1
World 2
World 3
World 4
World 0
同一个CountDownLatch对象,在线程中,打印完第一条"Hello"语句后,countDown(),然后await()挂起,count=0的时候,再向下打印"World" 语句
2、CyclicBarrier
先看下构造方法
/**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
主要方法是
public int await() throws InterruptedException, BrokenBarrierException {};//挂起当前线程,直至所有线程都到达阻塞状态(即所有线程都执行到了await方法)再执行await后面的任务;
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {};//让这些线程等待至一定的时间,如果还有线程没有到达阻塞状态就直接让已经到达阻塞状态的线程执行后续任务
回到最开始的题目
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new PrintThread(cyclicBarrier, i ).start();
}
System.out.println("******** Print Main ***********");
}
static class PrintThread extends Thread {
CyclicBarrier cyclicBarrier;
int id;
public PrintThread(CyclicBarrier cyclicBarrier, int id) {
this.cyclicBarrier = cyclicBarrier;
this.id = id;
}
@Override
public void run() {
System.out.println("Hello " + id);
try {
this.cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("World " + id);
}
}
执行结果如下
Hello 0
******** Print Main ***********
Hello 3
Hello 2
Hello 1
Hello 4
World 0
World 2
World 3
World 4
World 1
在执行完打印"Hello"任务后,调用CyclicBarrier的await()方法,线程阻塞在这里,等所有线程都await()了,再执行以下的任务,打印"World"。
另外,CyclicBarrier第二个构造方法里有一个Runnable类型的参数,看下注释,
// 等阻塞释放了,再执行的指令
@param barrierAction the command to execute when the barrier is tripped
我们用另外一个构造方法试试
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("******** Print Main ***********");
}
});
执行结果如下
Hello 0
Hello 2
Hello 1
Hello 4
Hello 3
******** Print Main ***********
World 3
World 0
World 1
World 2
World 4
所有的"Hello" 打印完,再执行这个barrierAction参数,而且,需要注意的是,等barrierAction执行完毕后,才会再往下执行。
总结
CountDownLatch、CyclicBarrier都能实现线程阻塞,区别是在使用过程中CountDownLatch调用await()方法仅实现阻塞,对计数没有影响,线程执行完需要手动countDown()更新计数,而CyclicBarrier调用await()方法后,会统计是否全部await()了,如果没有全部await(),就阻塞,也就是说,CyclicBarrier的await()方法也参与计数了;另外一方面,CountDownLatch不能复用,CyclicBarrier可以在某个线程中调用它的reset()方法复用。
补充
CountDownLatch、CyclicBarrier和后面要讲的Semaphore、JDK 1.6版本的FutureTask都是基于AbstractQueuedSynchronizer机制实现同步的,源码解析可参考:http://brokendreams.iteye.com/blog/2250372