Java多线程系列

多线程并发框架使用三

2018-03-20  本文已影响0人  丹青水
Phaser

Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatch和CyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。Phaser比较适合这样场景,一种任务可以分为多个阶段,多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个CyclicBarrier来实现,每个CyclicBarrier负责等待一个阶段的任务全部完成。但是使用CyclicBarrier的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而Phaser可同时解决这两个问题。Phaser内有2个重要状态,分别是phase和party。

例子(运动员比赛)

运动员需要比赛,分4个阶段,每个阶段要等待其他运动员比赛完成。

public class PhaserDemo  extends Phaser{
    @Override
    protected boolean onAdvance(int phase, int registeredParties) { //在每个阶段执行完成后回调的方法
        switch (phase) {
            case 0:
                return ready();
            case 1:
                return one();
            case 2:
                return second();
            case 3:
                return three();
            case 4:
                return foor();
            default:
                return true;
        }

    }

    private boolean ready(){
        System.out.println("运动员准备好了,参赛人数:"+getRegisteredParties());
        return false;
    }
    private boolean one(){
        System.out.println("第一跑道跑完");
        return false;
    }
    private boolean second(){
        System.out.println("第二跑道跑完");
        return false;
    }

    private boolean three(){
        System.out.println("第三题跑道跑完");
        return false;
    }

    private boolean foor(){
        System.out.println("第四题跑道跑完,结束!");
        return true;
    }


}
class RunnerTask implements Runnable {
    private Phaser phaser;
    public RunnerTask(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"到达比赛");
        phaser.arriveAndAwaitAdvance();
        runWay("1", phaser);
        runWay("2",phaser);
        runWay("3",phaser);
        runWay("4",phaser);

    }
    private void  runWay(String channel,Phaser phaser ){
        System.out.println(Thread.currentThread().getName() + "跑道"+channel+"比赛中...");
        long duration = (long)(Math.random()*10);
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "跑道" + channel + "比赛完成!"+"用时:"+duration);
        phaser.arriveAndAwaitAdvance();
    }
}
public class PhaserDemoTest {
           public  static void main(String [] args){
               PhaserDemo phaser = new PhaserDemo();
               RunnerTask[] runnerTasks = new RunnerTask[5];
               for (int i = 0; i < runnerTasks.length; i++) {
                    runnerTasks[i] = new RunnerTask(phaser);
               }
               phaser.bulkRegister(runnerTasks.length);  //注册phaser要维护的线程数,单个可使用  phaser.register();
               Thread[] threads = new Thread[runnerTasks.length];
               for (int i = 0; i < runnerTasks.length; i++) {
                   threads[i] = new Thread(runnerTasks[i], "runner "+i);
                   threads[i].start();
               }
               //等待所有线程执行结束
               for (int i = 0; i < runnerTasks.length; i++) {
                   try {
                       threads[i].join();
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }

               System.out.println("phaser flag:"+phaser.isTerminated());

           }
}

上一篇 下一篇

猜你喜欢

热点阅读