Netty整合MessagePack、LengthFieldBa

Java语言序列化的目的主要有两个,网络传输和对象持久化;评价一款序列化框架的优劣,主要从以下几个方面考察:
1.对语言的支持;
2.序列化后的码流大小;
3.序列化后的性能;
Java自带的序列化类java.io.Serializable,不能够跨语言,而且序列化后码流太大,性能相对比较低。有一款高效的二进制序列化框架,它就是MessagePack,它像JSON一样支持不同语言之间的数据交换;如果所做项目涉及到多语言,这是个不错的选择。
现在使用Netty框架结合MessagePack编解码框架,来写一个演示demo,由于MessagePack不涉及粘包拆包的问题,还需要结合LengthFieldBasedFrameDecoder,顺便把粘包拆包问题也一起解决了。
首先,需要在pom文件中引入必要的文件:
<!-- netty架包依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- MessagePack架包 -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
第二,继承MessagePack架包的抽象类,重写编解码方法;
import java.util.List;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
* 基于msgpack的解码器,将ByteBuf解码成另一个POJO对象
* @author 程就人生
* @date 2019年10月13日
*/
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf>{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
final int length = msg.readableBytes();
final byte[] array = new byte[length];
msg.getBytes(msg.readerIndex(), array, 0, length);
MessagePack msgPack = new MessagePack();
out.add(msgPack.read(array));
}
}
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 基于msgpack的编码器,将POJO编码成ByteBuf
* @author 程就人生
* @date 2019年10月13日
*/
public class MsgPackEncoder extends MessageToByteEncoder<Object>{
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
MessagePack messagePack = new MessagePack();
byte[] raw = messagePack.write(msg);
out.writeBytes(raw);
}
}
第三,服务端代码实现;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* 服务端模拟
* @author 程就人生
* @date 2019年10月13日
*/
public class TestServer2 {
public void bind(int port){
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//采用匿名内部类的方式,声明hanlder
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//使用LengthFieldBasedFrameDecoder解决粘包拆包问题
//基于长度的半包解码器
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
//绑定MessagePack解码
ch.pipeline().addLast(new MsgPackDecoder());
//LengthFieldPrepender负责在带发送的Bytebuf头中增加一个长度来标识消息的长度
//字节数为2,最大长度需要小于65536,256的次方数,最大为8次方
ch.pipeline().addLast(new LengthFieldPrepender(2));
//绑定MessagePack编码
ch.pipeline().addLast(new MsgPackEncoder());
//绑定hanlder
ch.pipeline().addLast(new ServerHandler2());
}
});
//绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestServer2().bind(8080);
}
}
import java.util.Date;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 服务端hanlder
* @author 程就人生
* @date 2019年10月13日
*/
public class ServerHandler2 extends ChannelInboundHandlerAdapter{
private static int counter;
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
JSONObject jsonObject = JSONObject.parseObject(msg.toString());
//把接收到的内容输出
System.out.println("服务端接收:" + msg.toString() + "the count is " + ++counter);
String currentTime = jsonObject.getString("aaa").equals("程就人生,程就人生~!")?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
//返回信息给客户端
ctx.writeAndFlush(currentTime);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
第四,客户端代码实现;
import com.example.netty.server2.MsgPackDecoder;
import com.example.netty.server2.MsgPackEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* 客户端模拟
* @author 程就人生
* @date 2019年10月13日
*/
public class TestClient2 {
public void connect(int port, String host){
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)//设置为非阻塞
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//和服务器端保持一致
//使用LengthFieldBasedFrameDecoder解决粘包拆包问题
//基于长度的半包解码器
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
//绑定MessagePack解码
ch.pipeline().addLast(new MsgPackDecoder());
//LengthFieldPrepender负责在带发送的Bytebuf头中增加一个长度来标识消息的长度
//字节数为2,最大长度需要小于65536,256的次方数,最大为8次方
ch.pipeline().addLast(new LengthFieldPrepender(2));
//绑定MessagePack编码
ch.pipeline().addLast(new MsgPackEncoder());
//绑定hanlder ch.pipeline().addLast(new ClientHandler2());
}
});
//建立连接
ChannelFuture channelFuture = bootstrap.connect(host, port);
//等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally{
group.shutdownGracefully();
}
}
public static void main(String[] argo){
new TestClient2().connect(8080, "localhost");
}
}
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ClientHandler2 extends ChannelInboundHandlerAdapter{
private static int counter;
@Override
public void channelActive(ChannelHandlerContext ctx) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("aaa", "程就人生,程就人生~!");
//连接成功后,发送内容
for(int i = 0;i<100;i++){
ctx.writeAndFlush(jsonObject);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
try{
String body = msg.toString();
System.out.println("当前时间是:" + body + ";the counter is " + ++counter);
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
最后,测试,启动服务端,再启动客户端;


LengthFieldBasedFrameDecoder继承了抽象类ByteToMessageDecoder,一起来看看该类的源码;

在上面的demo中,我们使用了LengthFieldBasedFrameDecoder的第二个构造函数,共有五个参数;
- maxFrameLength:表示消息体的最大长度;
- lengthFieldOffset:表示长度域的偏移量,跳过指定长度个字节之后的才是长度域;
- lengthFieldLength:表示记录数据长度的长度;
- lengthAdjustment:表示该字段加长度字段等于数据帧的长度,包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包;
- initialBytesToStrip:从数据帧中跳过的字节数,获取完一个完整的数据包之后,忽略前面的指定的位数个字节,应用解码器拿到的就是不带长度域的数据包;
这些参数即使翻译过了也很费解,还需要在实战中仔细揣摩,根据自己的需要进行调整;另外,从控制台打印的结果可以看出来,打印时把字符串外面的双引号也打印出来了,这说明在转码的过程中,字符串外的引号也被包含在里面了。
如何区分一个整包,通常有四种做法:
1.通过固定长度,不足前面补零;
典型代表类:FixedLengthFrameDecoder,感觉很不好用,或者不会用?
2.通过回车换行符来区分一个整包消息;
典型代表类:LineBasedFrameDecoder 结合 StringDecoder 按行切换的文字解码器;
3.通过分隔符来区分一个整包消息;
典型代表类:DelimiterBasedFrameDecoder 结合 StringDecoder 按分隔符切换的文字解码器;
4.通过指定长度来标识一个整包消息;
LengthFieldBasedFrameDecoder就是按照指定长度来解决半包问题的;MessagePack是一款高效的二进制编解码器,但是本身并不支持粘包/半包的处理,因此还需要结合LengthFieldBasedFrameDecoder类一起使用。
在Netty中粘包拆包,小试牛刀一文中已经实现了2、3这两种,今天又实现了最后一种,你GET到了吗?