21 JDK序列化
1 ObjectEncoder与ObjectDecoder简介
ObjectEncoder
/ObjectDecoder
使用JDK序列化机制编解码,因此我们可以使用Java对象作为请求和响应参数,限制是对象必须实现Serializable。JDK序列化机制的缺点是:序列化的性能以及序列化后对象占用的字节数比较多。优点是:这是对JDK默认支持的机制,不需要引入第三方依赖。
如果仅仅是对象序列化,字节通过网络传输后,那么在解码时,无法判断到底多少个字节可以构成一个Java对象。因此需要结合长度编码,也就是添加一个Length字段,表示序列化后的字节占用的字节数。因此ObjectEncoder/ObjectDecoder采用的通信协议如下:
+--------+----------+
| Length | Content |
+--------+----------+
其中:
- Length:表示Content字段占用的字节数,Length本身占用的字节数我们可以指定为一个固定的值
- Content:对象经过JDK序列化后二进制字节内容
乍一看,这与我们在上一节讲解的LengthFieldBasedFrameDecoder
/LengthFieldPrepender
采用的通信协议很类似。事实上ObjectDecoder本身就继承了LengthFieldBasedFrameDecoder。不过ObjectEncoder略有不同,其并没有继承LengthFieldPrepender,而是内部直接添加了Length字段。
ObjectEncoder源码如下:
@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
//当需要编码时,encode方法会被回调
//参数msg:就是我们需要序列化的java对象
//参数out:我们需要将序列化后的二进制字节写到ByteBuf中
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIdx = out.writerIndex();
//ByteBufOutputStream是Netty提供的输出流,数据写入其中之后,可以通过其buffer()方法会的对应的ByteBuf实例
ByteBufOutputStream bout = new ByteBufOutputStream(out);
//JDK序列化机制的ObjectOutputStream
ObjectOutputStream oout = null;
try {
//首先占用4个字节,这就是Length字段的字节数,这只是占位符,后面为填充对象序列化后的字节数
bout.write(LENGTH_PLACEHOLDER);
//CompactObjectOutputStream是netty提供的类,其实现了JDK的ObjectOutputStream,顾名思义用于压缩
//同时把bout作为其底层输出流,意味着对象序列化后的字节直接写到了bout中
oout = new CompactObjectOutputStream(bout);
//调用writeObject方法,即表示开始序列化
oout.writeObject(msg);
oout.flush();
} finally {
if (oout != null) {
oout.close();
} else {
bout.close();
}
}
int endIdx = out.writerIndex();
//序列化完成,设置占位符的值,也就是对象序列化后的字节数量
out.setInt(startIdx, endIdx - startIdx - 4);
}
}
ObjectDecoder源码如下所示:
//注意ObjectDecoder继承了LengthFieldBasedFrameDecoder
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
private final ClassResolver classResolver;
public ObjectDecoder(ClassResolver classResolver) {
this(1048576, classResolver);
}
//参数maxObjectSize:表示可接受的对象反序列化的最大字节数,默认为1048576 bytes,约等于1M
//参数classResolver:由于需要将二进制字节反序列化为Java对象,需要指定一个ClassResolver来加载这个类的字节码对象
public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
//调用父类LengthFieldBasedFrameDecoder构造方法,关于这几个参数的作用,参见之前章节的分析
super(maxObjectSize, 0, 4, 0, 4);
this.classResolver = classResolver;
}
//当需要解码时,decode方法会被回调
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//首先调用父类的decode方法进行解码,会解析出包含可解析为java对象的完整二进制字节封装到ByteBuf中,同时Length字段的4个字节会被删除
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
//构造JDK ObjectInputStream实例用于解码
ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
try {
//调用readObject方法进行解码,其返回的就是反序列化之后的Java对象
return ois.readObject();
} finally {
ois.close();
}
}
}
下面我们通过实际案例来演示ObjectEncoder与ObjectDecoder的使用。
2 使用案例
首先我们分别编写两个Java对象Request/Response分别表示请求和响应。
public class Request implements Serializable{
private String request;
private Date requestTime;
//setters getters and toString
}
public class Response implements Serializable{
private String response;
private Date responseTime;
//setters getters and toString
}
接着我们编写Client端代码:
public class JdkSerializerClient {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(new ClassResolver() {
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在于server建立连接后,即发送请求报文
public void channelActive(ChannelHandlerContext ctx) {
Request request = new Request();
request.setRequest("i am request!");
request.setRequestTime(new Date());
ctx.writeAndFlush(request);
}
//接受服务端的响应
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Response response= (Response) msg;
System.out.println("receive response:"+response);
}
});
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1", 8080).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
由于client既要发送Java对象Request作为请求,又要接受服务端响应的Response对象,因此在ChannelPipeline中,我们同时添加了ObjectDecoder和ObjectEncoder。
另外我们自定义了一个ChannelInboundHandler,在连接建立时,其channelActive方法会被回调,我们在这个方法中构造一个Request对象发送,通过ObjectEncoder进行编码。服务端会返回一个Response对象,此时channelRead方法会被回调,由于已经经过ObjectDecoder解码,因此可以直接转换为Reponse对象,然后打印。
server端:
public class JdkSerializerServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(new ClassResolver() {
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
// 自定义这个ChannelInboundHandler打印拆包后的结果
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request= (Request) msg;
System.out.println("receive request:"+request);
Response response = new Response();
response.setResponse("response to:"+request.getRequest());
response.setResponseTime(new Date());
ctx.writeAndFlush(response);
}
});
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(8080).sync(); // (7)
System.out.println("JdkSerializerServer Started on 8080...");
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
Server端与Client端分析类似,这里不再赘述。
先后启动Server端与Client端,在Server端控制台我们将看到:
JdkSerializerServer Started on 8080...
receive request:Request{request='i am request!', requestTime=2018-9-9 18:47:30}
在Client端控制台我们将看到:
receive response:Response{response='response to:i am request!', responseTime=2018-9-9 18:48:36}
到此,我们的案例完成。Request和Response都编码、解码成功了。
3 总结
把ObjectEncoder/ObjectDecoder作为序列化/反序列化入门是非常合适的,因为其足够简单。同时通过这个案例,我们也发现了,通信协议中必须包含了一个Length字段,用于表示对象序列化后的字节数。事实上,这种模式也可以套用到我们接下来要介绍的其他序列化框架中。