ChannelPipline的用法
该文章基于个人的理解,翻译自netty5.0 API。
综述
ChannelPipeline的作用是保存一个ChannelHandler的列表,这些ChannelHandler用于处理或者拦截一个channel的inbound事件和outbound操作。ChannelPipeline实现了一个高级形式的I拦截过滤器模式,给了用户完全的控制权来决定一个event如何被处理,以及在pipeline中的channelHandler之间如何的交互。
创建一个pipeline
每一个channel都有自己的pipeline,并且当一个新的channel被创建时其对应的pipeline也自动被创建。
event在pipeline中如何流动
下面的图描述了I/O event如何被ChannelPipeline中的ChannelHandlers处理的典型情况。一个I/O event被一个ChannelHandler处理,并被这个ChannelHandler向前递交给其相邻的下一个ChannelHandler。如果必要的话,一个ChannelHandler同样可以触发一个任意的I/O event。为了递交或者触发一个event,一个ChannelHandler调用定义在ChannelHandlerContext中的event繁殖方法,比如:ChannelHandlerContext.fireChannelRead(Object)和ChannelHandlerContext.write(Object)。
ChannelPipeline中事件流动示意图一个inbound event被图中左边的ChannelHandlers从下至上地处理。一个inbound event通常被图中底部的I/O线程所触发,以便于在Channel的状态改变的时候(比如:新创建了连接和关闭了连接),或者在inbound数据被远端读取的时候,ChannelHandlers能够被通知到。如果一个inbound event超过了图中的顶部的范围,那么它将根据你的日志等级决定是被抛弃还是被记录入日志。
一个outbound event被图中右边的ChannelHandlers从上至下地处理。一个outbound event通常被你用于请求一个outbound I/O操作的代码所触发,比如一个写请求和一个连接的尝试。如果一个outbound event超过了图中ChannelHandlers底部的范围,它将被与Channel相关的I/O线程所处理。I/O线程通常进行真正的output操作,比如:SocketChannel.write(ByteBuffer)。
将一个event向前递交给下一个相邻的handler
根据上面简单的解释,一个ChannelHandler只能通过唤醒ChannelHandlerContext中event繁殖的方法,来将一个event递交给它的下一个handler。这些方法包括:
- Inbound event propagation methods:
- ChannelHandlerContext.fireChannelRegistered()
- ChannelHandlerContext.fireChannelActive()
- ChannelHandlerContext.fireChannelRead(Object)
- ChannelHandlerContext.fireChannelReadComplete()
- ChannelHandlerContext.fireExceptionCaught(Throwable)
- ChannelHandlerContext.fireUserEventTriggered(Object)
- ChannelHandlerContext.fireChannelWritabilityChanged()
- ChannelHandlerContext.fireChannelInactive()
- Outbound event propagation methods:
- ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
- ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
- ChannelHandlerContext.write(Object, ChannelPromise)
- ChannelHandlerContext.flush()
- ChannelHandlerContext.read()
- ChannelHandlerContext.disconnect(ChannelPromise)
- ChannelHandlerContext.close(ChannelPromise)
下面的例子展示了event繁殖通常怎么做:
public class MyInboundHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
创建一个pipeline
用户可能有一个或多个ChannelHandlers在一个pipeline中来接收I/O events(比如:读)和请求I/O操作(比如:写和关闭)。例如,一个典型的服务端在每一个channel的pipeline中都会有以下的handlers。但是你的handler的个数通常是变化的,这取决于protocol和业务逻辑的复杂性和特点:
- Protocol解码器:将二进制数据(比如:ByteBuf)转成一个Java对象。
- Protocol编码器:将一个Java对象转成二进制数据。
- 业务逻辑相关的Handler:处理真正的业务逻辑(比如:数据库访问)。
这有可能像下面的例子展示的这样:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
线程安全
一个ChannelHandler在任何时候被添加或移除,因为一个ChannelPipeline是线程安全的。例如,你可以在将要交换敏感信息的时候添加一个加密的handler,并在交换完毕之后移除它。