java并发

Netty实现websocket聊天

2018-12-18  本文已影响0人  xbmchina
简介

本文主要参考视频教程,然后自己总结一下而已。

总体流程.png

启动类

主要是配置主线程组和从线程组、绑定端口等基本启动netty服务的操作。


@Component
public class WebSocketServer {


    private EventLoopGroup mainGroup;
    private EventLoopGroup subGroup;
    private ServerBootstrap server;
    private ChannelFuture channelFuture;

    private static class  SingletionWSServer {
        static final WebSocketServer instance = new WebSocketServer();
    }

    public static WebSocketServer getInstance() {
        return SingletionWSServer.instance;
    }


    public WebSocketServer() {
        mainGroup = new NioEventLoopGroup();
        subGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(mainGroup,subGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebSocketInitialzer());
    }


    public void start() {
        this.channelFuture = server.bind(8088);
        System.err.println("【Netty Server 启动成功】");

    }
}

初始化配置类

主要配置http相关的处理类、大数据流的支持、对httpMessage进行聚合、心跳检测、websocket相关的处理类、自定义消息处理类。


public class WebSocketInitialzer extends ChannelInitializer<SocketChannel> {


    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();

        //======================== http相关=============================
        //websocket基于http协议,所以需要HttpServerCodec
        pipeline.addLast("HttpServerCodec",new HttpServerCodec());

        //对写大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());

        //对httpMessage进行聚合,聚合成AggregatedFullHttpRequest和AggregatedFullHttpResponse
        pipeline.addLast(new HttpObjectAggregator(1024 * 64));


        // ====================== 增加心跳支持 start    ======================
        // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
        // 如果是读空闲或者写空闲,不处理
        pipeline.addLast(new IdleStateHandler(8, 10, 12));
        // 自定义的空闲状态检测
        pipeline.addLast(new HeartBeatHandler());
        // ====================== 增加心跳支持 end    ======================


        //======================== websocket相关=============================

        //websocket服务器处理的协议,用于指定给客户端连接访问的路由:"/ws"
        //本handler会帮你处理一些繁重的复杂的事。会帮你处理握手动作: handshaking
        //对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同。
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));


        //自定义的handler
        pipeline.addLast(new ChatHandler());

    }
}

处理消息的handler

主要是对消息传递、channel的操作、心跳处理逻辑都集中在这里处理。


/**
 * 处理消息的handler
 * TextWebSocketFrame:在netty中,是用于websocket专门处理文本的对象,frame是消息的载体。
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    //获取到所有的客户端channel。
    public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        //1.获取客户端发来的消息
        String content = msg.text();

        Channel currentChannel = ctx.channel();

        //2.判断消息类型,根据不同的类型来处理不同的业务
        DataContent dataContent = JSONObject.parseObject(content, DataContent.class);
        Integer action = dataContent.getAction();

        if (action == NettyConst.CONNECT){
            //  2.1 当websocket,第一次open的时候,初始化channel,把用的channel和userid关联起来
            String senderId = dataContent.getChatMsg().getSenderId();
            UserChannelRel.put(senderId,currentChannel);

        }else if (action == NettyConst.CHAT) {
            //  2.2 聊天类型的消息,把聊天记录保存到数据库中,同时标记消息的签收状态【未签收】
            ChatMsg chatMsg = dataContent.getChatMsg();
            String msgMsg = chatMsg.getMsg();
            String receiverId = chatMsg.getReceiverId();
            String senderId = chatMsg.getSenderId();

            //保存消息到数据库,并且标记为未签收。
            UserService userService = (UserService)SpringUtil.getBean("userService");
            String msgId = userService.saveMsg(chatMsg);
            chatMsg.setMsgId(msgId);

            //发送消息
            //从全局用户channel关系中获取接收方的channel
            Channel receiverChannel = UserChannelRel.get(receiverId);
            if (receiverChannel == null) {
                //TODO 推送消息

            }else {
                //当receiverChannel不为空是,从channelGroup中查找对应的channel是否存在
                Channel findChannel = users.find(receiverChannel.id());
                    if (findChannel != null) {
                        //用户在线
                        receiverChannel.writeAndFlush(
                                new TextWebSocketFrame(
                                        JSONObject.toJSONString(chatMsg)));
                    }else {
                        //用户离线

                    }

            }

        }else  if (action == NettyConst.SIGNED) {
            //  2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应的消息签收状态【已签收】
            UserService userService = (UserService)SpringUtil.getBean("userService");
            //扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号分隔
            String msgIdsStr = dataContent.getExtand();
            String[] msgIds = msgIdsStr.split(",");

            List<String> msgIdList = new ArrayList<>();
            for (String mid : msgIdList) {
                if (StringUtils.isNotBlank(mid)){
                    msgIdList.add(mid);
                }
            }
            System.out.println(msgIdList.toString());

            if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() >0) {
                //批量签收
                userService.updateMsgSigned(msgIdList);
            }

        }else if (action == NettyConst.KEEPALIVE){
            //  2.4 心跳类型的消息
            System.out.println("收到来自channel为[" + currentChannel + "]的心跳包...");
        }

    }


    /**
     * 当客户端连接服务端之后(打开连接)
     * 获取客户端的channel,并且放到ChannelGroup中去进行管理
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        users.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //当触发handler销毁时,这个会自动的移除的。
        users.remove(ctx.channel());

    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //发生异常之后关闭连接,随后从ChannelGroup中移除
        ctx.channel().close();
        users.remove(ctx.channel());
    }
}

心跳处理handler

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 )
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent)evt;     // 强制类型转换

            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("进入读空闲...");
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("进入写空闲...");
            } else if (event.state() == IdleState.ALL_IDLE) {

                System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());

                Channel channel = ctx.channel();
                // 关闭无用的channel,以防资源浪费
                channel.close();

                System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
            }
        }

    }

}

聊天实体类DataContent

@Data
public class DataContent implements Serializable{

    private static final long serialVersionUID = 1L;

    private Integer action;     //动作类型
    private ChatMsg chatMsg;    //用户的聊天内容entity
    private String extand;      //扩展字段


}

页面中的调用websocket服务

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8" />
        <title></title>
    </head>
    <body>
        <div>发送消息</div>
        <input type="text" id="msgContent" />
        <input type="button" value="点我发送" onclick="CHAT.chat()" />
        
        <div>接收消息:</div>
        <div id="receiveMsg" style="background: gray;"></div>
        
        
        <script type="application/javascript">
            window.CHAT = {
                socket: null,
                init: function() {
                    if (window.WebSocket){
                        CHAT.socket = new WebSocket("ws://192.168.11.138:8088/ws");
                        
                        CHAT.socket.onopen = function() {
                            console.log("onopen连接成功。。。");
                        },
                        CHAT.socket.onclose = function() {
                            console.log("onclose连接关闭。。。");

                        },
                        CHAT.socket.onerror = function() {
                            console.log("onerror发生异常。。。");

                        },
                        CHAT.socket.onmessage = function(e) {
                            console.log("onmessage接收到消息:"+e.data);
                            var receiveMsg = document.getElementById("receiveMsg");
                            var html = receiveMsg.innerHTML;
                            receiveMsg.innerHTML = html+"<br>" + e.data;
                        }
                        
                    }else{
                        alert("浏览器不支持websocket协议.....");
                    }
                },
                chat: function() {
                    var msg = document.getElementById("msgContent");
                    CHAT.socket.send(msg.value);
                }
            }
            
            CHAT.init();
        </script>
        
    </body>
</html>

上一篇 下一篇

猜你喜欢

热点阅读