Netty实现服务器与客户端之间数据(POJO)传输
2018-07-30 本文已影响48人
PC_Repair
简述:
使用Netty5实现客户端与服务器之间的通信,采用了JBoss Marshalling外部依赖实现编解码功能,可以在客户端与服务器之间实现简单Java对象的传输。
pom.xml
使用Maven来构建项目,所需添加的依赖如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-river</artifactId>
<version>1.4.10.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.4.11.Final</version>
</dependency>
Server.java
public class Server {
private int port;
public Server(int port) { this.port = port; }
public void start() {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//检测连接有效性(心跳),此处功能:5秒内read()未被调用则触发一次useEventTrigger()方法
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES));
//JBossMarshalling外部依赖,进行编解码
ch.pipeline().addLast("encoder", MarshallingCodeCFactory2.buildMarshallingEncoder());
ch.pipeline().addLast("decoder", MarshallingCodeCFactory2.buildMarshallingDecoder());
ch.pipeline().addLast(new ServerHandler()); //ServerHandler实现了业务逻辑
}
})
//服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
.option(ChannelOption.SO_BACKLOG, 128)
//Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
// 可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
.childOption(ChannelOption.SO_KEEPALIVE, true);
//绑定端口,开始接收进来的连接
ChannelFuture future = sb.bind(port).sync(); //绑定服务器,等待绑定完成,调用sync()的原因是当前线程阻塞
System.out.println("Server start listen at " + port);
future.channel().closeFuture().sync(); //关闭channel和块,直到它被关闭
} catch (Exception e) {
boosGroup.shutdownGracefully(); //关闭EventLoopGroup,释放所有资源(包括所有创建的线程)
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if(args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new Server(port).start();
}
}
Client.java
/**
 * Netty client that connects to the server and exchanges serialized POJOs with it.
 */
public class Client {

    /**
     * Connects to {@code host:port} and blocks until the connection is closed.
     * The event loop group is shut down before this method returns.
     *
     * @param port remote TCP port
     * @param host remote host name or address
     * @throws Exception if connecting fails or the waiting thread is interrupted
     */
    public void connect(int port, String host) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    // Bootstrap keeps a single handler, so a second handler(...) call would
                    // replace the first. The logging handler is therefore installed inside
                    // the initializer instead of through its own handler(...) call.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            // Connection liveness check: when nothing has been written for
                            // 5 seconds an IdleStateEvent is delivered to userEventTriggered().
                            ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                            // JBoss Marshalling based encoding/decoding of POJOs.
                            ch.pipeline().addLast("encoder", MarshallingCodeCFactory2.buildMarshallingEncoder());
                            ch.pipeline().addLast("decoder", MarshallingCodeCFactory2.buildMarshallingDecoder());
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });
            // Connect and wait until the connection attempt has completed.
            ChannelFuture future = b.connect(host, port).sync();
            // Block the calling thread until the channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Connects to 127.0.0.1 on the port given as the first argument; falls back to 8080
     * when the argument is absent or not a valid integer.
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        String host = "127.0.0.1";
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Deliberately ignored: keep the default port.
            }
        }
        new Client().connect(port, host);
    }
}
ServerHandler.java
/**
 * Server-side business handler: closes connections that have gone read-idle and answers
 * every received {@code ClientMessage} with a {@code ServerMessage}.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /**
     * Closes the channel when a read-idle event arrives; any event that is not an
     * {@code IdleStateEvent} is passed to the superclass implementation.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // The duration in this message mirrors the read-idle timeout the server
                // pipeline configures: IdleStateHandler(5, 0, 0, TimeUnit.MINUTES).
                System.out.println("5分钟没有接收到客户端的信息了");
                System.out.println("关闭这个不活跃的客户端,回收任务");
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Called for every inbound message. Prints the client's message and replies with a
     * {@code ServerMessage} carrying the remote address of the channel.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        // A message of any other type raises ClassCastException here.
        ClientMessage cmsg = (ClientMessage) msg;
        System.out.println("received message: " + cmsg.getClientName() + ", " + cmsg.getClientAddress() + ", " + cmsg.getMessage());
        ServerMessage smsg = new ServerMessage("server", ctx.channel().remoteAddress().toString());
        ctx.channel().writeAndFlush(smsg);
        System.out.println("---------------------------------------------------------------------");
    }

    /**
     * Prints the stack trace of the reported exception and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
ClientHandler.java
/**
 * Client-side business handler: sends a {@code ClientMessage} whenever the connection has
 * been write-idle and prints every {@code ServerMessage} received in reply.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /** Called once the connection to the server has been established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CilentHandler ChannelActive. time: " + new Date());
    }

    /** Called when the connection is no longer active. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ClinetHandler ChannelInactive. time: " + new Date());
    }

    /**
     * On a write-idle event, sends a {@code ClientMessage} one second later. Any event
     * that is not an {@code IdleStateEvent} is passed to the superclass implementation.
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                // Delay the write by one second with a scheduled task instead of
                // Thread.sleep(): sleeping here would block the thread that invokes this
                // handler and stall all other work queued on it.
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        if (!ctx.channel().isActive()) {
                            // The connection went away while the task was waiting.
                            return;
                        }
                        ClientMessage cmsg = new ClientMessage("client", ctx.channel().localAddress().toString());
                        ctx.channel().writeAndFlush(cmsg);
                    }
                }, 1, java.util.concurrent.TimeUnit.SECONDS);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Called for every inbound message; prints the server name and client address carried
     * by the received {@code ServerMessage}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // A message of any other type raises ClassCastException here.
        ServerMessage smsg = (ServerMessage) msg;
        System.out.println("Message received: " + smsg.getServerName() + ", " + smsg.getClientAddress());
    }
}
MarshallingCodeCFactory2.java
/**
 * Static factory for the Netty JBoss Marshalling encoder and decoder used in the channel
 * pipelines. Not instantiable.
 */
public final class MarshallingCodeCFactory2 {
    /** Name of the marshaller factory to look up ("serial" selects the Java-serialization one). */
    private static final String FACTORY_NAME = "serial";
    /** Version number set on the marshalling configuration. */
    private static final int CONFIGURATION_VERSION = 5;
    /** Maximum length of a single serialized message accepted by the decoder. */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    private MarshallingCodeCFactory2() {
        // Utility class: static methods only.
    }

    /**
     * Creates a JBoss Marshalling decoder.
     *
     * @return a new {@code MarshallingDecoder} limited to {@code MAX_OBJECT_SIZE} per message
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates a JBoss Marshalling encoder, which serializes POJOs implementing
     * {@code Serializable} into bytes.
     *
     * @return a new {@code MarshallingEncoder}
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(lookupMarshallerFactory(), newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the marshaller factory by name, failing fast when it is unavailable. */
    private static MarshallerFactory lookupMarshallerFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(FACTORY_NAME);
        if (factory == null) {
            throw new IllegalStateException(
                    "No JBoss Marshalling factory named \"" + FACTORY_NAME
                            + "\" found; is jboss-marshalling-serial on the classpath?");
        }
        return factory;
    }

    /** Builds the marshalling configuration shared by the encoder and the decoder. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(CONFIGURATION_VERSION);
        return configuration;
    }
}
ClientMessage.java
/**
 * Message sent from the client to the server. Carries the client's name, its address and
 * a fixed text.
 */
public class ClientMessage implements Serializable {
    // Explicit version id: without it the id is derived from the compiled class, so two
    // separately built copies of this class may not agree on it.
    private static final long serialVersionUID = 1L;

    private String clientName;
    private String clientAddress;
    private String message = "Client to Server.";

    /**
     * @param clientName    name identifying the client
     * @param clientAddress textual form of the client's address
     */
    public ClientMessage(String clientName, String clientAddress) {
        this.clientName = clientName;
        this.clientAddress = clientAddress;
    }

    /** @return the client name */
    public String getClientName() {
        return clientName;
    }

    /** @param clientName the new client name */
    public void setClientName(String clientName) {
        this.clientName = clientName;
    }

    /** @return the client address */
    public String getClientAddress() {
        return clientAddress;
    }

    /** @param clientAddress the new client address */
    public void setClientAddress(String clientAddress) {
        this.clientAddress = clientAddress;
    }

    /** @return the fixed message text */
    public String getMessage() {
        return message;
    }

    /** @return the client name immediately followed by the client address */
    @Override
    public String toString() {
        return getClientName() + getClientAddress();
    }
}
ServerMessage.java
/**
 * Message sent from the server to the client. Carries the server's name, the address of
 * the client being answered and a fixed text.
 */
public class ServerMessage implements Serializable {
    // Explicit version id: without it the id is derived from the compiled class, so two
    // separately built copies of this class may not agree on it.
    private static final long serialVersionUID = 1L;

    private String serverName;
    private String clientAddress;
    private String message = "Server to Client.";

    /**
     * @param serverName    name identifying the server
     * @param clientAddress textual form of the client's address
     */
    public ServerMessage(String serverName, String clientAddress) {
        this.serverName = serverName;
        this.clientAddress = clientAddress;
    }

    /** @return the server name */
    public String getServerName() {
        return serverName;
    }

    /** @param serverName the new server name */
    public void setServerName(String serverName) {
        this.serverName = serverName;
    }

    /** @return the client address */
    public String getClientAddress() {
        return clientAddress;
    }

    /** @param clientAddress the new client address */
    public void setClientAddress(String clientAddress) {
        this.clientAddress = clientAddress;
    }

    /** @return the fixed message text */
    public String getMessage() {
        return message;
    }

    /** @return the server name and client address wrapped in braces */
    @Override
    public String toString() {
        return "{ " + this.serverName + " " + this.clientAddress + " }";
    }
}