并发工具类-CyclicBarrier(基于Condtion)
1.使用示例
public class UseCyclicBarrier {
private static CyclicBarrier barrier
= new CyclicBarrier(5,new CollectThread());
private static ConcurrentHashMap<String,Long> resultMap
= new ConcurrentHashMap<>();//存放子线程工作结果的容器
//负责屏障开放以后的工作
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result);
System.out.println("do other business........");
}
}
//工作线程
private static class SubThread implements Runnable{
@Override
public void run() {
long id = Thread.currentThread().getId();//线程本身的处理结果
resultMap.put(Thread.currentThread().getId()+"",id);
Random r = new Random();//随机决定工作线程的是否睡眠
try {
if(r.nextBoolean()) {
Thread.sleep(2000+id);
System.out.println("Thread_"+id+" ....do something ");
}
System.out.println(id+"....is await");
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do its business ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for(int i=0;i< 5;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
}
}
结果:
16....is await
13....is await
17....is await
15....is await
Thread_14 ....do something
14....is await
the result = [13][14][15][16][17]
do other business........
Thread_13 ....do its business
Thread_14 ....do its business
Thread_15 ....do its business
Thread_16 ....do its business
Thread_17 ....do its business
2.官方文档
A synchronization aid that allows a set of threads to all wait for
each other to reach a common barrier point. CyclicBarriers are
useful in programs involving a fixed sized party of threads that
must occasionally wait for each other. The barrier is called cyclic
because it can be re-used after the waiting threads are released.
A CyclicBarrier supports an optional Runnable command that is run
once per barrier point, after the last thread in the party arrives, but
before any threads are released. This barrier action is useful for
updating shared-state before any of the parties continue.
一种同步辅助工具,允许一组线程等待彼此全部到达公共的障栅点。之所以称为Cycli,是因为在线程释放后可以重新使用。
支持可选的Runnable,该程序在每个障栅点运行一次,运行时刻是所有线程到达该障栅点之后,任何线程释放之前。该功能对于各线程继续运行前更新共享的状态很有用。
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}
这里每一个worker线程都处理矩阵的一行并且在障栅处等待所有行都处理完成。当所有行都处理完之后,barrierAction开始执行合并所有行的操作。当合并操作发现了一个解,则done()会返回true,并且所有worker线程会终止。
如果barrierAction不依赖于所有线程到达障栅点暂停,那么任何线程都可以在释放时候执行该操作。为了实现这一点,await调用在障栅点会返回到达的线程索引。然后,就可以选择执行barrierAction的线程。
The CyclicBarrier uses an all-or-none breakage model for failed
synchronization attempts: If a thread leaves a barrier point
prematurely because of interruption, failure, or timeout, all other
threads waiting at that barrier point will also leave abnormally via
BrokenBarrierException (or InterruptedException if they too were
interrupted at about the same time).
使用要么全部成功要么全部失败的模型:如果一个线程因为中断、失败或超时而过早地离开障栅点,则在该障栅点等待的所有其他线程也将抛出BrokenBarrierException异常而离开(或InterruptedException在大约同一时间都被中断的情况)。
3.构造器和相关域
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
parties为障栅点等待方的相关数量,count表示还有多少线程等待到达障栅点。
4.await
Waits until all parties have invoked await on this barrier.
If the current thread is not the last to arrive then it is disabled for
thread scheduling purposes and lies dormant until one of the
following things happens:
The last thread arrives; or
Some other thread interrupts the current thread; or
Some other thread interrupts one of the other waiting threads; or
Some other thread times out while waiting for barrier; or
Some other thread invokes reset() on this barrier.
If the current thread:
has its interrupted status set on entry to this method; or
is interrupted while waiting
then InterruptedException is thrown and the current thread's
interrupted status is cleared.
If the barrier is reset() while any thread is waiting, or if the barrier is
broken when await is invoked, or while any thread is waiting, then
BrokenBarrierException is thrown.
If any thread is interrupted while waiting, then all other waiting
threads will throw BrokenBarrierException and the barrier is placed
in the broken state.
If the current thread is the last thread to arrive, and a non-null
barrier action was supplied in the constructor, then the current
thread runs the action before allowing the other threads to
continue. If an exception occurs during the barrier action then that
exception will be propagated in the current thread and the barrier is
placed in the broken state.
Returns:
the arrival index of the current thread, where index getParties() - 1
indicates the first to arrive and zero indicates the last to arrive
Throws:
InterruptedException - if the current thread was interrupted while
waiting
BrokenBarrierException - if another thread was interrupted or timed
out while the current thread was waiting, or the barrier was reset,
or the barrier was broken when await was called, or the barrier
action (if present) failed due to an exception
等待各方在此barrier上调用await。
如果当前线程不是最后到达的线程,那么其将被禁止进行线程调度。并在发生以下任何一种情况之前处于休眠状态:
- 最后一个线程到达
- 某个线程中断了当前线程
- 某个线程中断了其他某个等待的线程
- 某个线程等待超时
- 某个线程在此barrier上调用了reset()
如果此线程:
- 在该方法开始的时候设置了中断状态
- 或者在等待时中断了
那么会抛出InterruptedException并清除当前线程的中断状态。
如果线性在等待时调用了reset()或者barrier被破坏了,则会抛出BrokenBarrierException。
如果任何线程在等待时被中断,那么所有其他线程会抛出BrokenBarrierException并且barrier被置位broken状态。
如果当前线程是最后到达的线程,会在所有其他线程继续运行前执行非空的barrier action。如果执行过程中发生了异常,那么该异常会在当前线程中传播,并且barrier处于broken状态。
返回值:当前线程的到达索引,getParties() - 1表示第一个到达的线程,0表示最后到达的线程。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The current generation */
private Generation generation = new Generation();
lock用于保护barrier入口,trip用于等待处理,这里实现等待的方式是通过lock的条件变量trip.await实现的。
当最后一个线程执行完barrierCommand之后,会调用nextGeneration() ,该方法会调用trip.signalAll()唤醒所有等待在此条件上的线程。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
等待线程唤醒后,会进行检查,正常情况会从该处返回:
if (g != generation)
return index;
因为nextGeneration()中 generation = new Generation(),所以每一个线程的await会返回其相应的index。getParties() - 1表示第一个到达的线程,0表示最后到达的线程。