Netty实现websocket聊天
2018-12-18 本文已影响0人
xbmchina
简介
本文主要参考视频教程,然后自己总结一下而已。
总体流程.png启动类
主要是配置主线程组和从线程组、绑定端口等基本启动netty服务的操作。
@Component
public class WebSocketServer {
private EventLoopGroup mainGroup;
private EventLoopGroup subGroup;
private ServerBootstrap server;
private ChannelFuture channelFuture;
private static class SingletionWSServer {
static final WebSocketServer instance = new WebSocketServer();
}
public static WebSocketServer getInstance() {
return SingletionWSServer.instance;
}
public WebSocketServer() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(mainGroup,subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketInitialzer());
}
public void start() {
this.channelFuture = server.bind(8088);
System.err.println("【Netty Server 启动成功】");
}
}
初始化配置类
主要配置http相关的处理类、大数据流的支持、对httpMessage进行聚合、心跳检测、websocket相关的处理类、自定义消息处理类。
public class WebSocketInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//======================== http相关=============================
//websocket基于http协议,所以需要HttpServerCodec
pipeline.addLast("HttpServerCodec",new HttpServerCodec());
//对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
//对httpMessage进行聚合,聚合成AggregatedFullHttpRequest和AggregatedFullHttpResponse
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
// ====================== 增加心跳支持 start ======================
// 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
// 如果是读空闲或者写空闲,不处理
pipeline.addLast(new IdleStateHandler(8, 10, 12));
// 自定义的空闲状态检测
pipeline.addLast(new HeartBeatHandler());
// ====================== 增加心跳支持 end ======================
//======================== websocket相关=============================
//websocket服务器处理的协议,用于指定给客户端连接访问的路由:"/ws"
//本handler会帮你处理一些繁重的复杂的事。会帮你处理握手动作: handshaking
//对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同。
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//自定义的handler
pipeline.addLast(new ChatHandler());
}
}
处理消息的handler
主要是对消息传递、channel的操作、心跳处理逻辑都集中在这里处理。
/**
* 处理消息的handler
* TextWebSocketFrame:在netty中,是用于websocket专门处理文本的对象,frame是消息的载体。
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//获取到所有的客户端channel。
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//1.获取客户端发来的消息
String content = msg.text();
Channel currentChannel = ctx.channel();
//2.判断消息类型,根据不同的类型来处理不同的业务
DataContent dataContent = JSONObject.parseObject(content, DataContent.class);
Integer action = dataContent.getAction();
if (action == NettyConst.CONNECT){
// 2.1 当websocket,第一次open的时候,初始化channel,把用的channel和userid关联起来
String senderId = dataContent.getChatMsg().getSenderId();
UserChannelRel.put(senderId,currentChannel);
}else if (action == NettyConst.CHAT) {
// 2.2 聊天类型的消息,把聊天记录保存到数据库中,同时标记消息的签收状态【未签收】
ChatMsg chatMsg = dataContent.getChatMsg();
String msgMsg = chatMsg.getMsg();
String receiverId = chatMsg.getReceiverId();
String senderId = chatMsg.getSenderId();
//保存消息到数据库,并且标记为未签收。
UserService userService = (UserService)SpringUtil.getBean("userService");
String msgId = userService.saveMsg(chatMsg);
chatMsg.setMsgId(msgId);
//发送消息
//从全局用户channel关系中获取接收方的channel
Channel receiverChannel = UserChannelRel.get(receiverId);
if (receiverChannel == null) {
//TODO 推送消息
}else {
//当receiverChannel不为空是,从channelGroup中查找对应的channel是否存在
Channel findChannel = users.find(receiverChannel.id());
if (findChannel != null) {
//用户在线
receiverChannel.writeAndFlush(
new TextWebSocketFrame(
JSONObject.toJSONString(chatMsg)));
}else {
//用户离线
}
}
}else if (action == NettyConst.SIGNED) {
// 2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应的消息签收状态【已签收】
UserService userService = (UserService)SpringUtil.getBean("userService");
//扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号分隔
String msgIdsStr = dataContent.getExtand();
String[] msgIds = msgIdsStr.split(",");
List<String> msgIdList = new ArrayList<>();
for (String mid : msgIdList) {
if (StringUtils.isNotBlank(mid)){
msgIdList.add(mid);
}
}
System.out.println(msgIdList.toString());
if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() >0) {
//批量签收
userService.updateMsgSigned(msgIdList);
}
}else if (action == NettyConst.KEEPALIVE){
// 2.4 心跳类型的消息
System.out.println("收到来自channel为[" + currentChannel + "]的心跳包...");
}
}
/**
* 当客户端连接服务端之后(打开连接)
* 获取客户端的channel,并且放到ChannelGroup中去进行管理
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
users.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//当触发handler销毁时,这个会自动的移除的。
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//发生异常之后关闭连接,随后从ChannelGroup中移除
ctx.channel().close();
users.remove(ctx.channel());
}
}
心跳处理handler
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 )
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)evt; // 强制类型转换
if (event.state() == IdleState.READER_IDLE) {
System.out.println("进入读空闲...");
} else if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("进入写空闲...");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
Channel channel = ctx.channel();
// 关闭无用的channel,以防资源浪费
channel.close();
System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
}
}
}
}
聊天实体类DataContent
@Data
public class DataContent implements Serializable{
private static final long serialVersionUID = 1L;
private Integer action; //动作类型
private ChatMsg chatMsg; //用户的聊天内容entity
private String extand; //扩展字段
}
页面中的调用websocket服务
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body>
<div>发送消息</div>
<input type="text" id="msgContent" />
<input type="button" value="点我发送" onclick="CHAT.chat()" />
<div>接收消息:</div>
<div id="receiveMsg" style="background: gray;"></div>
<script type="application/javascript">
window.CHAT = {
socket: null,
init: function() {
if (window.WebSocket){
CHAT.socket = new WebSocket("ws://192.168.11.138:8088/ws");
CHAT.socket.onopen = function() {
console.log("onopen连接成功。。。");
},
CHAT.socket.onclose = function() {
console.log("onclose连接关闭。。。");
},
CHAT.socket.onerror = function() {
console.log("onerror发生异常。。。");
},
CHAT.socket.onmessage = function(e) {
console.log("onmessage接收到消息:"+e.data);
var receiveMsg = document.getElementById("receiveMsg");
var html = receiveMsg.innerHTML;
receiveMsg.innerHTML = html+"<br>" + e.data;
}
}else{
alert("浏览器不支持websocket协议.....");
}
},
chat: function() {
var msg = document.getElementById("msgContent");
CHAT.socket.send(msg.value);
}
}
CHAT.init();
</script>
</body>
</html>