《netty in action》读书笔记 PART1
6. ChannelHandler and ChannelPipeline
6.1 The ChannelHandler family
6.1.1 Channel的生命周期
ChannelUnregistered
已创建,但是还没有被注册到EventLoop上。
ChannelRegistered
已创建,并且已经注册到EventLoop。
ChannelActive
连接上远程主机。
ChannelActive
没有连接到远程主机。
Channel状态的变化会触发相应的事件。
6.1.2 ChannelHandler的生命周期
handlerAdd
添加handler
handlerRemove
删除handler
exceptionCaught
发生异常
ChannelHandler有两个重要的子接口:ChannelInboundHandler和ChannelOutboundHandler。
6.1.3 ChannelInboundHandler接口
接受到数据或者Channel的状态发生改变会调用ChannelInboundHandler中的方法。注意,当ChannelInboundHandler中的channelRead()方法被overwrite,需要对ByteBuf实例持有的资源进行显示释放。
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
可以使用SimpleChannelInboundHandler,它会自动释放资源,无需人工干预:
@Sharable
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special
}
}
6.1.4 ChannelOutboundHandler接口
它一个比较强大的功能是延迟执行。
CHANNELPROMISE VS. CHANNELFUTURE
CHANNELPROMISE是CHANNELFUTURE的子接口,CHANNELFUTURE是不可写的,CHANNELPROMISE是可写的(例如setSuccess(),setFailure()方法)
6.1.5 ChannelHandler adapters
关系图6.1.6 资源管理
要注意ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()要释放相应的资源,否则会产生内存泄漏。netty使用引用计数法来管理内存资源。可以使用netty提供的ResourceLeakDetector来发现潜在的内存泄漏问题。
java -Dio.netty.leakDetectionLevel=ADVANCED
leakDetectionLevel可以为DISABLED、SIMPLE(默认)、ADVANCED和PARANOID。
6.2 ChannelPipeline接口
ChannelPipeline可以看成由ChannelHandler组成的链表,I/O事件会在ChannelPipeline上传播。每个新Channel会绑定一个新ChannelPipeline,两者是一对一关系。
pipeline中的事件传播
事件传播的时候,会判断ChannelHandler的类型(implements Inbound还是OutBound的接口)和事件传播的方向是否一致,不一致跳过。
6.2.1 ChannelPipeline修改
ChannelPipeline中ChannelHandler可以动态地被添加、删除或者替换。
ChannelPipeline中操作ChannelHandler
6.2.2 Firing events
会调用ChannelPipeline中下一个ChannelHandler里的方法。
代码示例:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new MyHandler());
ch.pipeline().addLast(new MyHandler2());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
class MyHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("in MyHandler1 , messageReceived invoked");
for(int i = 0;i < 10 ; i++) {
ctx.fireChannelInactive();//调用fireChannelInactive 10次
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("in MyHandler1 , channelInactive invoked");
}
}
class MyHandler2 extends SimpleChannelInboundHandler<String>{
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("in MyHandler2 ,messageReceived invoked");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("in MyHandler2 , channelInactive invoked");
}
}
输出:
控制台输出6.3 ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的联系,无论何时,添加一个ChannelHandler到ChannelPipeline就会创建一个ChannelHandlerContext。ChannelHandlerContext的主要功能是和所在ChannelPipeline的其他ChannelHandler交互。
ChannelHandlerContext有很多方法,大部分方法在Channel和ChannelPipeline里都出现过,但是这里有一个非常大的区别,调用Channel和ChannelPipeline里的方法,会在整个pipeline里传播(从头到尾),而ChannelHandlerContext里同名的方法,是从当前ChannelHandler开始传播。
6.3.1 Using ChannelHandlerContext
概念关系图6.3.2 ChannelHandler和ChannelHandlerContext的高级用法。
- ChannelHandlerContext的pipeline()方法可以获取ChannelPipeline的引用,这样我们可以通过这个引用操作ChannelHandler,实现动态协议。
- 可以把ChannelHandlerContext的引用缓存起来,在ChannelHandler方法外面用,甚至在一个不同的线程里使用。下面提供了一个示例。
- 可以将一个ChannelHandler实例可能会被添加到不同的ChannelPipeline里,但是需要使用@Sharable注解,此外还需注意的是,这个Sharable的ChannelHandler需要是线程安全的。
为什么需要@Sharable的ChannelHandler,一个需求就是通过这个@Sharable来统计多个Channel的数据。
6.4 异常处理
6.4.1 Inbound异常处理
Inbound异常处理由于exception默认会从触发异常的ChannelHandler继续向后流动,所以图中的这种处理逻辑,我们一般放在最后ChannelPipeline的末尾。这样就可以确保,无论是哪个ChannelHandler触发异常,都能够被捕获并处理。如果不对异常做捕获处理操作,netty会打印异常未被捕获的日志。
6.4.2 outbound异常处理
进行outbound操作,要想知道结果(正常完成还是发生异常),需要这样做:
-
每个outbound操作都会返回一个ChannelFuture。添加到ChannelFuture上的监听器会收到成功或者错误通知。
-
ChannelOutboundHandler中的方法绝大多数都会ChannelPromise类型的参数。ChannelPromise也可以添加监听来接受异步通知。ChannelPromise是可写的,可以通过它的setSucess()方法或者setFailure(Throwable cause)立即发布通知。
如果ChannelOutboundHandler自己抛出异常,netty会通知添加到ChannelPromise上的监听器。
7. EventLoop and threading model
7.1 Threading model overview
JDK早期版本多线程编程的方式是create
新线程再start
。JDK5推出了Executor API
,它的线程池技术通过缓存和重用大大提高了性能。
- 有任务(
Runnable实现
)的时候,从线程池里挑选出一个空闲线程,把任务submit
给它。 - 任务执行完毕了,线程变成空闲,回到线程池,等待下一次挑选使用。
线程池不能解决上下文切换开销的问题,上下文的开销在heavy load下会很大。
7.2 EventLoop接口
EventLoop
是一个用来处理事件的任务,基本思想如下图所示:
EventLoop
接口的API分为两类:concurrent和networking。
- concurrent
基于java.util.concurrent
包,提供thread executors - networking
io.netty.channel
继承了EventLoop接口,提供了和Channel事件交互的能力。
7.2.1 Netty 4中I/O事件的处理
7.3.1 JDK 任务调度API
JDK5之前,任务调度只能用java.util.Timer
,Timer就是一个后台线程,有很多限制:
- 如果执行多个定时任务,一个任务发生异常没有捕获,整个Timer线程会挂掉(其他所有任务都会down掉)
- 假如某个任务的执行时间过长,超过一些任务的间隔时间,会导致这些任务执行推迟。
JDK后续推出了java.util.concurrent
,其中定义的ScheduleExecutorService
克服了这些缺陷。
ScheduledExecutorService executor =Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
new Runnable() {
@Override
public void run() {
System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
//to do
executor.shutdown();
尽管ScheduledExecutorSevice
挺好用的,但是在负载大的时候有较大的性能耗费,netty进行了优化。
7.3.2 使用EventLoop进行任务调度
ScheduledExecutorService
也有一些限制,例如会创建额外创建一些线程来管理线程池,这在任务调度非常激烈的情况下,会成为性能的瓶颈。netty没有直接使用ScheduledExecutorService
,使用了继承于ScheduledExecutorService
,自己实现的EventLoop
。
Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().schedule(
new Runnable() {
@Override
public void run() {
System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
重复定时执行:
Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.println("Run every 60 seconds");
}
}, 60, 60, TimeUnit.Seconds);
7.4 实现细节
7.4.1 线程管理
netty线程模型的优越之处是在于它会确定当前执行线程的身份,再进行相应操作。如果当前执行线程被绑定到当前的Channel
和EventLoop
,会被直接执行,否则会被放到EventLoop
的队列里,每个EventLoop
有自己单独的队列。
Never put a long-running task in the execution queue, because it will block any other task from executing on the same thread.” If you must make blocking calls or execute long-running tasks, we advise the use of a dedicated EventExecutor.
7.4.2 EventLoop/Thread分配
EventLoopGroup
包含了EventLoops
和Channels
,EventLoops
创建方式取决于使用哪种I/O.
异步I/O
异步I/O仅仅使用少量的EventLoops
,这些EventLoops
被很多的Channels
共享,这样就可以用最少的线程接受很多的Channels
,而不是一个线程一个Channel
。
阻塞I/O
共同点:每个Channel
的I/O事件只会被一个线程处理。
8. Bootstrapping
bootstrapping an application is the process of configuring it to run
8.1 Bootstrap classes
Namely, a server devotes a parent channel to accepting connections from clients and
creating child channels for conversing with them, whereas a client will most likely
require only a single, non-parent channel for all network interactions. (As we’ll see, this
applies also to connectionless transports such as UDP , because they don’t require a
channel for each connection.)
server需要一个parent channel来接受客户端连接,需要创建多个child channels来应答客户端。
client只需要一个单独的channel,不需要parent channel。
服务端处理使用ServerBootstrap
,客户端使用Bootstrap
。
Why are the bootstrap classes Cloneable?
You’ll sometimes need to create multiple channels that have similar or identical settings. To support this pattern without requiring a new bootstrap instance to be created and configured for each channel, AbstractBootstrap has been marked Cloneable . Calling clone() on an already configured bootstrap will return another bootstrap instance that’s immediately usable. Note that this creates only a shallow copy of the bootstrap’s EventLoopGroup , so the latter will be shared among all of the cloned channels. This is acceptable, as the cloned channels are often short-lived, a typical case being a channel created to make an HTTP request.
8.2 Bootstrapping clients and connectionless protocols
Bootstrap
主要用来给客户端和使用面向无连接的应用创建Channels
。
Bootstraping a client:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channeRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
} );
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
8.2.2 Channel和EventLoopGroup的兼容性
you can’t mix components having different
prefixes, such as NioEventLoopGroup and OioSocketChannel . The following listing
shows an attempt to do just that.
Channel
和EventLoopGroup
的前缀要一样。否则会抛出IllegalStateException
8.3 Bootstraping servers
ServerBootstrap类
A ServerBootstrap creating a ServerChannel on bind() , and the ServerChannel managing a number of child Channels.
相比 Bootstrap
类,增加了childHandler()
,childAttr()
,childOption()
方法。ServerChannel
来创建许许多多的子Channel
,代表接受的连接。ServerBootstrap
提供了这些方法来简化对子Channel
的配置。
NioEventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throw Exception {
System.out.println("Received data");
}
} );
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bound attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
8.4 Bootstrapping clients from a Channel
Suppose your server is processing a client request that requires it to act as a client to
a third system. This can happen when an application, such as a proxy server, has to
integrate with an organization’s existing systems, such as web services or databases. In
such cases you’ll need to bootstrap a client Channel from a ServerChannel
作为服务端接受连接,同时又作为客户端,请求远程服务器(类似于proxy),最容易想到的办法是再创建一个客户端的Bootstrap
,但是这样需要另外一个EventLoop
来处理客户端角色的Channel
,发生在服务端Channel
和客户端Channel
之间数据交换引起的上文切换也会带来额外的性能损耗。
最好的办法是创建的客户端Channel
和服务端Channel
,共享同一个EventLoop
:
ServerBootstrap bootstrap = new ServerBootstrap();
//Sets the EventLoopGroups that provide EventLoops for processing Channel events
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
ChannelFuture connectFuture;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//Creates a Bootstrap to connect to remote host
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println("Received data");
}
});
//Uses the same EventLoop as the one assigned to the accepted channel
bootstrap.group(ctx.channel().eventLoop());
connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws Exception {
if (connectFuture.isDone()) {
// do something with the data
//When the connection is complete performs some data operation (such as proxying)
}
}
});
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
8.5 Adding multiple ChannelHandlers during a bootstrap
在bootstrap
的时候,如何添加多个ChannelHandler
?
netty提供了ChannelInboundHandlerAdapter
的特殊子类ChannelInitializer
:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
ChannelInitializer
提供了initChannel()
可以轻松添加ChannelHandlers
到ChannelPipeline
。
protected abstract void initChannel(C ch) throws Exception;
一旦Channel
注册到EventLoop
,我们实现的initChannel()
就会被调用。当initChannel()
返回的时候,ChannelInitializer
实例会把自己从ChannelPipeline
中删除。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializerImpl());
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();
对应ChannelInitializerImpl
的实现:
final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
8.6 Using Netty ChannelOptions and attributes
不需要我们手工配置每个Channel
,netty提供了option()
方法来把ChannelOptions
应用到bootstrap
,ChannelOptions
中的配置会自动地应用到所有Channel
Netty的Channel
和bootstrap
类,提供了AttributeMap
抽象集合和AttributeKey<T>
泛型类,用来insert和retrieve属性值。使用这些工具,我们可以安全地把任意类型的数据和Channel
关联起来。
Attribute
的一个使用场景是,服务端应用需要追踪用户和Channels
的关系。可以把用户的ID作为一个属性存到Channel
里。这样就可以实现根据ID来路由消息和Channel
不活跃自动关闭等功能。
final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Integer idValue = ctx.channel().attr(id).get();
// do something with the idValue
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws Exception {
System.out.println("Received data");
}
});
bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();
8.7 Bootstrapping DatagramChannels
之前的bootstrap
示例代码都是基于TCP-based的SocketChannel
,bootstrap
也可以配置为无连接协议。
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
// Do something with the packet
}
});
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Channel bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
8.8 Shutdown
Alternatively, you can call Channel.close() explicitly on all active channels before calling EventLoopGroup.shutdownGracefully() . But in all cases, remember to shut down the EventLoopGroup itself.
EventLoopGroup.shutdownGracefully()
,它的返回值是一个future
,这也是一个异步操作。
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
9 Unit testing
Netty提供了embedded transport
来测试ChannelHandlers
,embedded transport
是EmbeddedChannel
(一种特殊的Channel
实现) 的特色功能,可以简单地实现在pipeline
中传播事件。
我们可以写入inbound
或者outbound
数据到EmbeddedChannel
,然后检查是否有东西传输到ChannelPipeline
的末尾。我们还可以确定消息是否被编解码,是否有ChannelHandler
被触发。
Inbound data
会被ChannelInboundHandlers
处理,代表着从远程主机读取的数据。
outbound data
会被ChannelOutboundHandlers
处理,代表将要发送到远程主机的数据。
相关API:
图9.1展示了数据在EmbededChannel
的流动情况。我们可以:
-
使用
writeOutbound()
,写入消息到Channel
,让消息以outbound
方向在pipeline
中传递。后续,我们可以使用readOutbound()
读取处理过后的数据,判断结果是否与预期一致。 -
使用
writeInbound()
,写入消息到Channel
,让消息以inbound
方向在pipeline
中传递。后续,我们可以使用readInbound()
读取处理过后的数据,判断结果是否与预期一致。
9.2 Testing ChannelHandlers with EmbeddedChannel
9.2.1 Testing inbound messages
图9.2 展示了一个简单的ByteToMessageDecoder
实现。如果有足够的数据,这个Decoder会产生固定大小的frame。如果没有足够的数据,没有达到这个固定的size值,它会等待接下来的数据,继续判断能否接着产生frame。
具体代码实现如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
while (in.readableBytes() >= frameLength) {
ByteBuf buf = in.readBytes(frameLength);
out.add(buf);
}
}
}
那么如何进行单元测试呢,测试代码如下:
public class FixedLengthFrameDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
// write bytes
assertTrue(channel.writeInbound(input.retain()));
assertTrue(channel.finish());
// read messages
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
@Test
public void testFramesDecoded2() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
assertFalse(channel.writeInbound(input.readBytes(2)));
assertTrue(channel.writeInbound(input.readBytes(7)));
assertTrue(channel.finish());
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
}
9.2.2 Testing outbound messages
我们需要测试一个编码器:AbsIntegerEncoder
,它是Netty的MessageToMessageEncode
的一个实现,功能是将整数取绝对值。
我们的流程如下:
-
EmbeddedChannel
会将一个四字节负数按照outbound方向写入Channel
。 -
编码器会从到来的
ByteBuf
读取每个负数,调用Math.abs()
获得绝对值。 -
编码器将绝对值写入到
ChannelHandlerPipe
。
编码器代码实现:
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 4) {
int value = Math.abs(in.readInt());
out.add(value);
}
}
}
怎么测试?请看下文:
public class AbsIntegerEncoderTest {
@Test
public void testEncoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 1; i < 10; i++) {
buf.writeInt(i * -1);
}
EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
assertTrue(channel.writeOutbound(buf));
assertTrue(channel.finish());
// read bytes
for (int i = 1; i < 10; i++) {
assertEquals(i, channel.readOutbound());
}
assertNull(channel.readOutbound());
}
}
9.3 Testing exception handling
为了测试异常处理,我们有如下的示例。
为防止资源耗尽,当我们读取到的数据多于某个数值,我们会抛出一个TooLongFrameException
在图9.4中,最大frame的大小为3字节,当一个frame的字节数大于3,它会被忽略,并且会抛出
TooLongFrameException
,其他的pipeline
里的其他ChannelHandlers
要么覆写exceptionCaught()
进行捕获处理,要么会忽略这个异常。
解码器代码:
public class FrameChunkDecoder extends ByteToMessageDecoder {
private final int maxFrameSize;
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes > maxFrameSize) {
// discard the bytes
in.clear();
throw new TooLongFrameException();
}
ByteBuf buf = in.readBytes(readableBytes);
out.add(buf);
}
}
如何测试,请看:
public class FrameChunkDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
assertTrue(channel.writeInbound(input.readBytes(2)));
try {
channel.writeInbound(input.readBytes(4));
Assert.fail();
} catch (TooLongFrameException e) {
// expected exception
}
assertTrue(channel.writeInbound(input.readBytes(3)));
assertTrue(channel.finish());
// Read frames
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(2), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.skipBytes(4).readSlice(3), read);
read.release();
buf.release();
}
}
10.The codec framework
encoder
,将outbound
消息转换成易于传输的方式(大部分是字节流)。
decoder
,将inbound
网络字节流转回成应用程序消息格式。
10.2 Decoders
两种场景需要使用到Decoders
:
- 将字节流解码成消息--
ByteToMessageDecoder
和ReplayingDecoder
- 将一种消息类型解码成另一种类型--
MessageToMessageDecoder
10.2.1 ByteToMessageDecoder抽象类
功能: 将字节流解码成消息或者另一种字节流。
使用示例ToIntegerDecoder
:
每次从ByteBuf
读取四个字节,解码成int
,添加到List
里。当没有更多的数据添加到List
,List
里的内容会传递到下一个ChannelInboundHandler
。
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
编解码框架里,消息处理完了,会自动调用ReferenceCountUtil.release(message)
,资源会自动释放。
Reference counting in codecs
As we mentioned in chapters 5 and 6, reference counting requires special attention. In the case of encoders and decoders, the procedure is quite simple: once a mes- sage has been encoded or decoded, it will automatically be released by a call toReferenceCountUtil.release(message)
. If you need to keep a reference for later use you can callReferenceCountUtil.retain(message)
. This increments the reference count, preventing the message from being released.
10.2.2 ReplayingDecoder抽象类
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
继承于ByteToMessageDecoder
,特点是我们不再需要调用readableBytes()
,省了判断数据是否足够的逻辑。
注意:
-
不是所有的
ByteBuf
的操作都被支持。如果不支持会抛出UnsupportedOperationException
异常。 -
ReplayingDecoder
会比ByteToMessageDecoder
稍慢。
ToIntegerDecoder2
:
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readInt());
}
}
更多的解码工具可以在io.netty.handler.codec
下找到。
-
io.netty.handler.codec.LineBasedFrameDecoder
,通过换行符(\n
或者\r\n
)来解析消息。 -
io.netty.handler.codec.http.HttpObjectDecoder
,解析HTTP数据。
10.2.3 MessageToMessageDecoder抽象类
消息格式互相转换,如把一种类型的POJO转换成另外一种。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
API差不多
示例:IntegerToStringDecoder
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
out.add(String.valueOf(msg));
}
}
一个更贴切详细的例子是io.netty.handler.codec.http.HttpObjectAggregator
用 TooLongFrameException
防止资源耗尽:
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
private static final int MAX_FRAME_SIZE = 1024;
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
if (readable > MAX_FRAME_SIZE) {
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
// do something
}
}
10.3 Encoders
与解码器类似,Encoders
分为两种:
- 将消息编码成字节流。
- 将一种消息编码成另一种格式的消息。
10.3.1 MessageToByteEncoder抽象类
示例ShortToByteEncoder
:
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
out.writeShort(msg);
}
}
更具体的应用实践可以参见io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder
10.4 编解码抽象类
既能encode
,又能decode
,二合一。
10.4.1 ByteToMessageCodec抽象类
Any request/response protocol could be a good candidate for using the
ByteToMessageCodec
. For example, in an SMTP implementation, the codec would read incoming bytes and decode them to a custom message type, saySmtpRequest
. On the receiving side, when a response is created, anSmtpResponse
will be produced, which will be encoded back to bytes for transmission.
10.4.2 MessageToMessageCodec抽象类
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
@Override
protected void encode(ChannelHandlerContext ctx,
WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out)
throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
switch (msg.getType()) {
case BINARY:
out.add(new BinaryWebSocketFrame(payload));
break;
case TEXT:
out.add(new TextWebSocketFrame(payload));
break;
case CLOSE:
out.add(new CloseWebSocketFrame(true, 0, payload));
break;
case CONTINUATION:
out.add(new ContinuationWebSocketFrame(payload));
break;
case PONG:
out.add(new PongWebSocketFrame(payload));
break;
case PING:
out.add(new PingWebSocketFrame(payload));
break;
default:
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg,
List<Object> out) throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
if (msg instanceof BinaryWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY,
payload));
} else if (msg instanceof CloseWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE,
payload));
} else if (msg instanceof PingWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING,
payload));
} else if (msg instanceof PongWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG,
payload));
} else if (msg instanceof TextWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT,
payload));
} else if (msg instanceof ContinuationWebSocketFrame) {
out.add(new MyWebSocketFrame(
MyWebSocketFrame.FrameType.CONTINUATION, payload));
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
public static final class MyWebSocketFrame {
private final FrameType type;
private final ByteBuf data;
public WebSocketFrame(FrameType type, ByteBuf data) {
this.type = type;
this.data = data;
}
public FrameType getType() {
return type;
}
public ByteBuf getData() {
return data;
}
public enum FrameType {BINARY,
CLOSE,
PING,
PONG,
TEXT,
CONTINUATION;
}
}
}
10.4.3 CombinedChannelDuplexHandler类
将编码器解码器放在一块影响代码的重用性。CombinedChannelDuplexHandler
可以解决这个问题。我们可以使用它而不直接使用codec抽象类。
方法签名:
public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
下面是一个使用范例:
解码器例子ByteToCharDecoder
功能是一次读取2个字节,解码成char
写到List
里
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
while (in.readableBytes() >= 2) {
out.add(in.readChar());
}
}
}
编码器例子CharToByteEncoder
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out)
throws Exception {
out.writeChar(msg);
}
}
是时候combine
了:
public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedByteCharCodec() {
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}