Netty集成串口RXTX编程,为什么过时了?

2022-06-18  本文已影响0人  程就人生

用过Netty的人明显表现对它的偏爱,有没有?!

为什么要用netty再实现一遍?

上一篇已经实现了串口通信。当然,简单实现还是远远不够的。即使是串口通信,也需要对收发数据进行编解码处理吧?也需要保证数据的完整性吧?也需要协议吧?也需要把业务逻辑的部分单独处理吧?

下面上代码:

1.编解码类;

import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
 * 对收到的数据进行解码
 * @author 程就人生
 * @Date
 */
public class ByteArrayDecoder extends ByteToMessageDecoder{

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // 标记一下当前的readIndex的位置
    in.markReaderIndex();
    int dataLength = in.readableBytes();
    byte[] array = new byte[dataLength];
    in.readBytes(array, 0, dataLength);
    if(array.length > 0){
      out.add(array);  
    }
  }
}

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
/**
 * 对发出的数据进行编码
 * @author 程就人生
 * @Date
 */
@Slf4j
public class ByteArrayEncoder extends MessageToByteEncoder<byte[]>{
  
  @Override
  protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
    log.info(".....经过ByteArrayEncoder编码.....");
    //消息体,包含我们要发送的数据
    out.writeBytes(msg);
  }
}

2.数据接收处理类;

import org.springframework.stereotype.Service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.channel.ChannelHandler;

/**
 * 串口接收数据处理器
 * @author 程就人生
 * @Date
 */
@Service("rxtxHandler")
@ChannelHandler.Sharable
public class RxtxHandler extends SimpleChannelInboundHandler<byte[]>{

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
    //文本方式编解码,String
        //System.out.println("接收到:"+msg);
      
      // 十六进制发送编解码
      int dataLength = msg.length;
        ByteBuf buf = Unpooled.buffer(dataLength);
        buf.writeBytes(msg);
        System.out.println("接收到:");
        while(buf.isReadable()){
           System.out.print(" " + buf.readByte());
        }
        System.out.println("");
        // 释放资源
        ReferenceCountUtil.release(msg);
  }
}

3.串口参数配置类;

import io.netty.channel.rxtx.RxtxChannelConfig;
import io.netty.channel.rxtx.RxtxChannelConfig.Databits;
import io.netty.channel.rxtx.RxtxChannelConfig.Paritybit;
import io.netty.channel.rxtx.RxtxChannelConfig.Stopbits;
import lombok.Data;

/**
 * 串口参数配置类
 * @author 
 * @date 2022年4月26日
 *
 */
@Data
public final class SerialPortParam {

    /**
     * 串口名称,以COM开头(COM0、COM1、COM2等等)
     */
    private String serialPortName;
    /**
     * 波特率, 默认:115200
     */
    private int baudRate = 115200;
    /**
     * 数据位 默认8位
     * 可以设置的值:SerialPort.DATABITS_5、SerialPort.DATABITS_6、SerialPort.DATABITS_7、SerialPort.DATABITS_8
     */
    private Databits dataBits = RxtxChannelConfig.Databits.DATABITS_8;
    /**
     * 停止位
     * 可以设置的值:SerialPort.STOPBITS_1、SerialPort.STOPBITS_2、SerialPort.STOPBITS_1_5
     */
    private Stopbits stopBits = RxtxChannelConfig.Stopbits.STOPBITS_1;
    /**
     * 校验位
     * 可以设置的值:SerialPort.PARITY_NONE、SerialPort.PARITY_ODD、SerialPort.PARITY_EVEN、SerialPort.PARITY_MARK、SerialPort.PARITY_SPACE
     */
    private Paritybit parity = RxtxChannelConfig.Paritybit.NONE;
}

4.netty整合串口的启动类;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.rxtx.RxtxChannel;
import io.netty.channel.rxtx.RxtxDeviceAddress;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * 串口接收数据的服务端
 * @author 程就人生
 * @Date
 */
@Slf4j
@Data
@Component
public class RxtxServer {

  private RxtxChannel channel;
  
  private SerialPortParam serialPortParam;
  
    @Autowired
  private RxtxHandler handler;

    public void createRxtx() throws Exception {
        // 串口使用阻塞io
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            Bootstrap bootstrap  = new Bootstrap();
            bootstrap.group(group)
                    .channelFactory(() -> {
                        RxtxChannel rxtxChannel = new RxtxChannel();
                        rxtxChannel.config()
                                .setBaudrate(serialPortParam.getBaudRate()) // 波特率
                                .setDatabits(serialPortParam.getDataBits()) // 数据位
                                .setParitybit(serialPortParam.getParity())      // 校验位
                                .setStopbits(serialPortParam.getStopBits()); // 停止位
                        return rxtxChannel ;
                    })
                    .handler(new ChannelInitializer<RxtxChannel>() {
                        @Override
                        protected void initChannel(RxtxChannel rxtxChannel) {
                            rxtxChannel.pipeline().addLast(
//                                    new LineBasedFrameDecoder(60000),
                                  // 文本形式发送编解码
//                                    new StringEncoder(StandardCharsets.UTF_8),
//                                    new StringDecoder(StandardCharsets.UTF_8),
                                // 十六进制形式发送编解码
                                new ByteArrayDecoder(),
                                new ByteArrayEncoder(),
                                    handler
                            );
                        }
                    });

            ChannelFuture f = bootstrap.connect(new RxtxDeviceAddress(serialPortParam.getSerialPortName())).sync();
            f.addListener(connectedListener);
                
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }


    // 连接监听
    GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
      final EventLoop eventLoop = f.channel().eventLoop();
      if (!f.isSuccess()) {
        log.info("连接失败");
      }else{
        channel = (RxtxChannel) f.channel();
        log.info("连接成功");
        sendData();
      }
    };
    
    /**
     * 发送数据
     */
    public void sendData(){
      // 十六机制形式发送
      ByteBuf buf = Unpooled.buffer(2);
      buf.writeByte(3);
      buf.writeByte(2);
      channel.writeAndFlush(buf.array());
      
      // 文本形式发送
      //channel.writeAndFlush("2");
    }
    
    public void start(){
        CompletableFuture.runAsync(()->{
            try {
              // 阻塞的函数
                createRxtx();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, Executors.newSingleThreadExecutor());//不传默认使用ForkJoinPool,都是守护线程
    }

}

串口连接成功后,发送一次数据。

5.把串口启动类放在入口程序里,实际业务中根据实际情况调整;


@SpringBootApplication
public class SpringBootSqliteApplication {

  public static void main(String[] args) {
    //获取application的上下文
        ApplicationContext applicationContext = SpringApplication.run(SpringBootSqliteApplication.class, args);
    
        // 串口连接服务类
    RxtxServer rxtxServer = applicationContext.getBean(RxtxServer.class);
    SerialPortParam serialPort = new SerialPortParam();
    // 连接串口com1
    serialPort.setSerialPortName("COM1");
    rxtxServer.setSerialPortParam(serialPort);  
    rxtxServer.start();
    try {
      // 连接串口需要一点时间,这里稍微等待一下
      Thread.currentThread().sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    // 发送数据
    rxtxServer.sendData();
  }

}

这里又发送了一次数据,所以一共发送了两次数据。

6.运行项目;

在运行项目前,把串口调试工具设置一下,发送数据采用Hex,接收数据也是Hex;如果是String类型,则可以去掉这项勾选;并且设置数据5s中发送一次;

image.png

在控制台,可以看到接收到的数据;


image.png

在串口调试工具这里,又接收到了两组数据。

image.png

最后总结

以上便是netty关于串口采用字节数组的形式通讯的编码。整个编码流程和TCP协议通讯的流程一样。但是,Netty关于串口的编程,相关类相关方法已经过时了。为什么过时了,还有没有什么更好的架包来替代,这是在生产中需要持续关注的。参考资料:

串口调试工具下载:

http://www.zlmcu.com/document/com_debug_tools.html

http://www.zlmcu.com/download.htm

虚拟串口工具下载:

https://www.iotplat.top/#/iot/soft/604ed56de1457928489bf826

相关文档:

Java串口编程RXTX,距离硬件又近了一点

SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端

SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端

Netty中的编解码,至少知道这两种

Netty 之 IdleStateHandler 心跳检测(部分源码分析),实现超时断开连接

Netty 之 ByteBuf 几种分配方案 及 内存溢出相关Bug

Netty整合HTTP协议,实现文件目录服务器

Netty整合JBoss Marshalling编解码
Netty整合Protobuf编解码,并解决半包问题

Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题

Netty中粘包拆包,小试牛刀

SpringBoot整合Netty与websocket客户端进行对话

上一篇下一篇

猜你喜欢

热点阅读