netty

SpringBoot 2 整合 Netty 实现基于 DTU 的

2020-08-06  本文已影响0人  程就人生

使用netty不是一天两天了,但是使用netty和DTU通讯还是第一次,接下来要做DTU的通讯协议,对接工作还没有正式开始,只收到一个简单的DTU协议文档,里面的内容大概是下面表格中的样子。

位数 内容 取值
0 字头 0XCC
1 数据长度 低八位
2 数据长度 高八位
3 类型 0:心跳,1:登录
4 机器编码 0X00-0XFF
5 校验位 前面字节相加的低八位

这里把通讯协议简化了一下,仅剩下心跳和登录,如果有其他参数可以在此基础上进行扩展;从表格中可以看出来,每个字段数据位数还不一样,有的一位,有的两位,数据长度占两位,其他各占用一位。
 
只有这么一个文档,只有这么一丁点信息,其他就什么也不知道了,这可如何是好?不知道从哪里下手,这个疑问多多的项目就这样放了一段时间。

但是也不能总是这样放着呀,如果对接的人来了,我这边什么也没有,一下子也建不起一个项目呀?转念又想,曾经使用 **netty + google protobuf ** 开发过IM项目,也有些相似之处。这个DTU可否使用google protobuf呢?

于是,写了一个简单的客户端,一个服务端,来进行收发信息,google protobuf是通过对象编码成二进制进行数据通讯的,但文档中是字节数组,压根没有对象一说呀?写完了demo,测试一遍,但是和字节数组对应不起来,最后还是删掉了。

在网上找了很多资料,找来找去只找到这么两篇Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议
Java 使用Socket 实现基于DTU的TCP服务器 + 数据解析 + 心跳检测可以参考,试着把上面的demo扒拉了好几遍,通过这两篇文章,可以获得一些信息:
第一,可以使用netty和dtu进行通信,选择使用netty框架没有错;
第二,和dtu对接,接收到的是字节数组,不能使用google的protobuf框架,需要另做处理。

把参考文档中带netty的demo也试着在本地拷贝了一份,大概知道了对接收到的字节数组如何处理,但是demo中只有接收,没有发送,这是不够完美的;这也是个问题,单方面的,不好运行呀。

另外在处理字节数组的时候,在流程上还不是太标准,后面有可能会遇到半包、粘包的问题,这些都是需要面对的问题。结合自己曾经开发过IM的经验,把编解码处理和数据处理也分离出来,半包、粘包的问题一并考过进去,这样后面再完善就方便多了。

下面就开始SpringBoot2.1.4 + netty + DTU的客户端编码,难点就在于字节数组的解码和编码,一进一出,搞定了这一步,其他的业务逻辑就好处理了。这里的客户端编码是为了测试DTU的请求,以便和服务端互动起来,可以进行测试。

第一步,pom文件引入netty架包,就这两个架包足够用的了;

<!-- Spring Boot的核心启动器,包含了自动配置、日志和YAML -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- netty架包 -->
        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </dependency>

第二步,对字节数组的编解码,包括半包、粘包处理;
字节数据解码类ByteArrayDecoder:

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * byte     1字节    (8位) -27~27-1 0 Byte 255              
 * short    2字节    (16位) -215~215-1 0 Short         
 * int      4字节    (32位) -231~ 231-1 0 Integer           
 * long     8字节    (64位) -263~263-1 0 Long          
 * char     2字节    (C语言中是1字节)可以存储一个汉字
 * float    4字节    (32位) -3.4e+38 ~ 3.4e+38 0.0f Float           
 * double   8字节    (64位) -1.7e+308 ~ 1.7e+308 0 Double
 * char 2字节(16位) u0000~uFFFF(‘’~‘?’) ‘0’ Character (0~216-1(65535))  
 * 布尔 boolean 1/8字节(1位) true, false FALSE Boolean 
 * C语言中,short、int、float、long、double,分别为:1个、2个、4个、8个、16个
 * 对字节数组进行解码
 * @author 程就人生
 * @date 2020年8月3日
 * @Description 
 *
 */
public class ByteArrayDecoder extends ByteToMessageDecoder{

    private static Logger log = LoggerFactory.getLogger(ByteArrayDecoder.class);
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 标记一下当前的readIndex的位置
        in.markReaderIndex();
        //判断获取到的数据是否够字头,不沟通字头继续往下读
        //字头:1位,数据串总长度:2位
        if(in.readableBytes() < ProtoInstant.FILED_LEN){
            log.info("不够包头,继续读!");
            return;
        }
        //读取字头1位
        int fieldHead = CharacterConvert.byteToInt(in.readByte());
        if(fieldHead != ProtoInstant.FIELD_HEAD){
            String error = "字头不对:" + ctx.channel().remoteAddress();
            log.info(error);
            ctx.close();
            return;
        }
        //长度2位,读取传送过来的消息的长度。
        int length = CharacterConvert.shortToInt(in.readShort());
        // 长度如果小于0
        if (length < 0) {// 非法数据,关闭连接
            log.info("数据长度为0,非法数据,关闭连接!");
            ctx.close();
            return;
        }       
        // 读到的消息体长度如果小于传送过来的消息长度,减去字头1位,数据长度2位
        int dataLength = length - ProtoInstant.FILED_LEN;
        if (dataLength > in.readableBytes()) {
            // 重置读取位置
            in.resetReaderIndex();
            return;
        }
        byte[] array;
        if (in.hasArray()) {
            log.info("堆缓冲");
            // 堆缓冲
            ByteBuf slice = in.slice();
            array = slice.array();          
        } else {
            log.info("直接缓冲");
            // 直接缓冲
            array = new byte[dataLength];
            in.readBytes(array, 0, dataLength);
        }
        if(array.length > 0){
            in.retain();
            out.add(array); 
        }
    }
}

字节数组编码类ByteArrayEncoder:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 对字节数组进行编码
 * @author 程就人生
 * @date 2020年8月3日
 * @Description 
 *
 */
public class ByteArrayEncoder extends MessageToByteEncoder<byte[]>{

    private static Logger log = LoggerFactory.getLogger(ByteArrayEncoder.class);
    
    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
        log.info(".....经过ByteArrayEncoder编码.....");     
        //字头(1位)
        out.writeByte(ProtoInstant.FIELD_HEAD);
        //数据长度(2位),字头1位+数据长度2位+数据位(包含校验1位)
        out.writeShort(ProtoInstant.FILED_LEN + msg.length);
        //消息体,包含我们要发送的数据
        out.writeBytes(msg);
    }

}

在编解码的时候,考虑到DTU那边对接的有可能是C语音,C语言和Java的数据类型不一样,所占用的位数也不一样,这个需要保持一致。

第三步,客户端启动类;
客户端启动类NettyClient,这里在头部加了@Component,只要项目一启动就去建立与服务端的链接,建立链接后登录,登录后保持心跳;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.example.im.codec.ByteArrayDecoder;
import com.example.im.codec.ByteArrayEncoder;
import com.example.im.handler.ExceptionHandler;
import com.example.im.handler.LoginResponseHandler;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * netty客户端连接类
 * @author 程就人生
 * @date 2020年8月6日
 * @Description 
 *
 */
@Component
public class NettyClient {
    
    private static Logger log = LoggerFactory.getLogger(NettyClient.class);
    
    // 服务器ip地址
    @Value("${netty.communication.host}")
    private String host;
    
    // 服务器端口
    @Value("${netty.communication.port}")
    private int port;
    
    private Channel channel;
    
    
    @Autowired
    private LoginResponseHandler loginResponseHandler;
    
    @Autowired
    private ExceptionHandler exceptionHandler;
    
    private Bootstrap bootstrap;
    
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

    @PostConstruct
    public void start() throws Exception {
        //启动客户端
        doConnect();
    }    
    
    /**
     * 连接操作
     */
    private void doConnect() {
        try {
            bootstrap = new Bootstrap();

            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            bootstrap.remoteAddress(host, port);

            // 设置通道初始化
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                        public void initChannel(SocketChannel ch) {
                            //编解码处理
                            ch.pipeline().addLast("decoder", new ByteArrayDecoder());
                            ch.pipeline().addLast("encoder", new ByteArrayEncoder());
                            //登录返回处理
                            ch.pipeline().addLast("loginHandler", loginResponseHandler);
                            //异常处理
                            ch.pipeline().addLast("exception", exceptionHandler);
                        }
                    }
            );
            log.info("客户端开始连接");
            ChannelFuture f = bootstrap.connect();
            f.addListener(connectedListener);
        } catch (Exception e) {
            e.printStackTrace();
            log.info("客户端连接失败!" + e.getMessage());
        }
    }
    
    //连接关闭监听
    GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) -> {
        log.info(new Date() + ": 连接已经断开……");
        channel = f.channel();
    };

    //连接监听
    GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
        final EventLoop eventLoop = f.channel().eventLoop();
        if (!f.isSuccess()) {
            log.info("连接失败!在10s之后准备尝试重连!");
            eventLoop.schedule(() -> doConnect(), 10, TimeUnit.SECONDS);
        } else {
            log.info("服务器 连接成功!" + f.channel().remoteAddress() + ":" + f.channel().localAddress());
            channel = f.channel();
            login();
        }
    };
    
    /**
     * 登录操作
     */
    private void login(){
        //构建登录请求
        ByteBuf buf = Unpooled.buffer(3);
        //登录
        buf.writeByte(ProtoInstant.LOGIN);
        buf.writeByte(ProtoInstant.DEVICE_ID);
        //校验位
        int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD,6,ProtoInstant.LOGIN,ProtoInstant.DEVICE_ID);
        int verify = CharacterConvert.getLow8(sum);
        buf.writeByte(verify);
        writeAndFlush(buf.array());
    }
    
    /**
     * 发送消息
     * @param msg
     */
    public void writeAndFlush(Object msg){
        this.channel.writeAndFlush(msg).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future)
                    throws Exception {
                // 回调
                if (future.isSuccess()) {
                    log.info("请求netty服务器,消息发送成功!");
                } else {
                    log.info("请求netty服务器,消息发送失败!");
                }
            }
        });
    }
    
    /**
     * 重新建立连接
     * @throws Exception
     */
    public void reconnect() throws Exception {
        if (channel != null && channel.isActive()) {
            return;
        }
        log.info("reconnect....");
        start();
        log.info("reconnect success");
    }

    /**
     * 关闭连接
     */
    public void close() {
        eventLoopGroup.shutdownGracefully();
    }
}

别忘了application.properties文件中的配置:

netty.communication.host=127.0.0.1
netty.communication.port=8500

第四步,handler处理类;
异常处理类ExceptionHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.example.im.NettyClient;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 客户端异常处理handler
 * @author 程就人生
 * @date 2020年8月3日
 * @Description 
 *
 */
@ChannelHandler.Sharable
@Service("exceptionHandler")
public class ExceptionHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(ExceptionHandler.class);
    
    @Autowired
    private NettyClient nettyClient;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof Exception) {
            //捕捉异常信息
            cause.printStackTrace();
            log.error(cause.getMessage());
            ctx.close();
        } else {
            //捕捉异常信息
            cause.printStackTrace();
            log.error(cause.getMessage());
            ctx.close();
        }
        //出现异常时,定时重连;比如上位机服务器重启服务器
        nettyClient.reconnect();
    }

    /**
     * 通道 Read 读取 Complete 完成
     * 做刷新操作 ctx.flush()
     */
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

登录处理LoginResponseHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;

/**
 * netty客户端登录处理
 * @author 程就人生
 * @date 2020年8月3日
 * @Description 
 *
 */
@Service("loginResponseHandler")
@ChannelHandler.Sharable
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {
    
    private static Logger log = LoggerFactory.getLogger(LoginResponseHandler.class);

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if (null == msg || !(msg instanceof byte[])) {
            super.channelRead(ctx, msg);
            return;
        }
        //对接收到的数据进行处理
        byte[] data = (byte[]) msg;
        int dataLength = data.length;
        ByteBuf buf = Unpooled.buffer(dataLength);
        buf.writeBytes(data);
        int type = CharacterConvert.byteToInt(buf.readByte());
        //机器编码
        int deviceId = CharacterConvert.byteToInt(buf.readByte());  
        //校验位
        int verify = CharacterConvert.byteToInt(buf.readByte());
        //如果是登录操作时
        if(type == ProtoInstant.LOGIN){         
            //计算字头 + 数据长度 + 类型 + 参数的总和
            int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);
            if(verify != CharacterConvert.getLow8(sum)){
                log.error("登录返回,校验位错误!");
            }else{
                ChannelPipeline channelPipeline = ctx.pipeline();
                channelPipeline.addAfter("encoder", "heartbeat", new HeartBeatHandler());
                // 移除登录响应处理器
                channelPipeline.remove(this);
                log.info("服务器机登录返回了!");
            }
            return;
        }else{
            super.channelRead(ctx, msg);
            return;
        }
    }
}

心跳处理HeartBeatHandler:

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * netty客户端心跳处理
 * @author 程就人生
 * @date 2020年8月1日
 * @Description 
 *
 */
@ChannelHandler.Sharable
@Service("heartHandler")
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(HeartBeatHandler.class);
    
    // 心跳的时间间隔,单位为s
    private static final int HEARTBEAT_INTERVAL = 100;

    // 在Handler被加入到Pipeline时,开始发送心跳
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.buffer(3);
        //心跳
        buf.writeByte(ProtoInstant.HEART_BEAT);
        //机器编码
        buf.writeByte(ProtoInstant.DEVICE_ID);
        //校验位
        int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, 6, ProtoInstant.HEART_BEAT, ProtoInstant.DEVICE_ID);
        int verify = CharacterConvert.getLow8(sum);
        buf.writeByte(verify);
        // 发送心跳
        heartBeat(ctx, buf.array());
    }

    // 使用定时器,发送心跳报文
    public void heartBeat(ChannelHandlerContext ctx, byte[] heartbeatMsg) {
        ctx.executor().schedule(() -> {
            if (ctx.channel().isActive()) {
                log.info(" 发送心跳 消息 to netty服务器系统");
                ctx.writeAndFlush(heartbeatMsg);
                // 递归调用,发送下一次的心跳
                heartBeat(ctx, heartbeatMsg);
            }
        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }

    /**
     * 接受到服务器的心跳回写
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 判断消息实例
        if (null == msg || !(msg instanceof byte[])) {
            super.channelRead(ctx, msg);
            return;
        }
        //对接收到的数据进行处理
        byte[] data = (byte[]) msg;
        int dataLength = data.length;
        ByteBuf buf = Unpooled.buffer(dataLength);
        buf.writeBytes(data);
        int type = CharacterConvert.byteToInt(buf.readByte());
        int deviceId = CharacterConvert.byteToInt(buf.readByte());
        //如果是心跳信息时
        if(type == ProtoInstant.HEART_BEAT){
            int verify = CharacterConvert.byteToInt(buf.readByte());
            //计算字头 + 数据长度 + 类型 + 参数的总和
            int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);
            if(verify != CharacterConvert.getLow8(sum)){
                log.error("心跳包,校验位错误!");
            }else{
                log.info("收到回写的心跳 消息 from netty服务器系统");
            }           
            return;
        }else{
            super.channelRead(ctx, msg);
        }
    }
}

客户端的核心编码大抵就是这些,看完这些编码是不是有些期待服务端的编码呢,服务端的编码敬请期待下一篇文章。

上一篇 下一篇

猜你喜欢

热点阅读