Netty实战六:Netty处理同一个端口上来的多条不同协议的数据
2018-12-11 本文已影响407人
雪飘千里
在实战三中,我们处理了同一个端口上来的2种不同协议的数据,项目上线后,运行良好,之后项目又需要添加一种数据协议,按照同样的方法处理再上线后,发现在网络很差的情况下,会有数据丢包现象。
为了更加通用,针对项目进行了重构,对于netty处理也增加了不少优化。
优化点:
- 使用工厂模式,这样的话,就不需要好几个decoder和handler;
- 通过创建多个nettyserver实例(监听不同端口),达到隔离不同数据协议;
- 在decoder中存储socketChannel和协议的对应关系(HashMap),这样在handler中就可以通过channel来获取到协议类型,然后通过协议类型来创建工厂,通过工厂来处理具体数据;
- 粘包/拆包处理,之前的处理方式因为对Netty ByteBuf认识不足,所以在处理粘包时可能会丢失数据;
重构之后,过两天就会上线,现在我们总共支持4种不同的数据协议(四种不同厂家的设备),就算还要继续增加,项目结构上也可以很快处理完成。
1、Demo
1、NettyServer.class
package org.xxx.android.netty.server;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * A Netty TCP server that listens on a single port and passes an optional vendor
 * (factory) code to the channel pipeline it installs on every accepted connection.
 *
 * <p>Created by zhangkai on 2018/6/11.
 *
 * <p>NIO to epoll class mapping used when {@code epoll} is enabled:
 * <pre>
 * NioEventLoopGroup      -> EpollEventLoopGroup
 * NioEventLoop           -> EpollEventLoop
 * NioServerSocketChannel -> EpollServerSocketChannel
 * NioSocketChannel       -> EpollSocketChannel
 * </pre>
 *
 * <p>The class is intentionally not annotated with {@code @Component}; instances are
 * built through the constructor with an explicit vendor code and port.
 */
public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup boss = null;
    EventLoopGroup worker = null;
    // Written by the thread running start() and read by the thread running stop().
    volatile ChannelFuture future = null;
    // Vendor code handed to the pipeline initializer; may be null.
    Integer factoryCode = null;
    // Selects the epoll transport when true, the NIO transport otherwise.
    boolean epoll = true;
    int port;

    /**
     * @param fc   vendor code passed to {@link NettyServerInitializer}; may be {@code null}
     * @param port TCP port to listen on
     */
    public NettyServer(Integer fc, int port) {
        this.factoryCode = fc;
        this.port = port;
    }

    /**
     * Closes the listening channel and shuts both event loop groups down.
     * Does nothing when the server was never bound or has already been stopped.
     */
    @PreDestroy
    public void stop() {
        ChannelFuture bound = future;
        if (bound == null) {
            return;
        }
        future = null;
        // Wait for the close itself; the bind future is already complete at this point.
        bound.channel().close().awaitUninterruptibly();
        shutdownGroups();
        logger.info(" 服务关闭 ");
    }

    /**
     * Binds the configured port and blocks the calling thread until the listening
     * channel is closed. Both event loop groups are shut down before returning.
     */
    public void start() {
        logger.info(" nettyServer 正在启动");
        // NOTE(review): the epoll transport needs native support on the host;
        // consider a fallback to NIO if this may run elsewhere.
        if (epoll) {
            logger.info(" nettyServer 使用epoll模式");
            boss = new EpollEventLoopGroup();
            worker = new EpollEventLoopGroup();
            // SO_REUSEPORT is an epoll-only option, so it is applied only on this branch.
            serverBootstrap.channel(EpollServerSocketChannel.class)
                    .option(EpollChannelOption.SO_REUSEPORT, true);
        } else {
            logger.info(" nettyServer 使用nio模式");
            boss = new NioEventLoopGroup();
            worker = new NioEventLoopGroup();
            serverBootstrap.channel(NioServerSocketChannel.class);
        }
        logger.info("netty服务器在[" + this.port + "]端口启动监听");
        serverBootstrap.group(boss, worker)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                // TCP_NODELAY applies to accepted sockets, so it must be a child option.
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new NettyServerInitializer(this.factoryCode));
        try {
            future = serverBootstrap.bind(this.port).sync();
            if (future.isSuccess()) {
                logger.info("nettyServer 完成启动 ");
            }
            // Block until the listening channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            logger.error("nettyServer 启动时发生异常---------------", e);
        } catch (Exception e) {
            logger.error("nettyServer 启动时发生异常---------------", e);
        } finally {
            shutdownGroups();
        }
    }

    /** Requests a graceful shutdown of whichever event loop groups were created. */
    private void shutdownGroups() {
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }
}
2、NettyServerInitializer.class
package org.xxx.android.netty.server;
import java.util.concurrent.TimeUnit;
import org.xxx.android.netty.NettyConstants;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
/**
 * Builds the handler pipeline for every accepted connection.
 *
 * <p>Created by zhangkai on 2018/6/11.
 */
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    // Vendor code forwarded to each NettyServerDecoder; may be null.
    Integer factoryCode = null;

    /**
     * @param fc vendor code handed to the decoder of every new channel; may be {@code null}
     */
    public NettyServerInitializer(Integer fc) {
        this.factoryCode = fc;
    }

    /**
     * Installs, in order: idle detection, the idle trigger, the two outbound
     * encoders, the vendor-aware decoder and the business handler.
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline chain = socketChannel.pipeline();
        chain.addLast(new IdleStateHandler(
                        NettyConstants.SERVER_READ_IDEL_TIME_OUT,
                        NettyConstants.SERVER_WRITE_IDEL_TIME_OUT,
                        NettyConstants.SERVER_ALL_IDEL_TIME_OUT,
                        TimeUnit.SECONDS))
                .addLast(new AcceptorIdleStateTrigger())
                .addLast(new StringEncoder())
                .addLast(new ByteArrayEncoder())
                .addLast(new NettyServerDecoder(this.factoryCode))
                .addLast(new NettyServerHandler());
    }
}
3、NettyServerDecoder.class
package org.xxx.android.netty.server;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.socket.SocketChannel;
import org.xxx.android.factory.util.FactoryMap;
import org.xxx.android.factory.util.FactoryUtil;
import org.xxx.android.factory.util.MessageUtil;
import org.xxx.android.factory.vo.FactoryEnum;
import org.xxx.android.netty.delegate.DecoderDelegate;
import org.xxx.android.netty.server.decoder.IDecoder;
import org.xxx.android.util.DataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
 * Vendor-aware inbound decoder: resolves which vendor protocol a channel speaks
 * (either fixed at construction time or identified from the received bytes) and
 * delegates framing to that vendor's {@link IDecoder}.
 */
public class NettyServerDecoder extends ByteToMessageDecoder {
    protected final Logger log = LoggerFactory.getLogger(getClass());

    /*
     * Per-channel (keyed by channel hash code) count of messages whose vendor
     * could not be identified.
     */
    static volatile Map<Integer, Integer> timesMap = new ConcurrentHashMap<Integer, Integer>();

    /*
     * Delegate that supplies the vendor-specific decoder.
     */
    DecoderDelegate decoderDelegate = null;
    // Fixed vendor code for this channel, or null when it must be identified from the data.
    Integer factoryCode = null;

    /**
     * @param fc vendor code this decoder is pinned to, or {@code null} to identify
     *           the vendor from incoming messages
     */
    public NettyServerDecoder(Integer fc) {
        this.factoryCode = fc;
        this.decoderDelegate = new DecoderDelegate();
    }

    /**
     * Reads the bytes from {@code in}, resolves the vendor and lets the vendor
     * decoder emit complete messages into {@code list}.
     *
     * <p>The buffer is deliberately not retained here: the base decoder owns and
     * releases it, so an extra {@code retain()} per call would never be balanced
     * by a release and would leak the buffer.
     * NOTE(review): confirm that no {@link IDecoder} implementation keeps a
     * reference to {@code in} after returning.
     *
     * <p>Any failure is logged and swallowed so the connection stays open.
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
        try {
            Channel channel = channelHandlerContext.channel();
            int hashCode = channel.hashCode();
            ByteBufToBytes reader = new ByteBufToBytes();
            byte[] byteData = reader.read(in);
            log.info("服务端接收到的原始消息为{}={}", hashCode, DataUtil.ByteArrToHexString(byteData));

            // Resolve the vendor for this channel.
            FactoryEnum channelFactory = null;
            if (this.factoryCode == null) {
                // No vendor pinned to this port: identify it from the message.
                channelFactory = this.indentifyFromMsg(channel, byteData, in, list);
            } else {
                channelFactory = FactoryEnum.codeOf(this.factoryCode);
                // Only record the mapping when the configured code is a known vendor;
                // an unknown code falls through to the "not identified" branch below.
                if (channelFactory != null) {
                    FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
                }
            }
            if (channelFactory == null) {
                log.info("设备{}消息未识别", hashCode);
                return;
            }

            // Look up the vendor-specific decoder.
            IDecoder decoder = decoderDelegate.getDelegate(channelFactory);
            if (decoder == null) {
                log.info("设备{}厂商{}解码器未配置", hashCode, channelFactory.toString());
                return;
            }
            boolean complete = decoder.decoder(hashCode, byteData, in, list);
            if (!complete) {
                log.info("未识别出完整消息,继续接收{}", DataUtil.ByteArrToHexString(byteData));
                return;
            }
        } catch (Throwable e) {
            // Best effort: a malformed message must not tear down the pipeline.
            log.error("解析出错", e);
        }
    }

    /*
     * Identifies the vendor of a channel: first from the channel itself, then from
     * the message bytes. Returns null when the vendor is still unknown; after the
     * fifth unidentified message the channel is closed.
     */
    private FactoryEnum indentifyFromMsg(Channel channel, byte[] byteData, ByteBuf in,
            List<Object> list) {
        int hashCode = channel.hashCode();

        FactoryEnum channelFactory = FactoryUtil.indentifyByChannel(channel);
        if (channelFactory != null) {
            timesMap.remove(hashCode);
            log.info("从通道获取厂商成功:{}={}",
                    hashCode,
                    channelFactory.toString());
            return channelFactory;
        }

        // Fall back to identifying the vendor from the data itself.
        channelFactory = MessageUtil.getMsgType(byteData);
        if (channelFactory != null) {
            // Identification succeeded, so the failure counter is no longer needed.
            timesMap.remove(hashCode);
            // Remember which protocol this channel speaks for the business handler.
            FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
            return channelFactory;
        }

        int times = 1;
        Integer previous = timesMap.get(hashCode);
        if (previous != null) {
            times = previous + 1;
        }
        if (times == 5) {
            log.info("设备{}已登录5次,服务器关闭连接", hashCode);
            timesMap.remove(hashCode);
            // Give up on this connection.
            channel.close();
            return null;
        }
        timesMap.put(hashCode, times);

        // Vendor still unknown: rewind so the bytes are seen again with the next read.
        in.resetReaderIndex();
        log.info("设备{}厂商未能识别,继续接收{}", hashCode,
                DataUtil.ByteArrToHexString(byteData));
        return null;
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.error("--------数据读异常----------: ", cause);
        ctx.close();
    }

    /**
     * Flushes pending outbound data once a read batch is finished.
     *
     * <p>NOTE(review): this override does not call
     * {@code super.channelReadComplete(ctx)}, so the base-class handling of this
     * event is skipped — confirm that is intended.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        log.debug("--------数据读取完毕----------");
    }
}
4、NettyServerHandler.class
package org.xxx.android.netty.server;
import org.apache.commons.lang3.StringUtils;
import org.xxx.android.factory.IFactory;
import org.xxx.android.factory.util.FactoryMap;
import org.xxx.android.factory.util.FactoryUtil;
import org.xxx.android.factory.vo.FactoryEnum;
import org.xxx.android.netty.delegate.FactoryDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
/**
 * Business handler at the tail of the pipeline: looks up the vendor recorded for
 * the channel by the decoder and hands each decoded message to that vendor's
 * {@link IFactory}.
 *
 * <p>Marked sharable; the class keeps no per-channel state of its own.
 */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    public final Logger log = LoggerFactory.getLogger(getClass());

    /**
     * Cleans up the bookkeeping of a disconnected channel.
     *
     * <p>The hash-code keyed decoder and factory mappings are always removed, even
     * when the device never reported a device number; otherwise they would stay
     * behind for channels that disconnect before being identified. The device
     * mapping and the offline status sync only happen for identified devices.
     */
    @Override
    public void channelInactive(ChannelHandlerContext chc) throws Exception {
        SocketChannel socketChannel = (SocketChannel) chc.channel();
        int channelHash = socketChannel.hashCode();
        String clientId = FactoryMap.getDevNoByChannel(socketChannel);
        log.info("----客户端设备连接断开:{}", clientId);

        boolean identified = !StringUtils.isEmpty(clientId);
        if (identified) {
            FactoryMap.removeChannelByDevNo(clientId);
        }
        FactoryMap.removeChannelDecoder(channelHash);
        FactoryMap.removeChannelFactory(channelHash);
        if (identified) {
            // Report the device as offline.
            FactoryUtil.getFactoryService().syncNetworkStatus(clientId, 0);
        }
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("channel {} closed after exception", ctx.channel().hashCode(), cause);
        ctx.close();
    }

    /** Flushes pending outbound data once a read batch is finished. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Dispatches one decoded message to the vendor recorded for this channel.
     * Messages are dropped (with a log entry) when no vendor is recorded or the
     * recorded code is unknown; processing failures are logged and swallowed.
     */
    @Override
    public void channelRead(ChannelHandlerContext chc, Object message) throws Exception {
        try {
            Channel channel = chc.channel();
            // Protocol type recorded by the decoder for this channel.
            Integer channelFactory = FactoryMap.getDecoderByChannel(channel.hashCode());
            if (channelFactory == null) {
                log.info("解码器未能维护通道和工厂关系");
                return;
            }
            FactoryEnum factoryEnum = FactoryEnum.codeOf(channelFactory);
            if (factoryEnum == null) {
                log.info("解析消息失败,未识别消息所属厂家");
                return;
            }
            this.factoryMessage(channel, message, factoryEnum);
        } catch (Exception e) {
            log.error("处理业务消息失败", e);
        }
    }

    /**
     * Processes a message with the channel's vendor factory, creating and caching
     * the factory on first use.
     *
     * @param channel     channel the message arrived on
     * @param msg         decoded message
     * @param factoryEnum vendor the channel belongs to
     */
    void factoryMessage(Channel channel, Object msg, FactoryEnum factoryEnum) {
        SocketChannel socketChannel = (SocketChannel) channel;
        IFactory factory = FactoryMap.getFactoryByChannel(channel.hashCode());
        boolean created = factory == null;
        if (created) {
            // First message on this channel: create the vendor factory via the delegate.
            factory = FactoryDelegate.createFactory(factoryEnum);
        }
        factory.processMessage(socketChannel, msg);
        if (created) {
            // Cache only after the first message has been processed, as before.
            FactoryMap.putChannelFactory(socketChannel.hashCode(), factory);
        }
        log.info("{}={}", socketChannel.hashCode(), factory.getFactoryDevNo());
    }
}
5、SpringbootApplication.class
package org.xxx.android;
import org.xxx.android.netty.server.NettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.embedded.ConfigurableEmbeddedServletContainer;
import org.springframework.boot.context.embedded.EmbeddedServletContainerCustomizer;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point. Besides bootstrapping the web environment it applies
 * the configured servlet port and, when enabled by configuration, starts the two
 * injected Netty servers on their own threads.
 **/
@EnableJpaAuditing
@EnableScheduling
@SpringBootApplication
public class SpringbootApplication implements CommandLineRunner, EmbeddedServletContainerCustomizer {

    @Value("${server.port}")
    int serverPort;

    // Netty servers are started only when this equals 1.
    @Value("${netty.startup}")
    int startupStartup;

    // Injected Netty server instances.
    @Autowired
    NettyServer nettyServer;

    @Autowired
    NettyServer yyNettyServer;

    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }

    /** Applies the configured HTTP port to the embedded servlet container. */
    @Override
    public void customize(ConfigurableEmbeddedServletContainer container) {
        container.setPort(serverPort);
    }

    /** Invoked by Spring Boot after startup; kicks off the Netty servers. */
    @Override
    public void run(String... strings) {
        this.startNettyServer();
    }

    /**
     * Starts both Netty servers and registers a JVM shutdown hook that stops
     * them. Does nothing unless the startup switch is set to 1.
     */
    void startNettyServer() {
        if (startupStartup != 1) {
            return;
        }
        this.nettyThreadStart(nettyServer);
        this.nettyThreadStart(yyNettyServer);

        Runnable stopTask = new Runnable() {
            @Override
            public void run() {
                stopNettyServer();
            }
        };
        Runtime.getRuntime().addShutdownHook(new Thread(stopTask));
    }

    /** Stops both Netty servers. */
    void stopNettyServer() {
        nettyServer.stop();
        yyNettyServer.stop();
    }

    /**
     * Runs the blocking {@code start()} of the given server on a new thread.
     *
     * @param ns server to start
     */
    void nettyThreadStart(final NettyServer ns) {
        Runnable startTask = new Runnable() {
            @Override
            public void run() {
                ns.start();
            }
        };
        new Thread(startTask).start();
    }
}
2、粘包/拆包解决思路
基本思路就是不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包;
若当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
- 定长——FixedLengthFrameDecoder
- 分隔符——DelimiterBasedFrameDecoder
- 基于长度的变长包——LengthFieldBasedFrameDecoder
若当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接
这里最重要的就是,使用markReaderIndex标记读索引,使得多余的数据保留,继续等待后面的数据
// BB vendor data.
// Excerpt from the body of a vendor decoder method: the enclosing method header
// and the closing braces are not part of this excerpt.
// Heartbeat check: a heartbeat is forwarded as-is and counts as a complete message.
if(isXyHeart(byteData)){
list.add(byteData);
return true;
}
// Check whether the data starts with a message header.
if(isXYMsgHeader(byteData)){
headIndexMap.remove(hashCode);
// Frame length as declared by the length field of the message.
int length = DataUtil.byteToInt(byteData[XyConstant.BUSINESS_RSP_MSG_FIELD.LEN.INDEX]);
// Whole packet: the declared length matches the number of bytes received.
if(length == byteData.length){
// Verify the checksum.
// NOTE(review): on a checksum failure nothing is emitted and neither of the two
// length branches below matches; what happens next lies outside this excerpt.
if(!isCheckNum(byteData,length)){
log.error("兴元数据包校验和不通过{}!={}==={}",byteData[length-1]& FactoryConstant.BYTE_MASK, XyBusinessReqMsgUtil.getCheckSum(byteData)&FactoryConstant.BYTE_MASK,DataUtil.ByteArrayToString(byteData));
}else {
list.add(byteData);
return true;
}
}
if(length > byteData.length){
// Partial packet: rewind the reader index and keep receiving.
in.resetReaderIndex();
return false;
}
if(length < byteData.length){
// Sticky packets: more bytes were received than one frame holds.
// NOTE(review): byteData.toString() on a byte[] prints the array's type and
// identity hash, not its content; a hex dump is probably what was intended.
log.info("粘包=====接收数据大于帧长度{}>{}",byteData.toString(),length);
return dealStickyPackage(in, list, length);
}
/**
 * Splits the first complete frame off a buffer that holds more than one frame.
 *
 * @param in     inbound buffer; its reader index is rewound to the last mark before reading
 * @param list   output list that receives the extracted frame as a {@code byte[]}
 * @param length number of bytes belonging to the first frame
 * @return always {@code false}; the bytes after the extracted frame remain in the buffer
 */
public boolean dealStickyPackage(ByteBuf in, List<Object> list, int length) {
    // Rewind to the marked position so the frame is read from its first byte.
    in.resetReaderIndex();

    // Copy exactly one frame out of the buffer.
    final byte[] frame = new byte[length];
    in.readBytes(frame, 0, frame.length);

    // Hand the complete frame on to the handler.
    list.add(frame);

    // Move the mark past the consumed frame: what is left is the beginning of
    // the next message, which stays in the buffer until more data arrives.
    in.markReaderIndex();
    return false;
}