多线程Future设计模式的两种用法

2020-01-17  本文已影响0人  herohua

多线程Future设计模式

Future接口定义:

public interface Future<T> {

    T get() throws InterruptedException;
}

Future接口的实现类:

public class AsynFuture<T> implements Future<T> {

    private volatile boolean done = false;

    private T result;

    public void done(T result) {
        synchronized (this) {
            this.result = result;   // 更新结果
            done = true;            // 更新业务线程是否运行结束标志
            this.notifyAll();       // 唤醒get方法阻塞的线程
        }
    }

    @Override
    public T get() throws InterruptedException {
        synchronized (this) {
            // 业务线程还没有执行完成,则调用方在此阻塞,直到业务线程执行完毕
            while (!done) {
                wait();
            }
        }
        // 业务线程执行完毕,返回执行结果
        return result;
    }
}

封装任务的FutureTask接口:

public interface FutureTask<T> {

    T call();
}

1. 主动轮询方式

FutureService提供submit方法,用于给客户端调用方传入需要执行的任务FutureTask,执行完任务之后,更新Future对象 ,客户端通过Future的get方法获取业务运行结果,在get方法方法里通过while循环不断轮询业务线程是否执行完毕,如果执行完毕,则将运行结果返回。这种方式客户端调用方的的主线程会阻塞在future.get()方法这里。

public class FutureService {

    public <T> Future<T> submit(final FutureTask<T> task) {

        AsynFuture<T> future = new AsynFuture<>();

        new Thread(() -> {
            // 新建一个线程执行业务逻辑,得到执行结果
            // call方法的实现根据实际业务编写
            T result = task.call();

            // 执行完之后更新future
            future.done(result);
        }).start();

        // 直接返回future对象
        return future;
    }
}

客户端调用方:

public class SyncInvoker {

    public static void main(String[] args) throws InterruptedException {

        FutureService futureService = new FutureService();

        // 客户端通过futureService提交一个任务
        // call方法封装自己的代码逻辑,最后返回最终结果
        Future<String> future = futureService.submit(() -> {

            // 模拟业务运行
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 模拟返回业务结果
            return "FINISH";
        });

        System.out.println("================");
        System.out.println("do other thing");
        System.out.println("================");
        Thread.sleep(1_000);

        // 从future中获取call方法的返回值
        // 主线程在此阻塞
        System.out.println(future.get());
    }
}

2. 被动回调方式

FutureService提供的submit方法,需要传入任务FutureTask和对任务执行结果的处理逻辑,Consumer是一个函数式接口,需要重写accept方法,封装调用方对任务执行结果将会以何种方式进行处理。这样,调用方不再需要显式的通过Future.get()方法去请求任务执行结果,而是将执行结果回调的方式处理。

public class FutureService2 {

    public <T> Future<T> submit(final FutureTask<T> task, final Consumer<T> consumer) {

        AsynFuture<T> future = new AsynFuture<>();

        new Thread(() -> {
            // 新建一个线程执行业务逻辑,得到执行结果
            // call方法的实现根据实际业务编写
            T result = task.call();

            // 执行完之后更新future
            future.done(result);

            // 对执行结果进行处理,具体处理逻辑有调用方实现
            consumer.accept(result);
        }).start();

        // 直接返回future对象
        return future;
    }
}

客户端调用方:

public class SyncInvoker2 {

    public static void main(String[] args) throws InterruptedException {

        FutureService2 futureService2 = new FutureService2();

        // 客户端通过futureService提交一个任务
        // call方法封装自己的代码逻辑,最后返回最终结果
        Future<String> future = futureService2.submit(() -> {

            // 模拟业务运行
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 模拟返回业务结果
            return "FINISH";
        }, System.out::println);    // System.out::println为函数接口Consumer的accept方法的实现,即拿到结果之后直接打印结果,
                                    // 不需要future.get()方法主动轮询结果,导致主线程阻塞

        System.out.println("================");
        System.out.println("do other thing");
        System.out.println("================");
        Thread.sleep(1_000);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读