Future模式
2018-12-14 本文已影响0人
zfylin
Future模式是多线程开发中非常常见的一种设计模式。它的核心思想是异步调用。
对于多线程来说,线程A需要等待线程B的结果,它没必要一直等待B,可以先拿到一个未来的Future,等B有了结果后再取真实的结果, 也就是说Future代表一个异步计算的结果。
普通模式与Future模式的区别
普通模式是阻塞的,客户端请求服务端后需要一直等待服务端返回结果后,才能去执行其他任务。
Future模式是非阻塞的,客户端请求服务端后,服务端只需要创建一个工作线程,然后马上返回了,而这个过程耗时很短,不会阻塞client,client在这期间可以去处理其他任务,当需要的时候向工作进程获取结果。
序列图例子
/**
* Future 包装类
*/
public class FutureData<T> {
private boolean mIsReady = false;
private T mData;
public synchronized void setData(T data) {
mIsReady = true;
mData = data;
// 数据准备好,退出等待
notifyAll();
}
public synchronized T getData() {
while (!mIsReady) {
try {
// 数据未准备,好等待
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return mData;
}
}
/**
* 服务端
*/
public class Server {
public FutureData<String> getString() {
final FutureData<String> data = new FutureData<>();
// 创建工作线程
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 结果写回FutureData
data.setData("world");
}
}).start();
// 返回 FutureData
return data;
}
}
客户端代码:
// 创建服务端
Server server = new Server();
// 请求服务端
FutureData<String> futureData = server.getString();
//先执行其他任务
String hello = "hello";
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 获取服务端结果
System.out.print(hello + " " + futureData.getData());
JDK中的Future与FutureTask
Future
Future接口源码:
public interface Future<V> {
/**
* 用来取消任务,取消成功则返回true,取消失败则返回false。
* mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务。
* 如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false;
* 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
* 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
*/
boolean isCancelled();
/**
* 表示任务是否已经完成,若任务完成,则返回true
*/
boolean isDone();
/**
* 获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果
*/
V get() throws InterruptedException, ExecutionException;
/**
* 获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future就是对于Runnable或Callable任务的执行进行查询、中断任务、获取结果。
public class FutureTest {
// 普通模式计算1到1亿的和
public static void main(String[] args) {
long start = System.currentTimeMillis();
List<Integer> retList = new ArrayList<>();
// 计算1000次1至1亿的和
for (int i = 0; i < 1000; i++) {
retList.add(Calc.cal(100000000));
}
System.out.println("耗时: " + (System.currentTimeMillis() - start));
for (int i = 0; i < 1000; i++) {
try {
Integer result = retList.get(i);
System.out.println("第" + i + "个结果: " + result);
} catch (Exception e) {
}
}
System.out.println("耗时: " + (System.currentTimeMillis() - start));
}
public static class Calc implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return cal(10000);
}
public static int cal (int num) {
int sum = 0;
for (int i = 0; i < num; i++) {
sum += i;
}
return sum;
}
}
}
--------------------------------------------------
执行结果:
耗时: 43659
第0个结果: 887459712
第1个结果: 887459712
第2个结果: 887459712
...
第999个结果: 887459712
耗时: 43688
public class FutureTest {
// Future模式计算1到1亿的和
public static void main(String[] args) {
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<Integer>> futureList = new ArrayList<>();
// 计算1000次1至1亿的和
for (int i = 0; i < 1000; i++) {
// 调度执行
futureList.add(executorService.submit(new Calc()));
}
System.out.println("耗时: " + (System.currentTimeMillis() - start));
for (int i = 0; i < 1000; i++) {
try {
Integer result = futureList.get(i).get();
System.out.println("第" + i + "个结果: " + result);
} catch (InterruptedException | ExecutionException e) {
}
}
System.out.println("耗时: " + (System.currentTimeMillis() - start));
}
public static class Calc implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return cal(100000000);
}
public static int cal (int num) {
int sum = 0;
for (int i = 0; i < num; i++) {
sum += i;
}
return sum;
}
}
}
--------------------------------------------------
执行结果:
耗时: 12058
第0个结果: 887459712
第1个结果: 887459712
...
第999个结果: 887459712
耗时: 12405
可以看到,计算1000次1至1亿的和,使用Future模式并发执行最终的耗时比使用传统的方式快了30秒左右,使用Future模式的效率大大提高。
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
事实上,FutureTask是Future接口的一个唯一实现类。
例子
import java.util.concurrent.*;
/**
* Callable+Future获取执行结果
*/
public class FutureTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果" + result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 100; i++)
sum += i;
return sum;
}
}
import java.util.concurrent.*;
/**
* Callable+FutureTask获取执行结果
*/
public class FutureTest {
public static void main(String[] args) {
//第一种方式
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<>(task);
executor.submit(futureTask);
executor.shutdown();
//第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
/*Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
Thread thread = new Thread(futureTask);
thread.start();*/
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果" + futureTask.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}