SpringBoot整合Netty简单Demo之网页聊天室

2017-10-13  本文已影响0人  徐森威

利用WebSocket实现

说到网页聊天室,一般都是使用WebSocket长连接来实现客户端与服务端之间的双向数据交互。本人也已经整合了一整套依赖于spring-boot-starter-websocket包的网络交互Demo,具体功能如下:

  1. 多用户群聊
  2. 点对点私聊
  3. 实时消息通知
  4. 在线用户显示
  5. 上线、断线等实时监听
  6. 其他在线通讯
WebSocket依赖包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

SpringBoot简单整合Netty

在Netty中可以集成WebSocket,以下Demo只实现了用户群聊,其他功能可加逻辑处理自行扩展

  @PropertySource(value= "classpath:/nettyserver.properties")
  @SpringBootApplication
  public class NettyApplication {

    @Value("${tcp.port}")
    private int tcpPort;

    @Value("${boss.thread.count}")
    private int bossCount;

    @Value("${worker.thread.count}")
    private int workerCount;

    @Value("${so.keepalive}")
    private boolean keepAlive;

    @Value("${so.backlog}")
    private int backlog;

    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(nettyWebSocketChannelInitializer);
        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
        for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
            b.option(option, tcpChannelOptions.get(option));
        }
        return b;
    }

    @Autowired
    @Qualifier("somethingChannelInitializer")
    private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;

    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    public static void main(String[] args) throws Exception{
        ConfigurableApplicationContext context = SpringApplication.run(NettyApplication.class, args);
        TCPServer tcpServer = context.getBean(TCPServer.class);
        tcpServer.start();
    }
}
/**
 * Owns the listening channel of the Netty server: binds the injected
 * {@link ServerBootstrap} to the injected address and closes the channel when the
 * Spring context is destroyed.
 */
@Component
public class TCPServer {

   @Autowired
   @Qualifier("serverBootstrap")
   private ServerBootstrap serverBootstrap;

   @Autowired
   @Qualifier("tcpSocketAddress")
   private InetSocketAddress tcpPort;

   /**
    * The bound server channel, or {@code null} before {@link #start()} has bound it.
    * Volatile because {@link #start()} blocks its calling thread, so {@link #stop()}
    * necessarily runs on a different thread.
    */
   private volatile Channel serverChannel;

   /**
    * Binds the server and then blocks the calling thread until the server channel is
    * closed.
    *
    * <p>The channel is stored <em>before</em> waiting on its close future, so that
    * {@link #stop()} can see and close it while the server is running.
    *
    * @throws Exception if binding fails or the wait is interrupted
    */
   public void start() throws Exception {
       Channel channel = serverBootstrap.bind(tcpPort).sync().channel();
       serverChannel = channel;
       channel.closeFuture().sync();
   }

   /**
    * Closes the server channel if it was bound; does nothing otherwise.
    *
    * <p>A server channel has no parent channel, so only the channel itself is closed.
    *
    * @throws Exception declared for interface compatibility with existing callers
    */
   @PreDestroy
   public void stop() throws Exception {
       Channel channel = serverChannel;
       if (channel != null) {
           channel.close();
       }
   }

   /** @return the bootstrap used to bind the server */
   public ServerBootstrap getServerBootstrap() {
       return serverBootstrap;
   }

   /** @param serverBootstrap the bootstrap to use for the next {@link #start()} */
   public void setServerBootstrap(ServerBootstrap serverBootstrap) {
       this.serverBootstrap = serverBootstrap;
   }

   /** @return the address the server binds to */
   public InetSocketAddress getTcpPort() {
       return tcpPort;
   }

   /** @param tcpPort the address to bind on the next {@link #start()} */
   public void setTcpPort(InetSocketAddress tcpPort) {
       this.tcpPort = tcpPort;
   }
}
/**
 * Sets up the handler pipeline for each accepted connection: HTTP codec, HTTP message
 * aggregation, chunked writes, the WebSocket protocol handler for the {@code /ws} path,
 * and finally the chat frame handler.
 */
@Component
@Qualifier("somethingChannelInitializer")
public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private TextWebSocketFrameHandler textWebSocketFrameHandler;

    /**
     * Appends the handlers to the channel's pipeline, in order.
     *
     * @param ch the newly accepted channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(65536),
                new ChunkedWriteHandler(),
                new WebSocketServerProtocolHandler("/ws"),
                // Must be the Spring-managed instance rather than `new`; otherwise the
                // handler's own dependencies would not be injected.
                textWebSocketFrameHandler);
    }
}
/**
 * Group-chat handler: keeps every connected channel in a shared {@link ChannelGroup},
 * relays each incoming text frame to all of them, and announces joins and leaves.
 * User names are stored in Redis under the channel id.
 *
 * <p>A single instance is shared by all pipelines, hence
 * {@code @ChannelHandler.Sharable}.
 */
@Component
@Qualifier("textWebSocketFrameHandler")
@ChannelHandler.Sharable
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** All channels this handler is currently attached to. */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Redis key under which the user names already in use are kept. */
    private static final String USED_NAMES_KEY = "cacheName";

    @Autowired
    private RedisDao redisDao;

    /**
     * Relays a received text frame to every channel in the group. The sender sees the
     * message prefixed with {@code [you]}; everyone else sees it prefixed with the
     * sender's stored user name.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) throws Exception {
        Channel incoming = ctx.channel();
        String uName = redisDao.getString(redisKey(incoming));
        String text = msg.text();
        for (Channel channel : channels) {
            String prefix = (channel != incoming) ? "[" + uName + "]" : "[you]";
            channel.writeAndFlush(new TextWebSocketFrame(prefix + text));
        }
    }

    /**
     * Runs when this handler is added to a channel's pipeline: picks a random user
     * name, announces it to the channels already in the group, stores the name in
     * Redis and adds the new channel to the group.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println(incoming.remoteAddress());
        // Obtains a random user name; any other naming scheme could be substituted.
        String uName = new RandomName().getRandomName();

        broadcast("[新用户] - " + uName + " 加入");
        redisDao.saveString(redisKey(incoming), uName);   // remember the user
        channels.add(incoming);
    }

    /**
     * Runs when this handler is removed from a channel's pipeline: announces the
     * departure, deletes the user's Redis entry, frees the user name in the used-name
     * cache and removes the channel from the group.
     *
     * <p>The name is stripped from the cache with a literal {@code replace} (a user
     * name is not a regular expression) and only when both the name and the cache
     * value exist. Group removal sits in {@code finally} so a Redis failure cannot
     * skip it.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        try {
            String key = redisKey(incoming);
            String uName = redisDao.getString(key);
            broadcast("[用户] - " + uName + " 离开");
            redisDao.deleteString(key);   // forget the user

            // Release the name so that it is no longer marked as in use.
            String usedNames = redisDao.getString(USED_NAMES_KEY);
            if (uName != null && usedNames != null) {
                redisDao.saveString(USED_NAMES_KEY, usedNames.replace(uName, ""));
            }
        } finally {
            channels.remove(incoming);
        }
    }

    /** Logs that the user behind this channel is online. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户:" + redisDao.getString(redisKey(ctx.channel())) + "在线");
    }

    /** Logs that the user behind this channel went offline. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用户:" + redisDao.getString(redisKey(ctx.channel())) + "掉线");
    }

    /** Logs the failure with its stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("用户:" + redisDao.getString(redisKey(ctx.channel())) + "异常");
        cause.printStackTrace();
        ctx.close();
    }

    /** Redis key for a channel: the string form of its channel id. */
    private static String redisKey(Channel channel) {
        return String.valueOf(channel.id());
    }

    /** Sends {@code text} as a separate text frame to every channel in the group. */
    private static void broadcast(String text) {
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame(text));
        }
    }

}

这边使用Redis保存用户名和ChannelId的对应关系,用来区分不同浏览器登录的用户

如果要在Controller中使用Channel向客户端发送数据,只要注入TextWebSocketFrameHandler,取得其中的ChannelGroup,再通过自己逻辑处理后存储的ChannelId来取得对应的Channel,即可向客户端发送消息

Netty依赖包
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.16.Final</version>
</dependency>
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket Chat</title>
</head>
<body>
<!-- Minimal chat client: opens a WebSocket to the server and mirrors traffic into the textarea. -->
<script type="text/javascript">
    // The single connection used by this page; stays undefined when WebSocket is unsupported.
    var socket;

    // Fall back to the Mozilla-prefixed implementation when the standard one is missing.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }

    if (!window.WebSocket) {
        alert("你的浏览器不支持 WebSocket!");
    } else {
        socket = new WebSocket("ws://localhost:8090/ws");

        // Append every received message on its own line.
        socket.onmessage = function(event) {
            var area = document.getElementById('responseText');
            area.value += '\n' + event.data;
        };

        // Opening the connection resets the log to the "connection opened" notice.
        socket.onopen = function(event) {
            document.getElementById('responseText').value = "连接开启!";
        };

        // Append the "connection closed" notice to the existing log.
        socket.onclose = function(event) {
            var area = document.getElementById('responseText');
            area.value += "连接被关闭";
        };
    }

    // Sends the message if the connection is open; otherwise tells the user it is not.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState != WebSocket.OPEN) {
            alert("连接没有开启.");
            return;
        }
        socket.send(message);
    }

    // Ask for confirmation before the page is refreshed or closed.
    window.onbeforeunload = function(event) {
        event.returnValue = "刷新提醒";
    };
</script>
<form onsubmit="return false;">
    <h3>netty 聊天室:</h3>
    <textarea id="responseText" style="width: 400px; height: 300px;"></textarea>
    <br>
    <input type="text" name="message"  style="width: 300px" value="测试数据">
    <input type="button" value="发送消息" onclick="send(this.form.message.value)">
</form>
<br>
<br>
</body>
</html>
# Settings read by NettyApplication through @Value placeholders.
# Port the Netty server binds to; the sample page connects to ws://localhost:8090/ws.
tcp.port=8090
# Thread count passed to the boss NioEventLoopGroup.
boss.thread.count=2
# Thread count passed to the worker NioEventLoopGroup.
worker.thread.count=2
# Value used for ChannelOption.SO_KEEPALIVE.
so.keepalive=true
# Value used for ChannelOption.SO_BACKLOG.
so.backlog=100

效果截图

群聊效果截图

Git地址
https://github.com/zyf970617/springboot-netty-demo

上一篇下一篇

猜你喜欢

热点阅读