Future模式

2018-12-14  本文已影响0人  zfylin

Future模式是多线程开发中非常常见的一种设计模式。它的核心思想是异步调用。

对于多线程来说,线程A需要等待线程B的结果,它没必要一直等待B,可以先拿到一个未来的Future,等B有了结果后再取真实的结果, 也就是说Future代表一个异步计算的结果。

普通模式与Future模式的区别

普通模式是阻塞的,客户端请求服务端后需要一直等待服务端返回结果后,才能去执行其他任务。

Future模式是非阻塞的,客户端请求服务端后,服务端只需要创建一个工作线程,然后马上返回了,而这个过程耗时很短,不会阻塞client,client在这期间可以去处理其他任务,当需要的时候向工作进程获取结果。

序列图

例子

/**
 * Future 包装类
 */
public class FutureData<T> {
    private boolean mIsReady = false;
    private T mData;

    public synchronized void setData(T data) {
        mIsReady = true;
        mData = data;

        // 数据准备好,退出等待
        notifyAll();
    }

    public synchronized T getData() {
        while (!mIsReady) {
            try {
                // 数据未准备,好等待
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return mData;
    }
}

/**
 * 服务端
 */
public class Server {
    public FutureData<String> getString() {
        final FutureData<String> data = new FutureData<>();
        // 创建工作线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 结果写回FutureData
                data.setData("world");
            }
        }).start();

        // 返回 FutureData
        return data;
    }
}

客户端代码:

// 创建服务端
Server server = new Server();
// 请求服务端
FutureData<String> futureData = server.getString();

//先执行其他任务
String hello = "hello";
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
// 获取服务端结果
System.out.print(hello + " " + futureData.getData());

JDK中的Future与FutureTask

Future

Future接口源码:

public interface Future<V> {

    /**
     * 用来取消任务,取消成功则返回true,取消失败则返回false。
     * mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务。
     * 如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false;
     * 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
     * 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true
     */
    boolean isCancelled();

    /**
     * 表示任务是否已经完成,若任务完成,则返回true
     */
    boolean isDone();

    /**
     * 获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future就是对于Runnable或Callable任务的执行进行查询、中断任务、获取结果。

public class FutureTest {

    // 普通模式计算1到1亿的和
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<Integer> retList = new ArrayList<>();

        // 计算1000次1至1亿的和
        for (int i = 0; i < 1000; i++) {
            retList.add(Calc.cal(100000000));
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));

        for (int i = 0; i < 1000; i++) {
            try {
                Integer result = retList.get(i);
                System.out.println("第" + i + "个结果: " + result);
            } catch (Exception e) {
            }
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(10000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

--------------------------------------------------
执行结果:
耗时: 43659
第0个结果: 887459712
第1个结果: 887459712
第2个结果: 887459712
...
第999个结果: 887459712
耗时: 43688
public class FutureTest {

    // Future模式计算1到1亿的和
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<Integer>> futureList = new ArrayList<>();

        // 计算1000次1至1亿的和
        for (int i = 0; i < 1000; i++) {
            // 调度执行
            futureList.add(executorService.submit(new Calc()));
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));

        for (int i = 0; i < 1000; i++) {
            try {
                Integer result = futureList.get(i).get();
                System.out.println("第" + i + "个结果: " + result);
            } catch (InterruptedException | ExecutionException e) {
            }
        }
        System.out.println("耗时: " + (System.currentTimeMillis() - start));
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(100000000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

--------------------------------------------------
执行结果:
耗时: 12058
第0个结果: 887459712
第1个结果: 887459712
...
第999个结果: 887459712
耗时: 12405

可以看到,计算1000次1至1亿的和,使用Future模式并发执行最终的耗时比使用传统的方式快了30秒左右,使用Future模式的效率大大提高。

FutureTask

public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

事实上,FutureTask是Future接口的一个唯一实现类。

例子

import java.util.concurrent.*;

/**
 * Callable+Future获取执行结果
 */
public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主线程在执行任务");

        try {
            System.out.println("task运行结果" + result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
    }
}

class Task implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i < 100; i++)
            sum += i;
        return sum;
    }
}
import java.util.concurrent.*;

/**
 * Callable+FutureTask获取执行结果
 */
public class FutureTest {

    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<>(task);
        executor.submit(futureTask);
        executor.shutdown();

        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主线程在执行任务");

        try {
            System.out.println("task运行结果" + futureTask.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
    }
}


class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
上一篇下一篇

猜你喜欢

热点阅读