Java WebSocket 基于spring + sockjs
2018-03-24 本文已影响0人
酷酷的小k
简述:支持发送公告、发送指定用户及异地登录。
<servlet>,<filter>标签需要加入<async-supported>true</async-supported>
该标签支持异步处理,例如WebSocket。
定义收发消息实体类:
package com.test.springWebsocket;
public class WebMessage {
/**
* 用户id
*/
private Long userId;
/**
* 用户名
*/
private String username;
/**
* 客户端标记
*/
private String clientMark;
/**
* 内容
*/
private String contents;
/**
* 消息类型,1.公告,2.点对点发消息,3.检查异地登录
*/
private String type;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getClientMark() {
return clientMark;
}
public void setClientMark(String clientMark) {
this.clientMark = clientMark;
}
public String getContents() {
return contents;
}
public void setContents(String contents) {
this.contents = contents;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
定义WebSocket用户实体类:
package com.test.springWebsocketTwo;
public class WsUser {
/**
* 用户id
*/
private Long userId;
/**
* 用户名
*/
private String username;
/**
* 客户端标记
*/
private String clientMark;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getClientMark() {
return clientMark;
}
public void setClientMark(String clientMark) {
this.clientMark = clientMark;
}
}
定义握手拦截器类:
package com.test.springWebsocketTwo;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* 拦截用户登录信息,并将用户登录信息交给WebSocket的WebSocketSession来管理
*/
public class ImplHandshakeInterceptor implements HandshakeInterceptor {
/**
* 握手之前
*
* @param request ServerHttpRequest
* @param response ServerHttpResponse
* @param wsHandler WebSocketHandler
* @param attributes Map<String, Object>
* @return boolean
* @throws Exception Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest req = (ServletServerHttpRequest) request;
Long userId = Long.parseLong(req.getServletRequest().getParameter("userId"));
String username = req.getServletRequest().getParameter("username");
String clientMark = req.getServletRequest().getParameter("clientMark");
WsUser wsUser = new WsUser();
wsUser.setUserId(userId);
wsUser.setUsername(username);
wsUser.setClientMark(clientMark);
attributes.put("wsUser", wsUser);
}
return true;
}
/**
* 握手之后
*
* @param request ServerHttpRequest
* @param response ServerHttpResponse
* @param wsHandler WebSocketHandler
* @param exception Exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
定义WebSocket处理类:
package com.test.springWebsocketTwo;
import com.google.gson.Gson;
import com.test.springWebsocket.WebMessage;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* 消息处理类
*/
public class ImplWebSocketHandler implements WebSocketHandler {
private static List<WebSocketSession> sessionList = new ArrayList<>();
private synchronized void addWebSocketSession(WebSocketSession session) {
sessionList.add(session);
}
private synchronized void removeWebSocketSession(WebSocketSession session) {
sessionList.remove(session);
}
/**
* 给所有的用户发送消息
*
* @param webMsg WebMessage
*/
public void sendToAllMsg(WebMessage webMsg) {
String jsonStr = new Gson().toJson(webMsg);
TextMessage textMsg = new TextMessage(jsonStr);
for (WebSocketSession session : sessionList) {
WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
try {
//isOpen()在线就发送
if (session.isOpen()) {
session.sendMessage(textMsg);
}
} catch (IOException e) {
System.out.println(String.format("用户<%s>发送消息失败|消息体<%s>", wsUser.getUsername(), jsonStr));
e.printStackTrace();
}
}
}
/**
* 发送消息给指定的用户
*
* @param webMsg WebMessage
*/
public void sendToUserMsg(WebMessage webMsg) {
String jsonStr = new Gson().toJson(webMsg);
TextMessage textMsg = new TextMessage(jsonStr);
for (WebSocketSession session : sessionList) {
WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
if (wsUser.getUserId().equals(webMsg.getUserId())) {
try {
//isOpen()在线就发送
if (session.isOpen()) {
session.sendMessage(textMsg);
}
} catch (IOException e) {
System.out.println(String.format("用户<%s>发送消息失败|消息体<%s>", wsUser.getUsername(), jsonStr));
e.printStackTrace();
}
}
}
}
/**
* 连接建立之后
*
* @param session WebSocketSession
* @throws Exception Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
WebMessage webMsg = new WebMessage();
webMsg.setUserId(wsUser.getUserId());
webMsg.setUsername(wsUser.getUsername());
webMsg.setClientMark(wsUser.getClientMark());
webMsg.setType("3");
sendToUserMsg(webMsg);
addWebSocketSession(session);
System.out.println(String.format("用户<%s>打开WebSocket连接...", wsUser.getUsername()));
}
/**
* 处理消息
*
* @param session WebSocketSession
* @param webSocketMsg WebSocketMessage<?>
* @throws Exception Exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> webSocketMsg) throws Exception {
String jsonStr = (String) webSocketMsg.getPayload();
WebMessage webMsg = new Gson().fromJson(jsonStr, WebMessage.class);
if (webMsg.getUserId() == null) {
sendToAllMsg(webMsg);
} else {
sendToUserMsg(webMsg);
}
}
/**
* 处理传输错误
*
* @param session WebSocketSession
* @param error Throwable
* @throws Exception Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable error) throws Exception {
error.printStackTrace();
}
/**
* 连接关闭之后
*
* @param session WebSocketSession
* @param closeStatus CloseStatus
* @throws Exception Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
if (session.isOpen()) {
session.close();
}
removeWebSocketSession(session);
System.out.println(String.format("用户<%s>关闭WebSocket连接...", wsUser.getUsername()));
}
/**
* WebSocketHandler是否处理部分消息.
* 如果此标志设置为 true并且基础WebSocket服务器支持部分消息,
* 则可能会拆分大型WebSocket消息或其中一个未知大小的消息,
* 并可能通过多次调用接收消息 handleMessage(WebSocketSession, WebSocketMessage).
* 该标志 WebSocketMessage.isLast()指示该消息是否部分以及它是否是最后一部分.
*
* @return boolean
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
定义WebSocket配置:
package com.test.springWebsocketTwo;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* @Bean 相当于spring xml里面的bean标签
* @Configuration 等价于xml中配置beans标签
* @EnableWebSocket 对WebSocket的支持
*/
@Configuration
@EnableWebSocket
public class ImplWebSocketConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//WebSocket的支持
registry.addHandler(WebSocketPushHandler(), "/web-socket-server").addInterceptors(new ImplHandshakeInterceptor());
//sock js的支持
registry.addHandler(WebSocketPushHandler(), "/sock-js/web-socket-server").addInterceptors(new ImplHandshakeInterceptor()).withSockJS();
}
@Bean
public WebSocketHandler WebSocketPushHandler() {
return new ImplWebSocketHandler();
}
}
定义页面RequestMapping:
/**
* spring WebSocket two 页面
*
* @param request HttpServletRequest
* @return String
*/
@RequestMapping("/spring-websocket-two.xhtm")
public String springWebsocketTwo(HttpServletRequest request) {
String clientMark = (String) request.getSession().getAttribute("clientMark");
if (clientMark == null) {
clientMark = GenerateUtil.getUUID();
request.getSession().setAttribute("clientMark", clientMark);
}
Admin admin = (Admin) request.getSession().getAttribute("admin");
request.setAttribute("userId", admin.getId());
request.setAttribute("username", admin.getAdmin());
request.setAttribute("clientMark", clientMark);
return "springWebsocketTwo/springWebsocketTwo";
}
定义jsp页面:
<!DOCTYPE html>
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<%
String rootPath = request.getContextPath();
String wsBasePath = "ws://" + request.getServerName() + ":" + request.getServerPort() + rootPath + "/";
String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + rootPath + "/";
%>
<html>
<head>
<title>Spring WebSocket Two</title>
<meta http-equiv="Expires" content="0"/>
<meta http-equiv="Cache" content="no-cache"/>
<meta http-equiv="Pragma" content="no-cache"/>
<meta http-equiv="Cache-Control" content="no-cache"/>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<script type="text/javascript" src="../../../static/js/plugins/jquery/jquery-1.9.1.min.js"></script>
<script type="text/javascript" src="../../../static/js/common/sockjs.v1.0.0.min.js"></script>
<script type="text/javascript">
var userId = "${userId}";
var username = "${username}";
var clientMark = "${clientMark}";
var wsBasePath = "<%=wsBasePath%>";
var basePath = "<%=basePath%>";
</script>
<style>
.c_box {
width: 800px;
height: 300px;
border: 1px solid;
overflow: auto;
}
</style>
</head>
<body>
<div>用户id:${userId}</div>
<div>用户名:${username}</div>
<div>
<span>公告:</span>
<input type="text" id="notice"/>
<input type="button" id="sendNotice" value="发送"/>
</div>
<div>
<span>用户id:</span>
<input type="text" id="toUserId"/>
<span>消息:</span>
<input type="text" id="toUserMsg"/>
<input type="button" id="sendToUser" value="发送"/>
</div>
<div>公告列表:</div>
<div id="noticeList" class="c_box"></div>
<div>消息列表:</div>
<div id="msgList" class="c_box"></div>
<script type="text/javascript" src="../../../static/js/springWebsocketTwo/springWebsocketTwo.js"></script>
</body>
</html>
定义JavaScript脚本:
var ws = null;
var wsParams = "?userId=" + userId + "&username=" + username + "&clientMark=" + clientMark;
if ('WebSocket' in window) {
ws = new WebSocket(wsBasePath + "web-socket-server" + wsParams);
} else if ('MozWebSocket' in window) {
ws = new MozWebSocket(wsBasePath + "web-socket-server" + wsParams);
} else {
//兼容低版本浏览器
ws = new SockJS(basePath + "sock-js/web-socket-server" + wsParams);
}
ws.onopen = function (event) {
console.log("WebSocket连接成功");
};
ws.onmessage = function (event) {
showMsg(JSON.parse(event.data));
};
ws.onerror = function (event) {
console.log("WebSocket连接错误");
};
ws.onclose = function (event) {
console.log("WebSocket连接关闭");
};
function closeSocket() {
if (ws == null || ws.readyState == 2 || ws.readyState == 3) {
return true;
}
ws.close();
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭WebSocket连接,防止连接还没断开就关闭窗口,server端会抛异常.
window.onbeforeunload = function () {
closeSocket();
};
function showMsg(webMsg) {
switch (webMsg['type']) {
//公告
case '1': {
var noticeHtm = '<div>' + webMsg['contents'] + '</div>';
$('#noticeList').append(noticeHtm);
$("#noticeList").scrollTop($("#noticeList")[0].scrollHeight);
break;
}
//点对点发消息
case '2': {
var msgHtm = '<div>' + webMsg['contents'] + '</div>';
$('#msgList').append(msgHtm);
$("#msgList").scrollTop($("#msgList")[0].scrollHeight);
break;
}
//检查异地登录
case '3': {
if (webMsg['clientMark'] != clientMark) {
closeSocket();
alert('您的账号在另一处登录');
}
break;
}
default: {
alert("WebSocket接收到未知消息...");
break;
}
}
}
$("#sendNotice").on("click", function () {
if (ws == null) {
alert('WebSocket连接未打开');
return true;
}
if (ws.readyState == 0) {
alert('WebSocket正在连接中,请稍后再发送消息');
return true;
}
if (ws.readyState == 2) {
alert('WebSocket连接正在关闭中,无法发送消息');
return true;
}
if (ws.readyState == 3) {
alert('WebSocket连接已关闭,无法发送消息');
return true;
}
var notice = $("#notice").val();
if (notice.length > 0) {
var webMsg = {
'contents': notice,
'type': '1'
};
ws.send(JSON.stringify(webMsg));
}
});
$("#sendToUser").on("click", function () {
if (ws == null) {
alert('WebSocket连接未打开');
return true;
}
if (ws.readyState == 0) {
alert('WebSocket正在连接中,请稍后再发送消息');
return true;
}
if (ws.readyState == 2) {
alert('WebSocket连接正在关闭中,无法发送消息');
return true;
}
if (ws.readyState == 3) {
alert('WebSocket连接已关闭,无法发送消息');
return true;
}
var toUserId = $("#toUserId").val();
var toUserMsg = $("#toUserMsg").val();
if (toUserId.length > 0 && toUserMsg.length > 0) {
var webMsg = {
'userId': toUserId,
'username': username,
'contents': toUserMsg,
'type': '2'
};
ws.send(JSON.stringify(webMsg));
}
});
如果想要在service发送消息,如下:
/**
* 发送消息
*/
public void sendMsg() {
WebMessage toAllMsg = new WebMessage();
toAllMsg.setType("1");
toAllMsg.setContents("发送公告...");
getImplWebSocketHandler().sendToAllMsg(toAllMsg);
WebMessage toUserMsg = new WebMessage();
toUserMsg.setType("2");
toUserMsg.setUserId(2L);
toUserMsg.setUsername("Jane");
toUserMsg.setContents("发送消息给指定用户...");
getImplWebSocketHandler().sendToUserMsg(toUserMsg);
}
/**
* 注入消息处理类
*
* @return ImplWebSocketHandler
*/
@Bean
public ImplWebSocketHandler getImplWebSocketHandler() {
return new ImplWebSocketHandler();
}