深入浅出Netty源码剖析Java 杂谈技术干货

【第18篇】Netty对于异步操作与观察者模式

2019-05-30  本文已影响12人  爱学习的蹭蹭

1、Future与ChannelFuture

ChannelPromise复合管道
public interface Future<V> extends java.util.concurrent.Future<V> {
    
    boolean isSuccess();  //当且仅当i/o操作成功完成时,返回true
    boolean isCancellable(); //当且仅当可以通过cancel方法取消时,返回true
    Throwable cause();//如果i/o操作失败,返回其失败原因。如果成功完成或者还未完成,返回null
    //将指定的监听器添加到此Future,future完成时,会通知此监听器,如果添加监听器时future已经完成,则立即通知此监听器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    //移除监听器,如果future已经完成,则不会触发
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    //等待此future done
   Future<V> sync() throws InterruptedException;
    //等待,不可中断
    Future<V> syncUninterruptibly();
    //等待Future完成
  Future<V> await() throws InterruptedException;
    //等待Future完成,不可中断
  Future<V> awaitUninterruptibly();
  //立即返回,如果此时future未完成,返回null
  V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

2、DefaultProgressivePromise

   @Override//等待唤醒呼叫
    public ProgressivePromise<V> awaitUninterruptibly() {
        super.awaitUninterruptibly();
        return this;
    }

 @Override//设置监听成功状态
    public ProgressivePromise<V> setSuccess(V result) {
        super.setSuccess(result);
        return this;
    }
@Override @Override//设置监听失败状态
    public ProgressivePromise<V> setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

3、DefaultPromise

 @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }
        //检测死锁问题
        checkDeadLock();
        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();//调用等待,自身++
                try {
                    wait();//等待
                } catch (InterruptedException e) {
                    // 打断等待
                    interrupted = true;
                } finally {
                    //减轻,自身 --
                    decWaiters();
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }
 public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            //成功后通知监听器
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
   public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();//通知监听器
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }

GOF设计模式中有一种叫做观察者模式(Observer),属于行为型模式。又叫发布-订阅(Publish/Subscribe)模式、模型-视图 (Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个 主题对象在状态上发生变化时,会通知所有观察者对象,

private void notifyListeners() {
       //事件执行器
        EventExecutor executor = executor();
      //判断是否在事件循环里面
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();//从InternalThreadLocalMap 线程的值
        //异步监听堆栈的值
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    //立即监听通知
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        //安全执行监听通知
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
  private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }

3、JDK的Future

4、JDK的Void

public interface ChannelFuture extends Future<Void> 

5、sync 方法

6、回调函数

7、Future关系图

//添加监听器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
//添加监听器
Future<V> addListeners(GenericFutureListener... var1);
//移除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
//移除监听器
Future<V> removeListeners(GenericFutureListener... var1);

Netty的Future处理
上一篇下一篇

猜你喜欢

热点阅读