Java Future 模式
2020-04-25 本文已影响0人
劉胡來
- Future 模式释义 来源于《Java 高并发编程详解》
假设有个任务需要执行比较长的时间,通常需要等待任务执行结束或者出错
才能返回结果,在些期间调用者只能陷入阻塞苦苦等待,对此Future设计模式提供了一种凭据式的解决方案。在日常生活中凭据的使用很常见,比如你去服装店想订做一套西装,但又不想将宝贵的时间花在漫长的等待设计师制做西装的过程中,于是裁缝给你开了一张凭据,上面言道:
- 定做西装的单子
- 取西装的时间
『带着这两个问题来设计一个这样的Future模式』
假设这个裁缝店的师父是一个做大事且做事规范有序的人,他针对自己的业务类型设计了一个统一的凭据模版(通常用接口或者抽象类来约定),不同的业务有其不同的模版,但都遵循这样的格式。比如西装的有西装的模版,旗袍有旗袍的模版(具体的实现类)。这个模版定义了如下这个约定
- 给客户的服装
- 客户给了高价钱,必须在某个时间点内得到定制的服装,超过时间点就不要了,按违约处理。(超时取消任务)
- 客户临时改变主意不做了(取消定做服装的任务)
- 服装是否完成
这个模版用代码翻译如下:
package com.example.liuxiaobing.statemodel.future_model;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Create By 刘胡来
* Create Date 2020/4/23
* Des: 获取计算结果
*
* future 未来的意思,即将一个耗时操作交给一个线程去执行,从而达到异步的
* 目的,提交线程在提交任务时和获得的结果过程中可以进行其他的任务执行,而不至于
* 傻傻等待结果的返回
*/
public interface Future<T> {
/**
* 返回计算后的结果,该方法为调用者开放,当未获得他的结果时应该陷入阻塞状态
* 线程执行过程中阻塞状态有可能会被打断,所以此处我们主动抛出中断异常
* @return
* @throws InterruptedException
*/
T get() throws InterruptedException;
/**
* 带超时直接抛出超时异常的接口,当在规定的时间内未获得任务结果返回时,直接将任务取消
* 使用抛出超时异常来中断任务,调用者捕获到超时异常时做自己的逻辑处理
* 超时的逻辑判断:
* 在做任务的线程中判断任务的耗时时长,如果超过约定的时间则视为超时
* 在判断任务超时后还需要将调用者的阻塞状态唤醒,这一点很重要
* @param timeout 超时时间片
* @return
* @throws InterruptedException 中断异常
* @throws ExecutionException 执行异常
* @throws TimeoutException 超时异常
*/
T get(long timeout)
throws InterruptedException, ExecutionException, TimeoutException;
T get(TimeUnit unit,long timeout)throws InterruptedException, ExecutionException, TimeoutException;
/**
* 取消任务,逻辑判断依据为:
* 将取消任务的标志置为真,同时将调用者的阻塞状态唤醒
*/
void cancel(boolean cancel);
/**
* 判断任务取消状态
* @return
*/
boolean isCanceled();
/**
* 判断任务是否已经执行完
* @return
*/
boolean done();
}
注意每个接口的注释
- 具体执行任务的类,由任务线程进行,此处创建了一个线程来进行这个任务,如果为了高效也可以使用线程池的方式,
这个线程要处理这么几件事:
- 定做西装这件具体做的事
- 判断这个任务的耗时
- 监听客户的状态,如是否收到客户取消定制西装的任务,如果取消了应立即中断正在做西装的任务
- 西装完成了之后告诉通知客户已经做好了,可以过来取西装了,即将客户的阻塞状态唤醒。同时将自己的状态复原表示可以接收下一个任务
private Date startTime = null;
private class TaskThread extends Thread{
public Task<IN, OUT> task;
private IN input;
public FutureTask<OUT> future;
private boolean isCancel;
public TaskThread(Task<IN, OUT> task,final IN input,FutureTask<OUT> future){
this.task = task;
this.input = input;
this.future = future;
}
public void cancelTask(boolean cancel){
this.isCancel = cancel;
this.future.cancel(cancel);
}
@Override
public void run() {
super.run();
System.out.println("89-----------start work:");
startTime = new Date();
OUT result = task.get(input);
Date currentTime = new Date();
future.duration = currentTime.getTime() - startTime.getTime();
future.finish(result); //唤醒主线程的阻塞,执行后续的逻辑
}
}
- 西装具体的凭据模板代码翻译如下:
package com.example.liuxiaobing.statemodel.future_model;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Create By 刘胡来
* Create Date 2020/4/23
* Des:
*/
public class FutureTask<T> implements Future<T> {
private T result;
private boolean isDone = false;
public boolean isTimeout = false;
public boolean isCancel = false;
public long duration;
private final Object LOCK = new Object();
public FutureTask(){
this.isTimeout = false;
this.isCancel = false;
this.duration = 0;
}
/**
* 当任务没有完成时使用wait进行阻塞
* @return
* @throws InterruptedException
*/
@Override
public T get() throws InterruptedException {
synchronized (LOCK){
while(!isDone){
LOCK.wait();
}
return result;
}
}
@Override
public T get(long timeout) throws InterruptedException, ExecutionException, TimeoutException {
synchronized (LOCK){
System.out.println("43---------thread name:"+Thread.currentThread().getName());
if(timeout < 0){
throw new NullPointerException();
}
while(!isDone){
System.out.println("43---------thread name:"+Thread.currentThread().getName() + " 子线程任务还未完成 等待任务线程完成任务 所以 需要处于阻塞状态");
LOCK.wait();
}
if(isCancel){
System.out.println("78---------thread name:"+Thread.currentThread().getName() + " 收到任务取消通知,抛出任务中断异常");
throw new InterruptedException();
}
if(duration > timeout){
System.out.println("58---------thread name:"+Thread.currentThread().getName() + " 收到超时通知抛出超时异常");
throw new TimeoutException();
}
System.out.println("58---------thread name:"+Thread.currentThread().getName() + " 收到任务线程返回的结果,直接将结果返回");
return result;
}
}
@Override
public T get(TimeUnit unit, long timeout) throws InterruptedException, ExecutionException, TimeoutException {
synchronized (LOCK){
if(timeout < 0){
throw new IllegalArgumentException("the time is invalid");
}
//将时间转换为纳秒
long remainingNanos = unit.toNanos(timeout);
//等待任务将在endNanos 纳秒后 超时
final long endNanos = System.nanoTime() + remainingNanos;
while(!isDone){
System.out.println("89---------thread name:"+Thread.currentThread().getName() + " 任务线程任务还未完成 等待任务线程完成任务 所以 需要处于阻塞状态");
//超时 直接抛出异常
if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0){
System.out.println("92---------thread name:"+Thread.currentThread().getName() + " 出现超时,直接抛出超时异常");
throw new TimeoutException("time out");
}
//等待remainingNanos 在等待的过程中可能会被中断,需要重新计算remainingNanos时间
LOCK.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
//执行线程中断时 重新计算时间
remainingNanos = endNanos - System.nanoTime();
}
if(isCancel){
System.out.println("106---------thread name:"+Thread.currentThread().getName() + " 收到任务取消通知,抛出任务中断异常");
throw new InterruptedException();
}
System.out.println("109---------thread name:"+Thread.currentThread().getName() + " 收到任务线程返回的结果,直接将结果返回");
return result;
}
}
/**
* finish 方法主要用于为FutureTask 设置结果
* @param result
*/
protected void finish(T result){
synchronized (LOCK){
if(isDone || isCancel) return;
this.result = result;
this.isDone = true;
LOCK.notifyAll();
}
}
/**
* 收到取消通知立即唤阻塞的任务,并将取消标志设为true
* @param cancel
*/
@Override
public void cancel(boolean cancel){
synchronized (LOCK){
if(cancel){
this.isDone = true;
this.isCancel = true;
LOCK.notifyAll();
}
}
}
@Override
public boolean isCanceled() {
return isCancel;
}
@Override
public boolean done() {
return isDone;
}
}
- 代工的任务类型如下:
package com.example.liuxiaobing.statemodel.future_model;
/**
* Create By 刘胡来
* Create Date 2020/4/23
* Des: 主要是提供给调用者实现计算逻辑,接收一个参数并返回一个结果
*/
public interface Task<IN,OUT> {
/**
* 给定参数计算返回结果
* @param input
* @return
*/
OUT get(IN input);
}
- 代工的模版如下:
package com.example.liuxiaobing.statemodel.future_model;
import android.os.Build;
import android.support.annotation.RequiresApi;
import java.util.concurrent.TimeUnit;
/**
* Create By 刘胡来
* Create Date 2020/4/23
* Sensetime@Copyright
* Des: 用于提交任务
*/
public interface FutureService<IN,OUT> {
/**
* 提交不需要返回值的任务,Future.get方法返回的将会是null
* @param runnable
* @return
*/
Future<?> submit(Runnable runnable);
/**
* 提交需要返回的任务,其中Task接口代替了Runnable 接口
* @param task
* @param input
* @return
*/
Future<OUT> submit(Task<IN,OUT>task,IN input);
/**
* 提交需要返回的任务,其中Task接口代替了Runnable 接口,支持设置超时抛出超时异常
* @param task
* @param input
* @param timeOut
* @return
*/
//Future<OUT> submit(Task<IN,OUT>task, IN input, long timeOut);
/**
* 使用静态方法创建一个FutureService实现
* @param <T>
* @param <R>
* @return
*/
@RequiresApi(api = Build.VERSION_CODES.N)
static <T,R> FutureServiceImpl<T,R> newService(){
return new FutureServiceImpl<>();
}
}
- 具体代工模版如下:
package com.example.liuxiaobing.statemodel.future_model;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Thread.interrupted;
/**
* Create By 刘胡来
* Create Date 2020/4/23
* Des: task任务执行管理类
*/
public class FutureServiceImpl<IN,OUT> implements FutureService<IN,OUT> {
private TaskThread taskThread;
public void cancel(boolean cancel){
if(taskThread != null){
taskThread.cancelTask(cancel);
taskThread.interrupt();
}
}
private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
private final AtomicInteger nextCounter = new AtomicInteger(0);
private String getNextName(){
return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
}
@Override
public Future<?> submit(final Runnable runnable) {
final FutureTask<Void> future = new FutureTask<>();
new Thread(new Runnable() {
@Override
public void run() {
runnable.run();
future.finish(null);
}
},getNextName()).start();
return future;
}
@Override
public Future<OUT> submit(final Task<IN, OUT> task, final IN input) {
final FutureTask<OUT> future = new FutureTask<>();
// new Thread(new Runnable() {
// @Override
// public void run() {
// OUT result = task.get(input);
// future.finish(result);
// }
// },getNextName()).start();
if(taskThread == null){
taskThread = new TaskThread(task,input,future);
taskThread.setName("TaskThread");
}
taskThread.start();
return future;
}
private Date startTime = null;
private class TaskThread extends Thread{
public Task<IN, OUT> task;
private IN input;
public FutureTask<OUT> future;
private boolean isCancel;
public TaskThread(Task<IN, OUT> task,final IN input,FutureTask<OUT> future){
this.task = task;
this.input = input;
this.future = future;
}
public void cancelTask(boolean cancel){
this.isCancel = cancel;
this.future.cancel(cancel);
}
@Override
public void run() {
super.run();
System.out.println("89-----------start work:");
startTime = new Date();
OUT result = task.get(input);
Date currentTime = new Date();
future.duration = currentTime.getTime() - startTime.getTime();
future.finish(result); //唤醒主线程的阻塞,执行后续的逻辑
}
}
}
- 测试
private void testFutureModel(){
//测试不带返回值的future
// FutureService<Void,Void> service = FutureService.newService();
// Future<?> future = service.submit(()->{
// try {
// TimeUnit.SECONDS.sleep(10);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("185-------i am finish done");
// });
//
// try {
// future.get();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//测试带返回值的future
// FutureService<String,Integer> serviceResult = FutureService.newService();
// Future<Integer> futureResult = serviceResult.submit(new Task<String, Integer>() {
// @Override
// public Integer get(String input) {
// try {
// TimeUnit.SECONDS.sleep(4);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("206-------i am finish done");
//
// return input.length();
// }
// },"Hello");
//
// try {
// Integer result = futureResult.get(5 * 1000);
// System.out.println("211-------i am finish done:"+result);
// } catch (InterruptedException e) {
// e.printStackTrace();
// System.out.println("244-------InterruptedException:"+e.toString());
// } catch (ExecutionException e) {
// e.printStackTrace();
// } catch (TimeoutException e) {
// e.printStackTrace();
// System.out.println("248-------TimeoutException:"+e.toString());
//
// }
//测试带返回值的future 支持超时设置
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer result = buildFutureTask().get(5 * 1000);
System.out.println("239-------i am get result:"+result);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("259-------InterruptedException:"+e.toString());
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
System.out.println("263-------TimeoutException:"+e.toString());
}
}
}).start();
}
private Future<Integer> buildFutureTask(){
serviceResult = FutureService.newService();
futureResult = serviceResult.submit(new Task<String, Integer>() {
@Override
public Integer get(String input) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("273-------InterruptedException");
}
System.out.println("276-------i am finish done");
return input.length();
}
},"Hello");
return futureResult;
}