Java 多线程Latch模式-对比IOS 的线程依赖
2020-04-29 本文已影响0人
劉胡來
- Latch 模式背景释义:
- 有A、B、C、D若干个并行任务,现在F任务需要等ABCD全部完成之后再进行,只要其中任一一个并发任务未执行完F任务就阻塞或者抛出超时异常、取消任务
- 代码翻译:
抽象任务接口约束类
public abstract class Latch {
protected int limit;
public Latch(int limit){
this.limit = limit;
}
/**
* 阻塞当前调用者所在线程,阻塞的逻辑为,如果当前还有任务未完成则阻塞
*
* @throws InterruptedException
*/
public abstract void await() throws InterruptedException;
public abstract void await(TimeUnit unit,long time) throws InterruptedException, TimeoutException;
/**
* 谁执行完任务就将任务完成标志减1,当任务完成标志为0时表示所有任务均已完成
* 本方法为同步方法,任务线程执行时需要先获取到本接口的锁,具体锁住的对象为
* limit ,当前任务线程执行完任务之后将标志-1,同时释放锁
*/
public abstract void countDown();
/**
* 获取剩下未完成任务的个数
* @return
*/
public abstract int getUnarrived();
}
具体任务实现类:
public class CountDownLatch extends Latch {
public CountDownLatch(int limit) {
super(limit);
}
@Override
public void await() throws InterruptedException {
synchronized (this){
while(limit > 0){
this.wait();
}
}
}
@Override
public void await(TimeUnit unit, long time) throws InterruptedException, TimeoutException {
if(time <=0){
throw new IllegalArgumentException("the time is invalid");
}
//将时间转换为纳秒
long remainingNanos = unit.toNanos(time);
//等待任务将在endNanos 纳秒后 超时
final long endNanos = System.nanoTime() + remainingNanos;
synchronized (this){
while(limit > 0){
//超时 直接抛出异常
if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0){
throw new TimeoutException("time out");
}
//等待remainingNanos 在等待的过程中可能会被中断,需要重新计算remainingNanos时间
this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
//执行线程中断 时重新计算时间
remainingNanos = endNanos - System.nanoTime();
}
}
}
@Override
public void countDown() {
synchronized (this){
if(limit <= 0){
throw new IllegalStateException("all of task has done");
}
limit --;
notifyAll();
}
}
@Override
public int getUnarrived() {
return limit;
}
}
工作线程:
public class LatchTaskThread extends Thread {
private Latch latch;
private String programmer;
private String transportion;
public LatchTaskThread(Latch latch,String programmer,String transportion){
this.latch = latch;
this.programmer = programmer;
this.transportion = transportion;
}
@Override
public void run() {
super.run();
System.out.println("26--------执行者:"+this.programmer + " start task:"+transportion);
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("26--------执行者:"+this.programmer + " finsh task:"+transportion);
latch.countDown();
}
}
- 测试用例:
private void testLatch() throws InterruptedException, TimeoutException {
latch = new CountDownLatch(4);
new LatchTaskThread(latch,"A","Bus").start();
new LatchTaskThread(latch,"B","Stock").start();
new LatchTaskThread(latch,"C","Play Crad").start();
new LatchTaskThread(latch,"D","Work").start();
//latch.await();
latch.await(TimeUnit.SECONDS,5);
System.out.println("43-------所有任务均已经完成");
}
- 参考书籍《Java 高并发编程详解》