【第18篇】Netty对于异步操作与观察者模式
2019-05-30 本文已影响12人
爱学习的蹭蹭
1、Future与ChannelFuture
- Future未来(期望),Netty事件都是异步
- Future的get方法是阻塞,Netty对JDK的Future进行异步的封装,如:判断状态
isSuccess
方法(成功或失败状态)改良点
- ChannelFuture是Future的特化,维护了一个Channel对象
- ChannelPromise(复合管道)是ChannelFuture的子类
- V get() 读取数据
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess(); //当且仅当i/o操作成功完成时,返回true
boolean isCancellable(); //当且仅当可以通过cancel方法取消时,返回true
Throwable cause();//如果i/o操作失败,返回其失败原因。如果成功完成或者还未完成,返回null
//将指定的监听器添加到此Future,future完成时,会通知此监听器,如果添加监听器时future已经完成,则立即通知此监听器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
//移除监听器,如果future已经完成,则不会触发
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
//等待此future done
Future<V> sync() throws InterruptedException;
//等待,不可中断
Future<V> syncUninterruptibly();
//等待Future完成
Future<V> await() throws InterruptedException;
//等待Future完成,不可中断
Future<V> awaitUninterruptibly();
//立即返回,如果此时future未完成,返回null
V getNow();
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
2、DefaultProgressivePromise
- DefaultProgressivePromise类的
awaitUninterruptibly
、setSuccess
、setFailure
方法
@Override//等待唤醒呼叫
public ProgressivePromise<V> awaitUninterruptibly() {
super.awaitUninterruptibly();
return this;
}
@Override//设置监听成功状态
public ProgressivePromise<V> setSuccess(V result) {
super.setSuccess(result);
return this;
}
@Override @Override//设置监听失败状态
public ProgressivePromise<V> setFailure(Throwable cause) {
super.setFailure(cause);
return this;
}
3、DefaultPromise
- DefaultPromise的awaitUninterruptibly方法
@Override
public Promise<V> awaitUninterruptibly() {
if (isDone()) {
return this;
}
//检测死锁问题
checkDeadLock();
boolean interrupted = false;
synchronized (this) {
while (!isDone()) {
incWaiters();//调用等待,自身++
try {
wait();//等待
} catch (InterruptedException e) {
// 打断等待
interrupted = true;
} finally {
//减轻,自身 --
decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
- DefaultPromise的setSuccess方法
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
//成功后通知监听器
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
- DefaultPromise的setFailure方法
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();//通知监听器
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
- DefaultPromise的notifyListeners方法,其实用到了观察者模式
GOF设计模式中有一种叫做观察者模式(Observer),属于行为型模式。又叫发布-订阅(Publish/Subscribe)模式、模型-视图 (Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个 主题对象在状态上发生变化时,会通知所有观察者对象,
private void notifyListeners() {
//事件执行器
EventExecutor executor = executor();
//判断是否在事件循环里面
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();//从InternalThreadLocalMap 线程的值
//异步监听堆栈的值
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
//立即监听通知
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//安全执行监听通知
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
- DefaultPromise的notifyListeners0方法
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
3、JDK的Future
- JDK所提供的Future,只能通过手工方式检查执行结果,而这个操作是阻塞的,Netty对ChannelFuture进行的增强,通过ChannelFutureListerner以回调的方式获取执行结果,去除了手工检查阻塞的操作。
- 值得注意的是ChannelFutureListerner的operatonComplete方法是由I/O线程执行,因此要注意的是 不要在这里执行耗时操作,否则需要通过另外的线程或线程池进行执行
4、JDK的Void
- jdk的Void类实例化对象的关键字的处理
public interface ChannelFuture extends Future<Void>
5、sync 方法
-
在server的绑定端口的sync是同步处理
sync
6、回调函数
- 回调函数当方法、函数执行完成之后进行回调获取
7、Future关系图
-
Future传统处理
Future传统处理 -
Netty的Future处理
Netty的Future对JDK的Future进行改良,加了Listeners监听器进行处理
//添加监听器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
//添加监听器
Future<V> addListeners(GenericFutureListener... var1);
//移除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
//移除监听器
Future<V> removeListeners(GenericFutureListener... var1);
Netty的Future处理