Java 多线程Latch模式-对比IOS 的线程依赖

2020-04-29  本文已影响0人  劉胡來
  • 有A、B、C、D若干个并行任务,现在F任务需要等ABCD全部完成之后再进行,只要其中任一一个并发任务未执行完F任务就阻塞或者抛出超时异常、取消任务
public abstract class Latch {

    protected int limit;

    public Latch(int limit){
        this.limit = limit;
    }

    /**
     * 阻塞当前调用者所在线程,阻塞的逻辑为,如果当前还有任务未完成则阻塞
     *
     * @throws InterruptedException
     */
    public abstract void await() throws InterruptedException;

    public abstract void await(TimeUnit unit,long time) throws InterruptedException, TimeoutException;

    /**
     * 谁执行完任务就将任务完成标志减1,当任务完成标志为0时表示所有任务均已完成
     * 本方法为同步方法,任务线程执行时需要先获取到本接口的锁,具体锁住的对象为
     * limit ,当前任务线程执行完任务之后将标志-1,同时释放锁
     */
    public abstract void countDown();

    /**
     * 获取剩下未完成任务的个数
     * @return
     */
    public abstract int getUnarrived();
}

具体任务实现类:

public class CountDownLatch extends Latch {

    public CountDownLatch(int limit) {
        super(limit);
    }

    @Override
    public void await() throws InterruptedException {

        synchronized (this){
            while(limit > 0){
                this.wait();
            }
        }
    }

    @Override
    public void await(TimeUnit unit, long time) throws InterruptedException, TimeoutException {
        if(time <=0){
            throw new IllegalArgumentException("the time is invalid");
        }

        //将时间转换为纳秒
        long remainingNanos = unit.toNanos(time);
        //等待任务将在endNanos 纳秒后 超时
        final long endNanos = System.nanoTime() + remainingNanos;

        synchronized (this){
            while(limit > 0){
                //超时  直接抛出异常
                if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0){
                    throw  new TimeoutException("time out");
                }

                //等待remainingNanos  在等待的过程中可能会被中断,需要重新计算remainingNanos时间
                this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));

                //执行线程中断  时重新计算时间
                remainingNanos = endNanos - System.nanoTime();
            }
        }
    }


    @Override
    public void countDown() {

        synchronized (this){
            if(limit <= 0){
                throw new IllegalStateException("all of task has done");
            }

            limit --;
            notifyAll();
        }
    }

    @Override
    public int getUnarrived() {
        return limit;
    }
}

工作线程:

public class LatchTaskThread extends Thread {

    private Latch latch;

    private String programmer;

    private String transportion;

    public LatchTaskThread(Latch latch,String programmer,String transportion){
        this.latch = latch;
        this.programmer = programmer;
        this.transportion = transportion;
    }

    @Override
    public void run() {
        super.run();
        System.out.println("26--------执行者:"+this.programmer + "  start task:"+transportion);
        try {
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("26--------执行者:"+this.programmer + "  finsh task:"+transportion);
        latch.countDown();

    }
}

 private void testLatch() throws InterruptedException, TimeoutException {
        latch = new CountDownLatch(4);
        new LatchTaskThread(latch,"A","Bus").start();
        new LatchTaskThread(latch,"B","Stock").start();
        new LatchTaskThread(latch,"C","Play Crad").start();
        new LatchTaskThread(latch,"D","Work").start();
        //latch.await();

        latch.await(TimeUnit.SECONDS,5);
        System.out.println("43-------所有任务均已经完成");
    }

上一篇下一篇

猜你喜欢

热点阅读