java

Netty实战六:Netty处理同一个端口上来的多条不同协议的数据

2018-12-11  本文已影响407人  雪飘千里

在实战三中,我们处理了同一个端口上来的2种不同协议的数据,项目上线后,运行良好,之后项目又需要添加一种数据协议,按照同样的方法处理再上线后,发现在网络很差的情况下,会有数据丢包现象。
为了更加通用,针对项目进行了重构,对于netty处理也增加了不少优化。

优化点:

重构之后,过两天就会上线,现在我们总共支持4种不同的数据协议(四种不同厂家的设备),就算还要继续增加,项目结构上也可以很快处理完成。

1、Demo

1、NettyServer.class

package org.xxx.android.netty.server;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Created by zhangkai on 2018/6/11.
 * NioEventLoopGroup → EpollEventLoopGroup
   NioEventLoop → EpollEventLoop
   NioServerSocketChannel → EpollServerSocketChannel
   NioSocketChannel → EpollSocketChannel
   @Component
 */
public class NettyServer{
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup boss =null;
    EventLoopGroup worker =null;
    ChannelFuture future = null;
    //厂商编码
    Integer factoryCode=null;

    boolean epoll=true;
    int port;
    public NettyServer(Integer fc,int port){
        this.factoryCode=fc;
        this.port=port;
    }

    @PreDestroy
    public void stop(){
        if(future!=null){
            future.channel().close().addListener(ChannelFutureListener.CLOSE);
            future.awaitUninterruptibly();
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            future=null;
            logger.info(" 服务关闭 ");
        }
    }
    public void start(){
        logger.info(" nettyServer 正在启动");
        
        if(epoll){
            logger.info(" nettyServer 使用epoll模式");
            boss = new EpollEventLoopGroup();
            worker = new EpollEventLoopGroup();
        }
        else{
            logger.info(" nettyServer 使用nio模式");
            boss = new NioEventLoopGroup();
            worker = new NioEventLoopGroup();
        }
        
        logger.info("netty服务器在["+this.port+"]端口启动监听");
        
        serverBootstrap.group(boss,worker)
            .option(ChannelOption.SO_BACKLOG,1024)
            .option(EpollChannelOption.SO_REUSEPORT, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .option(ChannelOption.TCP_NODELAY,true)
            .childOption(ChannelOption.SO_KEEPALIVE,true)
            .childHandler(new NettyServerInitializer(this.factoryCode));
        
        if(epoll){
            serverBootstrap.channel(EpollServerSocketChannel.class);
        }else{
            serverBootstrap.channel(NioServerSocketChannel.class);
        }
        
        
        try{
            future = serverBootstrap.bind(this.port).sync();
            if(future.isSuccess()){
                logger.info("nettyServer 完成启动 ");
            }
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        }catch (Exception e){
            //boss.shutdownGracefully();
            //worker.shutdownGracefully();
            logger.info("nettyServer 启动时发生异常---------------{}",e);
            logger.info(e.getMessage());
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

2、NettyServerInitializer.class

package org.xxx.android.netty.server;
import java.util.concurrent.TimeUnit;
import org.xxx.android.netty.NettyConstants;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * Builds the handler pipeline for every accepted connection.
 *
 * <p>Created by zhangkai on 2018/6/11.
 */
public class NettyServerInitializer extends ChannelInitializer<SocketChannel>{
    // Factory (vendor) code forwarded to each NettyServerDecoder; may be null.
    Integer factoryCode=null;

    /**
     * @param fc factory (vendor) code handed to the decoder of every new channel
     */
    public NettyServerInitializer(Integer fc){
        this.factoryCode=fc;
    }

    /**
     * Installs, in order: idle detection and its trigger, the outbound String and byte[]
     * encoders, the inbound protocol decoder and finally the business handler.
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        final IdleStateHandler idleWatch = new IdleStateHandler(
                NettyConstants.SERVER_READ_IDEL_TIME_OUT,
                NettyConstants.SERVER_WRITE_IDEL_TIME_OUT,
                NettyConstants.SERVER_ALL_IDEL_TIME_OUT,
                TimeUnit.SECONDS);

        socketChannel.pipeline()
                .addLast(idleWatch)
                .addLast(new AcceptorIdleStateTrigger())
                .addLast(new StringEncoder())
                .addLast(new ByteArrayEncoder())
                .addLast(new NettyServerDecoder(this.factoryCode))
                .addLast(new NettyServerHandler());
    }
}

3、NettyServerDecoder.class

package org.xxx.android.netty.server;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.socket.SocketChannel;
import org.xxx.android.factory.util.FactoryMap;
import org.xxx.android.factory.util.FactoryUtil;
import org.xxx.android.factory.util.MessageUtil;
import org.xxx.android.factory.vo.FactoryEnum;
import org.xxx.android.netty.delegate.DecoderDelegate;
import org.xxx.android.netty.server.decoder.IDecoder;
import org.xxx.android.util.DataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * Inbound decoder that determines which factory (vendor) protocol a channel speaks and
 * delegates frame extraction to the matching {@link IDecoder}.
 */
public class NettyServerDecoder extends ByteToMessageDecoder {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /*
     * Number of consecutive unidentified messages per channel, keyed by channel hash code.
     */
    static volatile Map<Integer,Integer> timesMap=new ConcurrentHashMap<Integer,Integer>();
    /*
     * Delegate that maps a factory to its decoder implementation.
     */
    DecoderDelegate decoderDelegate=null;

    // Fixed factory code for this channel; null means "detect it from the message".
    Integer factoryCode=null;

    /**
     * @param fc factory (vendor) code this decoder is pinned to, or {@code null} to detect
     *           the vendor from the received data
     */
    public NettyServerDecoder(Integer fc){
        this.factoryCode=fc;
        this.decoderDelegate=new DecoderDelegate();
    }

    /**
     * Reads the currently buffered bytes, resolves the channel's factory and lets the
     * factory-specific decoder append complete frames to {@code list}.
     *
     * <p>Any failure is logged and swallowed so that one bad message does not tear the
     * pipeline down.
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
        try {
            // Do NOT retain 'in' here: it is the cumulation buffer owned by
            // ByteToMessageDecoder, and decoded frames are passed on as byte[] copies.
            // An extra retain() per call is never balanced by a release() and leaks it.
            Channel channel=channelHandlerContext.channel();
            int hashCode=channel.hashCode();

            // presumably drains the readable bytes of 'in' into a new array — confirm
            // against ByteBufToBytes
            ByteBufToBytes reader = new ByteBufToBytes();
            byte[] byteData = reader.read(in);
            log.info("服务端接收到的原始消息为{}={}",hashCode,DataUtil.ByteArrToHexString(byteData));
            // Resolve the factory for this channel.
            FactoryEnum channelFactory=null;
            if(this.factoryCode==null){
                // No factory configured for this port: identify it from the message.
                channelFactory=this.indentifyFromMsg(channel, byteData, in, list);
            }
            else{
                channelFactory=FactoryEnum.codeOf(this.factoryCode);
                FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
            }
            if(channelFactory==null){
                log.info("设备{}消息未识别",hashCode);
                return;
            }
            // Look up the decoder for the factory.
            IDecoder decoder=decoderDelegate.getDelegate(channelFactory);
            if(decoder==null){
                log.info("设备{}厂商{}解码器未配置",hashCode,channelFactory.toString());
                return;
            }
            boolean complete = decoder.decoder(hashCode, byteData, in, list);
            if (!complete) {
                log.info("未识别出完整消息,继续接收{}", DataUtil.ByteArrToHexString(byteData));
            }

        }catch (Throwable e){
            // Throwable as the trailing argument makes SLF4J print the stack trace; a
            // "{}" placeholder would be left unfilled in the message.
            log.error("解析出错",e);
        }
    }

    /*
     * Identifies the factory of a channel: first from the channel registry, then from the
     * message content. After five unidentified messages the channel is closed.
     * Returns null when the factory is still unknown.
     */
    private FactoryEnum indentifyFromMsg(Channel channel, byte[] byteData, ByteBuf in,
            List<Object> list) {
        int hashCode=channel.hashCode();
        FactoryEnum channelFactory = FactoryUtil.indentifyByChannel(channel);
        if (channelFactory==null) {
            // Try to recognise the factory from the payload.
            channelFactory= MessageUtil.getMsgType(byteData);
            if (channelFactory == null) {
                int times=1;
                if(timesMap.containsKey(hashCode)){
                    times=timesMap.get(hashCode)+1;
                }
                if(times==5){
                    log.info("设备{}已登录5次,服务器关闭连接",hashCode);
                    timesMap.remove(hashCode);
                    // Give up on this connection.
                    channel.close();
                    return null;
                }
                else{
                    timesMap.put(hashCode, times);
                }
                // Factory still unknown: rewind so the bytes are kept for the next read.
                in.resetReaderIndex();
                log.info("设备{}厂商未能识别,继续接收{}", hashCode,
                        DataUtil.ByteArrToHexString(byteData));
            }
            else{
                // Remember which protocol this channel speaks.
                FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
            }
        } else {
            timesMap.remove(hashCode);
            log.info("从通道获取厂商成功:{}={}",
                    hashCode,
                    channelFactory.toString());
        }
        return channelFactory;
    }

    /**
     * Logs the failure through the class logger and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.error("--------数据读异常----------: ",cause);
        ctx.close();
    }

    /**
     * Flushes pending writes and then hands over to {@link ByteToMessageDecoder} so that its
     * own read-complete housekeeping runs and the event reaches the following handlers.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        log.debug("--------数据读取完毕----------");
        // Without this call the superclass bookkeeping is skipped and
        // channelReadComplete is never propagated down the pipeline.
        super.channelReadComplete(ctx);
    }

}

4、NettyServerHandler.class

package org.xxx.android.netty.server;

import org.apache.commons.lang3.StringUtils;
import org.xxx.android.factory.IFactory;
import org.xxx.android.factory.util.FactoryMap;
import org.xxx.android.factory.util.FactoryUtil;
import org.xxx.android.factory.vo.FactoryEnum;
import org.xxx.android.netty.delegate.FactoryDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
/**
 * Business handler at the tail of the pipeline: routes every decoded message to the
 * factory (vendor) implementation associated with the channel.
 *
 * <p>Marked sharable so one instance may be added to several pipelines.
 */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    public final Logger log = LoggerFactory.getLogger(getClass());

    /**
     * Cleans up the per-channel registrations when the connection goes down and, for a
     * channel that had a device number, reports the device as offline.
     */
    @Override
    public void channelInactive(ChannelHandlerContext chc) throws Exception {
        SocketChannel socketChannel = (SocketChannel) chc.channel();
        int channelKey = socketChannel.hashCode();

        String clientId = FactoryMap.getDevNoByChannel(socketChannel);
        log.info("----客户端设备连接断开:{}", clientId);
        boolean registered = !StringUtils.isEmpty(clientId);
        if (registered) {
            FactoryMap.removeChannelByDevNo(clientId);
        }
        // The decoder/factory entries are keyed by the channel, not by the device number,
        // and are created before a device number is known; always drop them so channels
        // that disconnect before registering a device do not leave entries behind.
        FactoryMap.removeChannelDecoder(channelKey);
        FactoryMap.removeChannelFactory(channelKey);
        if (registered) {
            // Report the client as disconnected.
            FactoryUtil.getFactoryService().syncNetworkStatus(clientId, 0);
        }
    }

    /**
     * Logs the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("通道{}发生异常,关闭连接", ctx.channel().hashCode(), cause);
        ctx.close();
    }

    /**
     * Flushes whatever was written while handling the current read batch.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Looks up the protocol recorded for the channel by the decoder and dispatches the
     * message to the matching factory. Failures are logged, never propagated.
     */
    @Override
    public void channelRead(ChannelHandlerContext chc, Object message) throws Exception {
        try {
            Channel channel=chc.channel();
            // Protocol (factory code) stored for this channel by the decoder.
            Integer channelFactory=FactoryMap.getDecoderByChannel(channel.hashCode());
            if(channelFactory==null){
                log.info("解码器未能维护通道和工厂关系");
                return;
            }
            FactoryEnum factoryEnum=FactoryEnum.codeOf(channelFactory);
            if (factoryEnum == null) {
                log.info("解析消息失败,未识别消息所属厂家");
                return;
            }
            this.factoryMessage(channel,message,factoryEnum);

        }catch (Exception e){
            // Throwable as the trailing argument so SLF4J prints the stack trace.
            log.error("处理业务消息失败",e);
        }
    }

    /**
     * Hands {@code msg} to the factory bound to the channel, creating the factory on first
     * use and caching it once the first message has been processed.
     *
     * @param channel     channel the message arrived on
     * @param msg         decoded message
     * @param factoryEnum factory (vendor) the channel belongs to
     */
    void factoryMessage(Channel channel, Object msg,FactoryEnum factoryEnum) {
        SocketChannel socketChannel = (SocketChannel) channel;
        int channelKey = socketChannel.hashCode();

        IFactory factory=FactoryMap.getFactoryByChannel(channelKey);
        boolean created = (factory==null);
        if(created){
            // First message on this channel: create the factory through the delegate.
            factory = FactoryDelegate.createFactory(factoryEnum);
        }
        factory.processMessage(socketChannel,msg);
        if(created){
            // Cache only after the first message was processed without an exception.
            FactoryMap.putChannelFactory(channelKey, factory);
        }
        log.info("{}={}",channelKey,factory.getFactoryDevNo());
    }
}

5、SpringbootApplication.class

package org.xxx.android;

import org.xxx.android.netty.server.NettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.embedded.ConfigurableEmbeddedServletContainer;
import org.springframework.boot.context.embedded.EmbeddedServletContainerCustomizer;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point of the application.
 *
 * <p>{@code @SpringBootApplication} marks this class as the bootstrap class; once the
 * context is up, {@link #run(String...)} starts the Netty servers when enabled.
 */
@EnableJpaAuditing
@EnableScheduling
@SpringBootApplication
public class SpringbootApplication implements CommandLineRunner,EmbeddedServletContainerCustomizer{
    // HTTP port of the embedded servlet container.
    @Value("${server.port}")
    int serverPort;

    // Netty servers are started only when this property equals 1.
    @Value("${netty.startup}")
    int startupStartup;

    // Injected Netty server instances.
    @Autowired NettyServer nettyServer;
    @Autowired NettyServer yyNettyServer;

    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }

    /** Applies the configured HTTP port to the embedded container. */
    @Override
    public void customize(ConfigurableEmbeddedServletContainer container) {
        container.setPort(serverPort);
    }

    /** Invoked by Spring Boot after start-up; kicks off the Netty servers. */
    @Override
    public void run(String... strings) {
        this.startNettyServer();
    }

    /**
     * Starts both Netty servers on their own threads and registers a JVM shutdown hook
     * that stops them again. Does nothing unless {@code netty.startup} is 1.
     */
    void startNettyServer() {
        if (startupStartup != 1) {
            return;
        }
        this.nettyThreadStart(nettyServer);
        this.nettyThreadStart(yyNettyServer);

        Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                stopNettyServer();
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /** Stops both Netty servers. */
    void stopNettyServer() {
        nettyServer.stop();
        yyNettyServer.stop();
    }

    /**
     * Runs {@code ns.start()} on a new thread, since {@code start()} blocks until the
     * server channel is closed.
     */
    void nettyThreadStart(final NettyServer ns) {
        Thread starter = new Thread() {
            @Override
            public void run() {
                ns.start();
            }
        };
        starter.start();
    }
}

2、粘包/拆包解决思路

基本思路就是不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包;
若当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包

若当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

这里最重要的就是,使用markReaderIndex标记读索引,使得多余的数据保留,继续等待后面的数据

        //BB数据
        //判断心跳
        if(isXyHeart(byteData)){
            list.add(byteData);
            return true;
        }

        //判断是否是开头
        if(isXYMsgHeader(byteData)){
            headIndexMap.remove(hashCode);
            int length = DataUtil.byteToInt(byteData[XyConstant.BUSINESS_RSP_MSG_FIELD.LEN.INDEX]);
            //整包
            if(length == byteData.length){
                //判断校验和
                if(!isCheckNum(byteData,length)){
                    log.error("兴元数据包校验和不通过{}!={}==={}",byteData[length-1]& FactoryConstant.BYTE_MASK, XyBusinessReqMsgUtil.getCheckSum(byteData)&FactoryConstant.BYTE_MASK,DataUtil.ByteArrayToString(byteData));
                }else {
                    list.add(byteData);
                    return true;
                }
            }
            if(length > byteData.length){
                //半包,继续接收
                in.resetReaderIndex();
                return false;
            }
            if(length < byteData.length){
                log.info("粘包=====接收数据大于帧长度{}>{}",byteData.toString(),length);
                return dealStickyPackage(in, list, length);
            }
    /**
     * Handles a "sticky" read in which the buffer holds more than one frame: emits the
     * first {@code length} bytes as a complete frame and keeps the rest for the next pass.
     *
     * @param in     inbound buffer; rewound to its marked reader index before reading
     * @param list   output list that receives the extracted frame as a {@code byte[]}
     * @param length frame length in bytes
     * @return always {@code false}
     */
    public boolean dealStickyPackage(ByteBuf in, List<Object> list, int length) {
        // Rewind to the marked position, i.e. the start of the current frame.
        in.resetReaderIndex();
        final byte[] frame = new byte[length];
        in.readBytes(frame, 0, frame.length);
        // Pass the complete frame on to the next handler.
        list.add(frame);
        // Mark the new reader index: a later resetReaderIndex() now returns to the start
        // of the leftover bytes, which begin the following frame.
        in.markReaderIndex();
        return false;
    }
上一篇下一篇

猜你喜欢

热点阅读