多线程并发框架使用三
2018-03-20 本文已影响0人
丹青水
Phaser
Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatch和CyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。Phaser比较适合这样场景,一种任务可以分为多个阶段,多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个CyclicBarrier来实现,每个CyclicBarrier负责等待一个阶段的任务全部完成。但是使用CyclicBarrier的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而Phaser可同时解决这两个问题。Phaser内有2个重要状态,分别是phase和party。
- phase就是阶段,初值为0,当所有的线程执行完本轮任务,同时开始下一轮任务时, 意味着当前阶段已结束,进入到下一阶段,phase的值自动加1。
- party就是线程, 表示当前Phaser对象当前管理着线程个数。
- boolean onAdvance(int phase, int registeredParties)方法。当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行, 当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。
- arriveAndAwaitAdvance() 当前线程当前阶段执行完毕,等待其它线程完成当前阶段。如果当前线程是该阶段最后一个未到达的,则该方法直接返回下一个阶段的序号(阶段序号从0开始),同时其它线程的该方法也返回下一个阶段的序号。
- arriveAndDeregister() 该方法立即返回下一阶段的序号,并且其它线程需要等待的个数减一,并且把当前线程从之后需要等待的成员中移除。如果该Phaser是另外一个Phaser的子Phaser(层次化Phaser会在后文中讲到),并且该操作导致当前Phaser的成员数为0,则该操作也会将当前Phaser从其父Phaser中移除。
- arrive() 该方法不作任何等待,直接返回下一阶段的序号。
- awaitAdvance(int phase) 该方法等待某一阶段执行完毕。如果当前阶段不等于指定的阶段或者该Phaser已经被终止,则立即返回。该阶段数一般由arrive()方法或者arriveAndDeregister()方法返回。返回下一阶段的序号,或者返回参数指定的值(如果该参数为负数),或者直接返回当前阶段序号(如果当前Phaser已经被终止)。
- awaitAdvanceInterruptibly(int phase) 效果与awaitAdvance(int phase)相当,唯一的不同在于若该线程在该方法等待时被中断,则该方法抛出InterruptedException。
- awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果与awaitAdvanceInterruptibly(int phase)相当,区别在于如果超时则抛出TimeoutException。
- bulkRegister(int parties) 注册多个party。如果当前phaser已经被终止,则该方法无效,并返回负数。如果调用该方法时,onAdvance方法正在执行,则该方法等待其执行完毕。如果该Phaser有父Phaser则指定的party数大于0,且之前该Phaser的party数为0,那么该Phaser会被注册到其父Phaser中。
- forceTermination() 强制让该Phaser进入终止状态。已经注册的party数不受影响。如果该Phaser有子Phaser,则其所有的子Phaser均进入终止状态。如果该Phaser已经处于终止状态,该方法调用不造成任何影响。
例子(运动员比赛)
运动员需要比赛,分4个阶段,每个阶段要等待其他运动员比赛完成。
public class PhaserDemo extends Phaser{
@Override
protected boolean onAdvance(int phase, int registeredParties) { //在每个阶段执行完成后回调的方法
switch (phase) {
case 0:
return ready();
case 1:
return one();
case 2:
return second();
case 3:
return three();
case 4:
return foor();
default:
return true;
}
}
private boolean ready(){
System.out.println("运动员准备好了,参赛人数:"+getRegisteredParties());
return false;
}
private boolean one(){
System.out.println("第一跑道跑完");
return false;
}
private boolean second(){
System.out.println("第二跑道跑完");
return false;
}
private boolean three(){
System.out.println("第三题跑道跑完");
return false;
}
private boolean foor(){
System.out.println("第四题跑道跑完,结束!");
return true;
}
}
class RunnerTask implements Runnable {
private Phaser phaser;
public RunnerTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"到达比赛");
phaser.arriveAndAwaitAdvance();
runWay("1", phaser);
runWay("2",phaser);
runWay("3",phaser);
runWay("4",phaser);
}
private void runWay(String channel,Phaser phaser ){
System.out.println(Thread.currentThread().getName() + "跑道"+channel+"比赛中...");
long duration = (long)(Math.random()*10);
try {
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "跑道" + channel + "比赛完成!"+"用时:"+duration);
phaser.arriveAndAwaitAdvance();
}
}
public class PhaserDemoTest {
public static void main(String [] args){
PhaserDemo phaser = new PhaserDemo();
RunnerTask[] runnerTasks = new RunnerTask[5];
for (int i = 0; i < runnerTasks.length; i++) {
runnerTasks[i] = new RunnerTask(phaser);
}
phaser.bulkRegister(runnerTasks.length); //注册phaser要维护的线程数,单个可使用 phaser.register();
Thread[] threads = new Thread[runnerTasks.length];
for (int i = 0; i < runnerTasks.length; i++) {
threads[i] = new Thread(runnerTasks[i], "runner "+i);
threads[i].start();
}
//等待所有线程执行结束
for (int i = 0; i < runnerTasks.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("phaser flag:"+phaser.isTerminated());
}
}