springboot+netty,webSocket与modbus
2021-06-08 本文已影响0人
帷幕丶归心
准备工作
springboot 2.4.2
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.59.Final</version>
</dependency>
1.创建NettyServer.java,关键代码如下
/**
 * Netty server exposing two listeners that share one boss/worker event-loop pair:
 * a raw TCP listener for devices (fixed-length frames) and a WebSocket listener.
 *
 * <p>Connected channels are tracked in static channel groups, and static maps associate
 * a string code with a {@link ChannelId}; both are reachable through the static
 * accessors below so that handlers can look up a peer channel by code.
 */
public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger("-----NettyServer-----");

    /** Channels of connected TCP devices. */
    private static final ChannelGroup deviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** Device code -> id of the channel registered under that code. */
    private static final Map<String, ChannelId> deviceMap = new ConcurrentHashMap<>();

    /** Channels of connected WebSocket clients. */
    private static final ChannelGroup socketChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** WebSocket code -> id of the channel registered under that code. */
    private static final Map<String, ChannelId> socketMap = new ConcurrentHashMap<>();

    private final RedisUtil redisUtil;
    private final HandlerService handlerService;

    /** The boss (parent) group accepts incoming TCP connections. */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    /** The worker (child) group handles I/O events of accepted channels. */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * Creates the server and its event-loop groups; no port is bound until
     * {@link #bind(int, int)} is called.
     *
     * @param redisUtil      passed through to every {@code HeartBeatHandler}
     * @param handlerService passed through to every {@code HeartBeatHandler}
     */
    public NettyServer(RedisUtil redisUtil, HandlerService handlerService) {
        this.redisUtil = redisUtil;
        this.handlerService = handlerService;
    }

    /**
     * Binds the device TCP listener and the WebSocket listener, then blocks the calling
     * thread until both server channels have been closed.
     *
     * <p>If either bind fails, the failure is logged, both event-loop groups are shut down
     * (which also closes a listener that was already bound) and the bind failure is rethrown.
     *
     * @param tcp    port of the device TCP listener
     * @param socket port of the WebSocket listener
     * @throws Exception the bind failure, or {@link InterruptedException} if the wait is interrupted
     */
    public void bind(int tcp, int socket) throws Exception {
        ServerBootstrap device = newBootstrap(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sh) throws Exception {
                InetSocketAddress address = sh.remoteAddress();
                logger.debug("TCP 客户端IP:{}:{}", address.getAddress(), address.getPort());
                sh.pipeline()
                        // The project uses fixed-length (10-byte) messages; swap in another decoder if needed.
                        .addLast(new FixedLengthFrameDecoder(10))
                        // Message handling.
                        .addLast("HeartBeat", new HeartBeatHandler(redisUtil, handlerService));
            }
        });

        ServerBootstrap webSocket = newBootstrap(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sh) throws Exception {
                InetSocketAddress address = sh.remoteAddress();
                logger.debug("WEB SOCKET客户端IP:{}:{}", address.getAddress(), address.getPort());
                sh.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new ChunkedWriteHandler())
                        .addLast(new HttpObjectAggregator(65535))
                        .addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65535))
                        .addLast(new WebSocketHandler());
            }
        });

        Channel deviceChannel = bindOrShutdown(device, tcp, "TCP 服务端启动成功", "TCP 服务端启动失败");
        Channel webSocketChannel =
                bindOrShutdown(webSocket, socket, "WEB-SOCKET服务端启动成功", "WEB-SOCKET服务端启动失败");

        // Block until the server channels are closed; only then does the caller continue.
        deviceChannel.closeFuture().sync();
        webSocketChannel.closeFuture().sync();
    }

    /** Builds a server bootstrap with the socket options shared by both listeners. */
    private ServerBootstrap newBootstrap(ChannelInitializer<SocketChannel> initializer) {
        return new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // Size of the queue of pending (not yet accepted) connections.
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Keep long-lived connections alive.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_REUSEADDR, true)
                // Invoked for every accepted client connection.
                .childHandler(initializer);
    }

    /**
     * Binds one listener and waits for the outcome; on failure releases the event loops
     * before propagating the error so that no threads are left running.
     */
    private Channel bindOrShutdown(ServerBootstrap bootstrap, int port, String successLog, String failureLog)
            throws InterruptedException {
        // await() (unlike sync()) does not throw on failure, so the outcome can be inspected first.
        ChannelFuture future = bootstrap.bind(port).await();
        if (!future.isSuccess()) {
            logger.error(failureLog, future.cause());
            unbind();
            // sync() on a failed future rethrows the original bind failure to the caller.
            future.sync();
            throw new IllegalStateException(failureLog, future.cause());
        }
        logger.debug(successLog);
        return future.channel();
    }

    /** Shuts down both event-loop groups; safe to call more than once. */
    public void unbind() {
        if (!bossGroup.isShuttingDown()) {
            bossGroup.shutdownGracefully();
        }
        if (!workerGroup.isShuttingDown()) {
            workerGroup.shutdownGracefully();
        }
    }

    // ---------------------------------------------------------------- WEB-SOCKET operations

    /** Adds a WebSocket channel to the WebSocket channel group. */
    public static void socketAdd(Channel channel) {
        socketChannelGroup.add(channel);
    }

    /** Removes a WebSocket channel from its group and drops every code mapped to it. */
    public static void socketRemove(Channel channel) {
        socketChannelGroup.remove(channel);
        removeSocketChannelId(channel.id());
    }

    /** Returns the group holding all tracked WebSocket channels. */
    public static ChannelGroup socketChannelGroup() {
        return socketChannelGroup;
    }

    /** Associates {@code code} with the given WebSocket channel id, replacing any previous mapping. */
    public static void putSocketChannelId(String code, ChannelId channelId) {
        socketMap.put(code, channelId);
    }

    /** Removes every code that is mapped to the given WebSocket channel id. */
    public static void removeSocketChannelId(ChannelId channelId) {
        socketMap.values().removeIf(channelId::equals);
    }

    /** Returns the WebSocket channel id registered for {@code code}, or {@code null} if none. */
    public static ChannelId socketChannelId(String code) {
        return socketMap.get(code);
    }

    /** Looks up a tracked WebSocket channel by id; returns what {@code ChannelGroup.find} returns. */
    public static Channel socketChannel(ChannelId channelId) {
        return socketChannelGroup.find(channelId);
    }

    /** Returns the live (mutable) code-to-channel-id map of WebSocket clients. */
    public static Map<String, ChannelId> socketMap() {
        return socketMap;
    }

    // ---------------------------------------------------------------- DEVICE operations

    /** Adds a device channel to the device channel group. */
    public static void deviceAdd(Channel channel) {
        deviceChannelGroup.add(channel);
    }

    /** Removes a device channel from its group and drops every code mapped to it. */
    public static void deviceRemove(Channel channel) {
        deviceChannelGroup.remove(channel);
        removeDeviceChannelId(channel.id());
    }

    /** Returns the group holding all tracked device channels. */
    public static ChannelGroup deviceChannelGroup() {
        return deviceChannelGroup;
    }

    /** Associates {@code code} with the given device channel id, replacing any previous mapping. */
    public static void putDeviceChannelId(String code, ChannelId channelId) {
        deviceMap.put(code, channelId);
    }

    /** Removes every code that is mapped to the given device channel id. */
    public static void removeDeviceChannelId(ChannelId channelId) {
        deviceMap.values().removeIf(channelId::equals);
    }

    /** Returns the device channel id registered for {@code code}, or {@code null} if none. */
    public static ChannelId deviceChannelId(String code) {
        return deviceMap.get(code);
    }

    /** Looks up a tracked device channel by id; returns what {@code ChannelGroup.find} returns. */
    public static Channel deviceChannel(ChannelId channelId) {
        return deviceChannelGroup.find(channelId);
    }

    /** Returns the live (mutable) code-to-channel-id map of devices. */
    public static Map<String, ChannelId> deviceMap() {
        return deviceMap;
    }
}
2.创建对应消息处理类
2.1(ModBus)消息处理类 HeartBeatHandler.java
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger("-----HeartBeatHandler-----");
private RedisUtil redisUtil;
private HandlerService handlerService;
public HeartBeatHandler(RedisUtil redisUtil, HandlerService handlerService) {
this.redisUtil = redisUtil;
this.handlerService = handlerService;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//以下为示例代码,具体按实际功能需求来;
String code = "具体获取code操作";
sendMessageToWebSocket(code,"发送消息");
}
public void sendMessageToWebSocket(String code, String message) {
ChannelId channelId = NettyServer.socketChannelId(code);
if (channelId != null) {
Channel socketChannel = NettyServer.socketChannel(channelId);
if (socketChannel != null) {
socketChannel.writeAndFlush(new TextWebSocketFrame(message)).addListener((ChannelFutureListener) future -> {
logger.info("WEB SOCKET {},{}", code, message);
logger.info("WEB SOCKET DONE:{}", future.isDone());
logger.info("WEB SOCKET SUCCESS:{}", future.isSuccess());
});
} else {
logger.info("channels is null");
}
} else {
logger.info("channelsId is null");
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.info("接收到客户端信息完成");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Exception) {
logger.info("异常捕获");
cause.printStackTrace();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("CLIENT" + getRemoteAddress(ctx) + " 接入连接");
NettyServer.deviceAdd(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("CLIENT" + getRemoteAddress(ctx) + " 断开连接");
NettyServer.deviceRemove(ctx.channel());
ctx.close();
}
public static String getRemoteAddress(ChannelHandlerContext ctx) {
return ctx.channel().remoteAddress().toString();
}
2.2(WebSocket)消息处理类 WebSocketHandler.java
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channel.id();
logger.info("与客户端建立连接,通道开启!channelId:{}",channel.id());
// 添加到channelGroup通道组
NettyServer.socketAdd(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("与客户端建立连接,通道关闭!");
NettyServer.socketRemove(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
logger.info("服务器收到的数据:" + msg.text());
NettyServer.putSocketChannelId(msg.text(),ctx.channel().id());
//简易的保持心跳
sendMessage(ctx);
}
private void sendMessage(ChannelHandlerContext ctx) {
logger.info("服务器回复:0");
ctx.channel().writeAndFlush(new TextWebSocketFrame("0")).addListener((ChannelFutureListener) future -> {
logger.info("WEB-SOCKET 心跳回复:0");
logger.info("WEB SOCKET DONE:{}",future.isDone());
logger.info("WEB SOCKET SUCCESS:{}",future.isSuccess());
});;
}
private void sendAllMessage() {
String message = "发送群消息";
NettyServer.socketChannelGroup().writeAndFlush(new TextWebSocketFrame(message));
}
3.使用方法
在对应的SpringBoot 启动类中使用
@Component
public static class StartApplication implements ApplicationRunner {
private NettyServer nettyServer;
@Resource
private HandlerService handlerService;
@Resource
private RedisUtil redis;
@Override
public void run(ApplicationArguments args) throws Exception {
logger.info("进程开启!");
nettyServer = new NettyServer(redis, handlerService);
nettyServer.bind(port1,port2);
}
@PreDestroy
public void destroy() throws Exception {
logger.info("进程关闭!");
nettyServer.unbind();
}
}
4.至此,功能完成
感谢您的关注,有使用不当的地方,请指正,愿共勉··· ···
不接收辱骂,如有不适,请移步··· ···