Netty笔记之八:自定义通信协议
2017-07-05 本文已影响94人
二月_春风
Netty中双方建立通信之后,对象数据会以ByteBuf字节流的方式进行传输。
下面自定义一种通信协议,该协议将传输的数据划分为消息头和消息正文两部分。
管道中传递LuckMessage对象,LuckMessage中定义了消息头LuckHeader和消息正文content。消息头header包括version,contentLength,sessionId。
消息定义:
/**
 * Header part of a Luck protocol message.
 *
 * <p>Holds the protocol version, the length of the message content and the
 * session identifier. All three values are mutable through their setters.
 */
public class LuckHeader {

    /** Protocol version. */
    private int version;

    /** Length of the message content. */
    private int contentLength;

    /** Session identifier carried in the header. */
    private String sessionId;

    /**
     * Creates a header with all fields populated.
     *
     * @param version       protocol version
     * @param contentLength length of the message content
     * @param sessionId     session identifier
     */
    public LuckHeader(final int version, final int contentLength, final String sessionId) {
        this.sessionId = sessionId;
        this.contentLength = contentLength;
        this.version = version;
    }

    // ---- accessors ----

    /** @return the protocol version */
    public int getVersion() {
        return this.version;
    }

    /** @return the length of the message content */
    public int getContentLength() {
        return this.contentLength;
    }

    /** @return the session identifier */
    public String getSessionId() {
        return this.sessionId;
    }

    // ---- mutators ----

    /** @param version the new protocol version */
    public void setVersion(final int version) {
        this.version = version;
    }

    /** @param contentLength the new content length */
    public void setContentLength(final int contentLength) {
        this.contentLength = contentLength;
    }

    /** @param sessionId the new session identifier */
    public void setSessionId(final String sessionId) {
        this.sessionId = sessionId;
    }
}
/**
 * A complete Luck protocol message: a {@link LuckHeader} plus the textual
 * message content.
 */
public class LuckMessage {

    /** Header of this message. */
    private LuckHeader luckHeader;

    /** Message body text. */
    private String content;

    /**
     * Creates a message from a header and its content.
     *
     * @param luckHeader header of the message
     * @param content    message body text
     */
    public LuckMessage(final LuckHeader luckHeader, final String content) {
        this.content = content;
        this.luckHeader = luckHeader;
    }

    /** @return the header of this message */
    public LuckHeader getLuckHeader() {
        return this.luckHeader;
    }

    /** @return the message body text */
    public String getContent() {
        return this.content;
    }

    /** @param luckHeader the new header */
    public void setLuckHeader(final LuckHeader luckHeader) {
        this.luckHeader = luckHeader;
    }

    /** @param content the new message body text */
    public void setContent(final String content) {
        this.content = content;
    }

    /**
     * Renders the header fields followed by the content, e.g.
     * {@code [version=1,contentLength=3,sessionId=x,content=abc]}.
     */
    @Override
    public String toString() {
        final LuckHeader header = this.luckHeader;
        final Object[] fields = {
            header.getVersion(),
            header.getContentLength(),
            header.getSessionId(),
            this.content
        };
        return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]", fields);
    }
}
服务端代码:
/**
 * Entry point of the Luck protocol server: binds a Netty NIO server socket
 * and blocks until that server channel is closed.
 */
public class LuckServer {

    /** TCP port the server listens on. */
    private static final int PORT = 8899;

    /**
     * Starts the server and waits for the server channel to close.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting on bind or close
     */
    public static void main(String[] args) throws InterruptedException {
        // One thread for accepting connections, default-sized pool for handling them.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(acceptors, workers)
                    // Use an NIO server socket channel.
                    .channel(NioServerSocketChannel.class)
                    // Socket option for the listening socket.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new LuckServerInitializer());

            // Bind the port and start listening for incoming connections.
            Channel serverChannel = bootstrap.bind(PORT).sync().channel();
            System.out.printf("luck协议启动地址:127.0.0.1:%d/\n", PORT);

            // Block until the server channel is closed.
            serverChannel.closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
服务端初始化连接:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
 * Builds the pipeline for every channel accepted by the server:
 * encoder, decoder, then the server-side business handler.
 */
public class LuckServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the Luck codec and the server handler on the new channel.
     *
     * @param channel the freshly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast(new LuckEncoder())
                .addLast(new LuckDecoder())
                // Business logic comes after the codec.
                .addLast(new LuckServerHandler());
    }
}
编码Handler:
package com.zhihao.miao.test.day08;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Encodes a {@link LuckMessage} into the Luck wire format.
 *
 * <p>Frame layout, in write order:
 * <pre>
 *   int      version
 *   int      content length, in bytes
 *   byte[36] session id
 *   byte[]   content
 * </pre>
 *
 * <p>Strings are always encoded as UTF-8 so the bytes on the wire do not
 * depend on the platform default charset.
 */
public class LuckEncoder extends MessageToByteEncoder<LuckMessage> {

    /** Fixed size of the session id field; LuckDecoder reads exactly this many bytes. */
    private static final int SESSION_ID_LENGTH = 36;

    /** Charset used for every string on the wire (fully qualified to avoid touching the import list). */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Writes one message to {@code out}.
     *
     * @param ctx     channel context (unused)
     * @param message message to encode; a {@code null} content is sent as an empty body
     * @param out     buffer receiving the encoded frame
     * @throws IllegalArgumentException if the session id is missing or does not
     *                                  encode to exactly 36 bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf out) throws Exception {
        LuckHeader header = message.getLuckHeader();

        // Validate before writing anything so a bad message never leaves a partial frame in the buffer.
        String sessionId = header.getSessionId();
        byte[] sessionBytes = sessionId == null ? new byte[0] : sessionId.getBytes(CHARSET);
        if (sessionBytes.length != SESSION_ID_LENGTH) {
            throw new IllegalArgumentException(
                    "sessionId must encode to " + SESSION_ID_LENGTH + " bytes but was " + sessionBytes.length);
        }

        String content = message.getContent();
        byte[] contentBytes = content == null ? new byte[0] : content.getBytes(CHARSET);

        // The write order below IS the protocol order.
        out.writeInt(header.getVersion());
        // Length of the encoded body in bytes, not String.length(): the two differ for non-ASCII text.
        out.writeInt(contentBytes.length);
        out.writeBytes(sessionBytes);
        out.writeBytes(contentBytes);
    }
}
解码器Handler:
package com.zhihao.miao.test.day08;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Decodes Luck wire frames into {@link LuckMessage} objects.
 *
 * <p>Expected frame layout:
 * <pre>
 *   int      version
 *   int      content length, in bytes
 *   byte[36] session id
 *   byte[]   content (exactly "content length" bytes)
 * </pre>
 *
 * <p>TCP is a byte stream, so one read may contain only part of a frame or
 * several frames back to back. This decoder therefore consumes nothing until
 * a whole frame is buffered, and reads exactly one frame per call; the
 * {@code ByteToMessageDecoder} base class keeps the leftover bytes and calls
 * {@code decode} again.
 */
public class LuckDecoder extends ByteToMessageDecoder {

    /** Fixed size of the session id field in bytes. */
    private static final int SESSION_ID_LENGTH = 36;

    /** version (4) + content length (4) + session id. */
    private static final int HEADER_LENGTH = 4 + 4 + SESSION_ID_LENGTH;

    /** Upper bound for a single message body; guards against corrupt or hostile length fields. */
    private static final int MAX_CONTENT_LENGTH = 1024 * 1024;

    /** Charset used for every string on the wire (fully qualified to avoid touching the import list). */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Tries to decode one frame from {@code in}.
     *
     * @param ctx channel context (unused)
     * @param in  accumulated inbound bytes
     * @param out receives the decoded {@link LuckMessage}, if a full frame was available
     * @throws IllegalStateException if the length field is negative or exceeds
     *                               {@link #MAX_CONTENT_LENGTH}; the buffered bytes are discarded first
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not even a full header yet: wait for more data without consuming anything.
        if (in.readableBytes() < HEADER_LENGTH) {
            return;
        }

        // Peek at the fixed-size fields with absolute getters so the reader index stays put
        // until we know the whole frame has arrived.
        int frameStart = in.readerIndex();
        int version = in.getInt(frameStart);
        int contentLength = in.getInt(frameStart + 4);

        if (contentLength < 0 || contentLength > MAX_CONTENT_LENGTH) {
            // The stream can no longer be framed reliably; drop what is buffered so the
            // decoder is not invoked again on the same garbage.
            in.skipBytes(in.readableBytes());
            throw new IllegalStateException("invalid content length: " + contentLength);
        }

        // Body not complete yet: wait for more data.
        if (in.readableBytes() < HEADER_LENGTH + contentLength) {
            return;
        }

        // Whole frame is available: consume it.
        in.skipBytes(8); // version + content length, already read above
        byte[] sessionBytes = new byte[SESSION_ID_LENGTH];
        in.readBytes(sessionBytes);
        String sessionId = new String(sessionBytes, CHARSET);

        // Read exactly the announced number of bytes, leaving any following frame untouched.
        byte[] contentBytes = new byte[contentLength];
        in.readBytes(contentBytes);
        String content = new String(contentBytes, CHARSET);

        LuckHeader header = new LuckHeader(version, contentLength, sessionId);
        out.add(new LuckMessage(header, content));
    }
}
自定义服务端handler处理器:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side business handler: prints each received {@link LuckMessage}
 * and reports pipeline exceptions on standard output.
 */
public class LuckServerHandler extends SimpleChannelInboundHandler<LuckMessage> {

    /**
     * Prints the received message to standard output.
     *
     * @param ctx channel context (unused)
     * @param msg decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LuckMessage msg) throws Exception {
        final String rendered = msg.toString();
        System.out.println(rendered);
    }

    /**
     * Prints the message of an exception raised in the pipeline.
     *
     * @param ctx   channel context (unused)
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final String reason = cause.getMessage();
        System.out.println("service exception:" + reason);
    }
}
客户端:
package com.zhihao.miao.test.day08;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.UUID;
/**
 * Demo client: connects to the Luck server on 127.0.0.1:8899, sends a single
 * {@link LuckMessage} and closes the connection.
 */
public class LuckClient {

    /**
     * Connects, sends one message, then closes the channel.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting on
     *                              connect, write or close
     */
    public static void main(String args[]) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            // Use the client-side initializer so the client pipeline ends in LuckClientHandler
            // rather than the server's handler.
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new LuckClientInitializer());

            // Start the connection attempt and wait until it succeeds.
            Channel ch = b.connect("127.0.0.1", 8899).sync().channel();

            int version = 1;
            // UUID.toString() is 36 characters, matching the fixed 36-byte session id field
            // that LuckDecoder reads.
            String sessionId = UUID.randomUUID().toString();
            String content = "I'm the luck protocol!";
            LuckHeader header = new LuckHeader(version, content.length(), sessionId);
            LuckMessage message = new LuckMessage(header, content);

            // Wait for the write to complete (sync() rethrows a failed write) before closing,
            // and for the close to finish before the event loop group is shut down.
            ch.writeAndFlush(message).sync();
            ch.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
客户端初始化连接:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
 * Builds the pipeline for the client channel: encoder, decoder, then the
 * client-side business handler.
 */
public class LuckClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the Luck codec and the client handler on the new channel.
     *
     * @param channel the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Subclasses of ByteToMessageDecoder cannot be annotated @Sharable,
        // so every channel must get its own decoder instance; fresh instances
        // of all three handlers are created here for each channel.
        ChannelPipeline clientPipeline = channel.pipeline();
        clientPipeline.addLast(
                new LuckEncoder(),
                new LuckDecoder(),
                new LuckClientHandler());
    }
}
客户端自定义handler:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side business handler: prints each received {@link LuckMessage}
 * and reports pipeline exceptions on standard output.
 */
public class LuckClientHandler extends SimpleChannelInboundHandler<LuckMessage> {

    /**
     * Prints the received message to standard output.
     *
     * @param channelHandlerContext channel context (unused)
     * @param message               decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, LuckMessage message) throws Exception {
        final String rendered = String.valueOf(message);
        System.out.println(rendered);
    }

    /**
     * Prints the message of an exception raised in the pipeline.
     *
     * @param ctx   channel context (unused)
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final String reason = cause.getMessage();
        System.out.println("client exception:" + reason);
    }
}
启动服务器和客户端,服务器端控制台打印:
七月 04, 2017 5:22:27 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x9df966cc, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x99c6480a, L:/127.0.0.1:8899 - R:/127.0.0.1:55722]
七月 04, 2017 5:22:27 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x9df966cc, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
[version=1,contentLength=22,sessionId=c9345f67-99b6-46d2-97ff-eef853c9d569,content=I'm the luck protocol!]