如何实现异步调用

2019-09-26  本文已影响0人  overflow_e4e4

如何实现异步调用

同步调用是指调用方会被一直阻塞, 直到调用方收到结果。异步是指调用方不回阻塞。传统的socket网络请求就是一个同步调用。那么如何实现一个异步调用?

首先设计自己的异步调用的api
如果只想要进行异步调用,那只需一个线程池或者一个线程不断执行从main线程添加的任务即可,可以写成类似如下代码:

public class ThreadPoolTest2 {
    static List<Event> eventList = Collections.synchronizedList(new ArrayList<>());

    interface Event<V> {
        V doSomething();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        new Thread(() -> {
            while (true) {
                for (int i = 0; i < eventList.size(); i++) {
                    Event event = eventList.get(i);
                    Object result = event.doSomething();
                    eventList.remove(event);
                }
            }
        }, "thread-1").start();
        doSomethingAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 1;
        });
        System.out.println(Thread.currentThread().getName());
    }

    private static <T> void doSomethingAsync(Event<T> event) {
        eventList.add(event);
    }

}

如此一来确实可以异步执行部分代码,然而只是这样处理无法掌握异步任务的执行结果,所以需要doSomethingAsync函数能有一个返回值来获得异步任务的执行结果。其中java提供Future接口完美契合,Future一般用做为异步调用的返回值,他的接口设计如下:
Future是在juc下的一个接口,功能是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。最关键的是通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future配合线程池的用法如下:

ExecutorService executor = Executors.newCachedThreadPool();
//一个线程池,做了一些操作后,返回结果
Future<Integer> result = executor.submit(() -> {
      //do something
      return 1;
});

然而线程池功能很丰富导致源码量也很多且复杂,如果只想要最简单的异步功能,并不需要那么多代码,可以只考虑实现Future的代码(不考虑性能),类似如下:

public class AsyncDemo{
    static List<EventFutureImp> eventList = Collections.synchronizedList(new ArrayList<>());

    interface Event<V> {
        V doSomething();
    }

    interface EventFuture<V> extends Event<V>, Future<V> {

    }

    static class EventFutureImp<V> implements EventFuture<V> {
        Event<V> event;
        CountDownLatch countDownLatch;
        V result;

        public EventFutureImp(Event<V> event) {
            this.event = event;
            countDownLatch = new CountDownLatch(1);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isCancelled() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isDone() {
            throw new UnsupportedOperationException();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            throw new UnsupportedOperationException();
        }

        //-----以上方法不提供实现,只实现最简单的功能-------------
        @Override
        public V get() throws InterruptedException, ExecutionException {
            countDownLatch.await();
            return result;
        }

        @Override
        public V doSomething() {
            return event.doSomething();
        }

        public void done(V result) {
            this.result = result;
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        new Thread(() -> {
            while (true) {
                for (int i = 0; i < eventList.size(); i++) {
                    EventFutureImp event = eventList.get(i);
                    Object result = event.doSomething();
                    event.done(result);
                    eventList.remove(event);
                }
            }
        }, "thread-1").start();
        Future<Integer> future = doSomethingAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 1;
        });
        System.out.println(Thread.currentThread().getName());
        System.out.println("result :" + future.get());
    }


    private static <T> Future<T> doSomethingAsync(Event<T> event) {
        EventFutureImp<T> eventFuture = new EventFutureImp<>(event);
        eventList.add(eventFuture);
        return eventFuture;
    }

}

逻辑图如下所示:

逻辑图.png

这样设计的一个关键是利用CountDownLatch的特性使得future.get()在未得到结果之前是阻塞的,而得到结果后又马上释放。

网络的异步调用

以上的异步调用其实是单进程内的异步调用,如果要实现一个网络的异步调用,那又比之前复杂了一些。

1. 自定义协议的异步调用

因为服务器的收发行为是可以自定义的,当发送的请求并不是先到达的回包又或者不是每个请求都有回包,这时候主要的问题在于,异步的发送一个网络请求后,并不知道请求的返回应该对应哪个请求。其实也可以通过自定义网络协议设计来解决这个问题:
需要在双方协议中设置requestIdresponseId,由调用方(server)指定requestId并且接受方收到请求后把responseId设置为和requestId一样的值,这样异步请求的调用方也能轻松得到与请求对应的返回包。
具体可以这样做:

 Map<String, Future> map = new ConcurrentHashMap<>();
        EventFutureImp future= new EventFutureImp ();
        map.put(requestId, future);
        //发送
        channel.writeAndFlush(request);
        EventFutureImp future = map.remove(responseId);
        if (future!= null) {
            future .done(response);
        }

关键在于:请求的requestId和回包responseId是一样的,并且不同请求的requestId各自不同。

2. 其他网络协议异步调用

要为现在已有的一些数据库或者服务器实现一个异步调用又该如何做呢。其实很多协议并不会出现发送的请求并不是先到达的回包又或者不是每个请求都有回包这种情况,以http服务器为例,http没有类似requestId这样的字段但对同一个http连接http服务器总是顺序返回请求,那么如果要自己实现一个http异步请求的话,就可以按如下步骤:

    static Queue<Future> queue = new ConcurrentLinkedDeque<>();
        EventFutureImp future= new EventFutureImp ();
        queue.add(future);
        //发送
        ...send()
        EventFutureImp future = queue.poll(responseId);
        if (future!= null) {
            future .done(response);
        }

如此,由于http的顺序返回请求特性,异步请求的结果也不会发生错乱。

总结

异步的调用肯定能提升外部调用的速度,有时候能解决性能上的瓶颈,把资源利用最大化。但也增加了更多的临时对象以及线程切换的开销,同时也比同步编程模型更复杂,难调试。
参考博客: Java并发编程:Callable、Future和FutureTask

上一篇下一篇

猜你喜欢

热点阅读