Netty异步回调模式-Future和Promise剖析

2021-06-30  本文已影响0人  HQG

学习目标

Future简介

我们知道Netty的I/O操作都是异步的,例如bind,connect,write等操作,会返回一个Future接口。Netty源码中大量使用了异步回调处理模式。在做Netty应用开发时,我们也会用到,所以,了解Netty的异步监听,无论是做Netty应用开发还是阅读源码都是十分重要的。

Future接口剖析

image

咱们通过上图的Echo Server的bind(...)方法来分析,返回一个ChannelFuture实例,点ChannelFuture接口,来看下类层次结构图:


future-2.png

可知:ChannelFuture继承了Netty的Future,Netty的Future继承了JDK的Future。

JDK的Future,我想大家应该非常熟悉了,用的最多的就是在线程池ThreadPoolExecutor时,通过submit方法提交任务返回一个Future实例,通过它来查询任务的执行状态和执行结果,最用常用的方法isDone()和get()。

Netty的Future,在继承JDK的Future基础上,扩展了自己方法,我们来看下:

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 任务执行成功,返回true
    boolean isSuccess();
    // 任务被取消,返回true
    boolean isCancellable();
    // 支付执行失败,返回异常
    Throwable cause();
    // 添加Listener,异步非阻塞处理回调结果
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 移除Listener
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 同步阻塞等待任务结束;执行失败话,会将“异常信息”重新抛出来
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
    // 同步阻塞等待任务结束,和sync方法一样,只不过不会抛出异常信息
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    // 非阻塞,获取执行结果
    V getNow();
    // 取消任务
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

我们知道JDK的Future的任务结果获取需要主动查询,而Netty的Future通过添加监听器Listener,可以做到异步非阻塞处理任务结果,可以称为被动回调

同步阻塞有两种方式:sync()和await(),区别:sync()方法在任务失败后,会把异常信息抛出;await()方法对异常信息不做任何处理,当我们不关心异常信息时可以用await()。通过阅读源码可知sync()方法里面其实调的就是await()方法。推荐使用:sync()方法

// DefaultPromise 类

 @Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

我们可以看到Future接口没有和IO操作关联在一起,接下来让我们来看看Future的子接口ChannelFuture,它和IO操作中的channel通道关联在一起了,用于异步处理channel事件,这个接口用的最多

public interface ChannelFuture extends Future<Void> {
    // 获取channel通道
    Channel channel();
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();
    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();
    // 标记Futrue是否为Void,如果ChannelFuture是一个void的Future,不允许调// 用addListener(),await(),sync()相关方法
    boolean isVoid();
}

ChannelFuture接口相比父类Future接口,就增加了channel()和isVoid()两个方法,其他方法都是覆盖父类接口,没有别扩展。

到这了,顺便了解下ChannelFuture的状态转换

* <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>

ChannelFuture就两种状态Uncompleted(未完成)和Completed(完成),Completed包括三种,执行成功,执行失败和任务取消。注意:执行失败和任务取消都属于Completed

让我们在来看下Future的另一个子接口Promise,它是个可写的Future,到底是写什么东西呢?

public interface Promise<V> extends Future<V> {
    // 执行成功,设置返回值,并通知所有listener,如果已经设置,则会抛出异常
    Promise<V> setSuccess(V result);
    // 设置返回值,如果已经设置,返回false
    boolean trySuccess(V result);
    // 执行失败,设置异常,并通知所有listener
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // 标记Future不可取消
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

接下来,让我们来看看ChannelFuture的可写的子接口ChannelPromise:

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    // 覆盖ChannelFuture接口
    @Override
    Channel channel();
    // 覆盖Promise接口
    @Override
    ChannelPromise setSuccess(Void result);
    ChannelPromise setSuccess();
    boolean trySuccess();
    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise sync() throws InterruptedException;
    @Override
    ChannelPromise syncUninterruptibly();
    @Override
    ChannelPromise await() throws InterruptedException;
    @Override
    ChannelPromise awaitUninterruptibly();
    ChannelPromise unvoid();
}

ChannelPromise接口只是综合了ChannelFuture和Promise接口,没有新增功能

截至到目前,异步监听相关的接口已经介绍完了,让我们通过一张图来概括下:


image

Promise的实现类

看看DefaultPromise的主要属性:

// setSuccess设置result为null时,设置成SUCCESS
private static final Object SUCCESS = new Object();
// 不可取消值
private static final Object UNCANCELLABLE = new Object();
// 取消异常信息持有者
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
        StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
//执行结果,使用volatile修饰,保证可见性
private volatile Object result;
// 当Promise执行完成时需要通知Listener,此时就使用这个executor
private final EventExecutor executor;
// 要通知的listener,因为它可能是不同的类型,所以定义为Object类型,使用时再判断类型
private Object listeners;
// 要使用wait()/notifyAll()机制,这个变量记录了waiter的数量
private short waiters;
// 是否正在通知listener
private boolean notifyingListeners;

从属性中可以看出, DefaultPromise 通过 Object 的 wait/notify 机制实现线程间的同步,通过 volatile 属性保证线程间的可见性。

通过setSuccess(...)方法,来了解下 执行成功后设置返回值并通知Listener的过程:

@Override
public Promise<V> setSuccess(V result) {
    // 第一次成功设置返回值后,返回Promise对象
    if (setSuccess0(result)) {
        return this;
    }
    // 否则抛出异常
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
    // result为null,设置为SUCCESS
    return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
    // CAS设置result值,只有当result为null或者UNCANCELLABLE,才可以执行成功
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        // 唤醒等待的waiter,同时判断是否存在listener
        if (checkNotifyWaiters()) {
            // 通知所有的listener
            notifyListeners();
        }
        return true;
    }
    return false;
}
private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        // 通知所有waiters
        notifyAll();
    }
    // 判断是否添加了监听者listeners
    return listeners != null;
}
private void notifyListeners() {
    EventExecutor executor = executor();
    // 是否是同一个线程
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // 用自己的executor执行
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

DefaultPromise的方法实现逻辑挺简单,不再全部讲解了,只需知道通过 Object 的 wait/notify 机制实现线程间的同步观察者设计模式进行通知就可以了。

DefaultPromise还有个子类DefaultChannelPromise,这个类在Netty中用的最多的,其内部逻辑调用的都是父类DefaultPromise的方法。DefaultChannelPromise类层次结构图如下:

image

应用实战

public class ChannelFuture_01 {

    public static void main(String[] args) throws InterruptedException {

        EventExecutor executor = new SelfEventExecutor();
        final Promise<String> promise = new DefaultPromise<String>(executor);


        promise.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                System.out.println("执行完成结果:" + future.get());
            }
        });


        final Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    promise.setSuccess("hello world!");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        thread.start();

        promise.sync();
        System.out.println("==================");
    }

    static class SelfEventExecutor extends SingleThreadEventExecutor {


        public SelfEventExecutor() {
            this(null);
        }

        public SelfEventExecutor(EventExecutorGroup parent) {
            this(parent, new DefaultThreadFactory(SelfEventExecutor.class));
        }

        public SelfEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
            super(parent, threadFactory, true);
        }

        @Override
        protected void run() {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
            }
        }
    }
}

Future总结

Netty的Future继承JDK的Future,通过 Object 的 wait/notify机制,实现了线程间的同步;使用观察者设计模式,实现了异步非阻塞回调处理。

上一篇 下一篇

猜你喜欢

热点阅读