多线程Future设计模式的两种用法
2020-01-17 本文已影响0人
herohua
Future接口定义:
public interface Future<T> {
T get() throws InterruptedException;
}
Future接口的实现类:
public class AsynFuture<T> implements Future<T> {
private volatile boolean done = false;
private T result;
public void done(T result) {
synchronized (this) {
this.result = result; // 更新结果
done = true; // 更新业务线程是否运行结束标志
this.notifyAll(); // 唤醒get方法阻塞的线程
}
}
@Override
public T get() throws InterruptedException {
synchronized (this) {
// 业务线程还没有执行完成,则调用方在此阻塞,直到业务线程执行完毕
while (!done) {
wait();
}
}
// 业务线程执行完毕,返回执行结果
return result;
}
}
封装任务的FutureTask接口:
public interface FutureTask<T> {
T call();
}
1. 主动轮询方式
FutureService提供submit方法,用于给客户端调用方传入需要执行的任务FutureTask,执行完任务之后,更新Future对象 ,客户端通过Future的get方法获取业务运行结果,在get方法方法里通过while循环不断轮询业务线程是否执行完毕,如果执行完毕,则将运行结果返回。这种方式客户端调用方的的主线程会阻塞在future.get()方法这里。
public class FutureService {
public <T> Future<T> submit(final FutureTask<T> task) {
AsynFuture<T> future = new AsynFuture<>();
new Thread(() -> {
// 新建一个线程执行业务逻辑,得到执行结果
// call方法的实现根据实际业务编写
T result = task.call();
// 执行完之后更新future
future.done(result);
}).start();
// 直接返回future对象
return future;
}
}
客户端调用方:
public class SyncInvoker {
public static void main(String[] args) throws InterruptedException {
FutureService futureService = new FutureService();
// 客户端通过futureService提交一个任务
// call方法封装自己的代码逻辑,最后返回最终结果
Future<String> future = futureService.submit(() -> {
// 模拟业务运行
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟返回业务结果
return "FINISH";
});
System.out.println("================");
System.out.println("do other thing");
System.out.println("================");
Thread.sleep(1_000);
// 从future中获取call方法的返回值
// 主线程在此阻塞
System.out.println(future.get());
}
}
2. 被动回调方式
FutureService提供的submit方法,需要传入任务FutureTask和对任务执行结果的处理逻辑,Consumer是一个函数式接口,需要重写accept方法,封装调用方对任务执行结果将会以何种方式进行处理。这样,调用方不再需要显式的通过Future.get()方法去请求任务执行结果,而是将执行结果回调的方式处理。
public class FutureService2 {
public <T> Future<T> submit(final FutureTask<T> task, final Consumer<T> consumer) {
AsynFuture<T> future = new AsynFuture<>();
new Thread(() -> {
// 新建一个线程执行业务逻辑,得到执行结果
// call方法的实现根据实际业务编写
T result = task.call();
// 执行完之后更新future
future.done(result);
// 对执行结果进行处理,具体处理逻辑有调用方实现
consumer.accept(result);
}).start();
// 直接返回future对象
return future;
}
}
客户端调用方:
public class SyncInvoker2 {
public static void main(String[] args) throws InterruptedException {
FutureService2 futureService2 = new FutureService2();
// 客户端通过futureService提交一个任务
// call方法封装自己的代码逻辑,最后返回最终结果
Future<String> future = futureService2.submit(() -> {
// 模拟业务运行
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟返回业务结果
return "FINISH";
}, System.out::println); // System.out::println为函数接口Consumer的accept方法的实现,即拿到结果之后直接打印结果,
// 不需要future.get()方法主动轮询结果,导致主线程阻塞
System.out.println("================");
System.out.println("do other thing");
System.out.println("================");
Thread.sleep(1_000);
}
}