netty笔记-protobuf编解码
2018-07-08 本文已影响27人
兴浩
1.protobuf编解码
/**
 * Round-trip demo for the generated {@code SubscribeReqProto.SubscribeReq} message:
 * builds a request, serializes it to bytes, parses it back and compares the two.
 */
public class TestSubscribeReqProto {

    /** Serializes the request to its protobuf binary form. */
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    /**
     * Parses a request from its protobuf binary form.
     *
     * @throws InvalidProtocolBufferException if {@code body} cannot be parsed
     */
    private static SubscribeReqProto.SubscribeReq decode(byte[] body)
            throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    /** Builds the sample request used by {@link #main(String[])}. */
    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder =
                SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("Lilinfeng");
        builder.setProductName("Netty Book");
        List<String> address = new ArrayList<>();
        address.add("NanJing YuHuaTai");
        address.add("BeiJing LiuLiChang");
        address.add("ShenZhen HongShuLin");
        builder.addAllAddress(address);
        return builder.build();
    }

    /**
     * Encodes and decodes the sample request, printing both sides and whether they are equal.
     *
     * @param args unused
     * @throws InvalidProtocolBufferException if the encoded bytes cannot be parsed back
     */
    public static void main(String[] args)
            throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq req = createSubscribeReq();
        System.out.println("Before encode : " + req.toString());
        SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
        // Print the decoded copy (req2), not the original, so the output
        // actually shows what came back from decode().
        System.out.println("After decode : " + req2.toString());
        System.out.println("Assert equal : --> " + req2.equals(req));
    }
}
- newBuilder方法用于创建Builder实例
- toByteArray方法用于序列化(编码)
- parseFrom方法用于反序列化(解码)
2. netty中的protobuf编解码
/**
 * Server bootstrap for the subscription example: installs the protobuf
 * frame decoder / decoder / length prepender / encoder and the business
 * handler on every accepted channel.
 */
public class SubReqServer {

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     *
     * @param port port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // Handlers are appended in this exact order.
                            ch.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(
                                            SubscribeReqProto.SubscribeReq.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    .addLast(new SubReqServerHandler());
                        }
                    });

            // Bind the port and wait synchronously until the bind succeeds
            ChannelFuture bindFuture = bootstrap.bind(port).sync();

            // Wait until the server's listening channel is closed
            bindFuture.channel().closeFuture().sync();
        } finally {
            // Exit gracefully and release the thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on the port given as the first argument, or 8080 when
     * no argument is given or it is not a valid integer.
     *
     * @param args optional; {@code args[0]} is the port
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        boolean hasPortArgument = args != null && args.length > 0;
        if (hasPortArgument) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Not a number: keep the default port
            }
        }
        new SubReqServer().bind(port);
    }
}
- ProtobufEncoder:用于对Protobuf类型序列化。
- ProtobufVarint32LengthFieldPrepender:用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
- ProtobufVarint32FrameDecoder:用于decode前解决半包和粘包问题(利用包头中包含的数组长度来识别半包粘包)
- ProtobufDecoder:反序列化指定的Protobuf字节数组为protobuf类型。
下面都看一下
3. ProtobufVarint32LengthFieldPrepender
用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
/**
 * Encoder that writes the readable length of the outgoing buffer as a
 * protobuf raw varint32, followed by the buffer's bytes. For example:
 * <pre>
 * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
 * +---------------+               +--------+---------------+
 * | Protobuf Data |-------------->| Length | Protobuf Data |
 * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
 * +---------------+               +--------+---------------+
 * </pre>
 */
@Sharable
public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {

    /**
     * Writes the varint32 length prefix and then the payload into {@code out}.
     *
     * @param ctx handler context (not used here)
     * @param msg buffer whose readable bytes form the payload
     * @param out destination buffer receiving prefix + payload
     */
    @Override
    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        final int payloadSize = msg.readableBytes();
        final int prefixSize = CodedOutputStream.computeRawVarint32Size(payloadSize);

        // Make room for prefix and payload in a single step.
        out.ensureWritable(prefixSize + payloadSize);

        final CodedOutputStream prefixWriter =
                CodedOutputStream.newInstance(new ByteBufOutputStream(out));
        prefixWriter.writeRawVarint32(payloadSize);
        // Flush before appending the payload so the prefix bytes come first in out.
        prefixWriter.flush();

        // Append payloadSize bytes of msg, starting at its current reader index.
        out.writeBytes(msg, msg.readerIndex(), payloadSize);
    }
}
4. ProtobufEncoder
使用toByteArray方法对Protobuf类型序列化。
/**
 * Encoder that converts a protobuf message, or a message builder, into a
 * buffer wrapping the bytes returned by {@code toByteArray()}.
 */
public class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {

    /**
     * Serializes {@code msg} and adds the resulting buffer to {@code out}.
     * Nothing is added when {@code msg} is neither a {@code MessageLite}
     * nor a {@code MessageLite.Builder}.
     */
    @Override
    protected void encode(
            ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
        if (msg instanceof MessageLite) {
            // Already a built message: serialize it directly.
            final byte[] serialized = ((MessageLite) msg).toByteArray();
            out.add(wrappedBuffer(serialized));
        } else if (msg instanceof MessageLite.Builder) {
            // A builder: build the message first, then serialize it.
            final byte[] serialized = ((MessageLite.Builder) msg).build().toByteArray();
            out.add(wrappedBuffer(serialized));
        }
    }
}
5. ProtobufVarint32FrameDecoder
用于decode前解决半包和粘包问题(利用包头中包含的数组长度来识别半包粘包)
/**
 * Decoder that splits the inbound bytes into frames using a leading
 * protobuf raw varint32 length field, and emits only the frame body.
 * For example:
 * <pre>
 * BEFORE DECODE (302 bytes)       AFTER DECODE (300 bytes)
 * +--------+---------------+      +---------------+
 * | Length | Protobuf Data |----->| Protobuf Data |
 * | 0xAC02 |  (300 bytes)  |      |  (300 bytes)  |
 * +--------+---------------+      +---------------+
 * </pre>
 */
public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
    // TODO maxFrameLength + safe skip + fail-fast option
    // (just like LengthFieldBasedFrameDecoder)

    /**
     * Tries to read one length-prefixed frame from {@code in}.
     * When the prefix or the body is not fully available yet, the reader
     * index is reset to where it was on entry and nothing is emitted.
     *
     * @throws CorruptedFrameException if the decoded length is negative, or
     *         if none of the first five bytes has its most significant bit clear
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        final byte[] prefix = new byte[5];

        for (int pos = 0; pos < prefix.length; pos++) {
            if (!in.isReadable()) {
                // Length prefix is incomplete: rewind and return.
                in.resetReaderIndex();
                return;
            }

            final byte current = in.readByte();
            prefix[pos] = current;
            if (current < 0) {
                // MSB set: this is not the last prefix byte, keep reading.
                continue;
            }

            final int frameLength =
                    CodedInputStream.newInstance(prefix, 0, pos + 1).readRawVarint32();
            if (frameLength < 0) {
                throw new CorruptedFrameException("negative length: " + frameLength);
            }

            if (in.readableBytes() >= frameLength) {
                out.add(in.readBytes(frameLength));
            } else {
                // Body is incomplete: rewind past the prefix as well.
                in.resetReaderIndex();
            }
            return;
        }

        // Couldn't find the byte whose MSB is off.
        throw new CorruptedFrameException("length wider than 32-bit");
    }
}
6.ProtobufDecoder
/**
 * Decoder that parses the readable bytes of a buffer into a protobuf message.
 * NOTE(review): {@code prototype}, {@code extensionRegistry} and
 * {@code HAS_PARSER} are used below but declared outside this excerpt.
 */
public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Parses {@code msg} and adds the resulting message to {@code out}.
     *
     * @param ctx handler context (not used here)
     * @param msg buffer whose readable bytes hold one serialized message
     * @param out receives the parsed message
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int size = msg.readableBytes();
        final byte[] bytes;
        final int start;
        if (!msg.hasArray()) {
            // No accessible backing array: copy the readable bytes out.
            bytes = new byte[size];
            msg.getBytes(msg.readerIndex(), bytes, 0, size);
            start = 0;
        } else {
            // Use the backing array in place, starting at the reader index.
            bytes = msg.array();
            start = msg.arrayOffset() + msg.readerIndex();
        }

        if (HAS_PARSER) {
            if (extensionRegistry != null) {
                out.add(prototype.getParserForType().parseFrom(bytes, start, size, extensionRegistry));
            } else {
                out.add(prototype.getParserForType().parseFrom(bytes, start, size));
            }
        } else {
            // No parser available: go through a builder instead.
            if (extensionRegistry != null) {
                out.add(prototype.newBuilderForType().mergeFrom(bytes, start, size, extensionRegistry).build());
            } else {
                out.add(prototype.newBuilderForType().mergeFrom(bytes, start, size).build());
            }
        }
    }
}