Java WebSocket 基于spring + sockjs

2018-03-24  本文已影响0人  酷酷的小k

简述:支持发送公告、发送指定用户及异地登录。

<servlet>,<filter>标签需要加入<async-supported>true</async-supported>
该标签支持异步处理,例如WebSocket。

定义收发消息实体类:

package com.test.springWebsocket;

public class WebMessage {

    /**
     * 用户id
     */
    private Long userId;

    /**
     * 用户名
     */
    private String username;

    /**
     * 客户端标记
     */
    private String clientMark;

    /**
     * 内容
     */
    private String contents;

    /**
     * 消息类型,1.公告,2.点对点发消息,3.检查异地登录
     */
    private String type;

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getClientMark() {
        return clientMark;
    }

    public void setClientMark(String clientMark) {
        this.clientMark = clientMark;
    }

    public String getContents() {
        return contents;
    }

    public void setContents(String contents) {
        this.contents = contents;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }
}

定义WebSocket用户实体类:

package com.test.springWebsocketTwo;

public class WsUser {

    /**
     * 用户id
     */
    private Long userId;

    /**
     * 用户名
     */
    private String username;

    /**
     * 客户端标记
     */
    private String clientMark;

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getClientMark() {
        return clientMark;
    }

    public void setClientMark(String clientMark) {
        this.clientMark = clientMark;
    }
}

定义握手拦截器类:

package com.test.springWebsocketTwo;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;

/**
 * 拦截用户登录信息,并将用户登录信息交给WebSocket的WebSocketSession来管理
 */
public class ImplHandshakeInterceptor implements HandshakeInterceptor {

    /**
     * 握手之前
     *
     * @param request    ServerHttpRequest
     * @param response   ServerHttpResponse
     * @param wsHandler  WebSocketHandler
     * @param attributes Map<String, Object>
     * @return boolean
     * @throws Exception Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest req = (ServletServerHttpRequest) request;
            Long userId = Long.parseLong(req.getServletRequest().getParameter("userId"));
            String username = req.getServletRequest().getParameter("username");
            String clientMark = req.getServletRequest().getParameter("clientMark");
            WsUser wsUser = new WsUser();
            wsUser.setUserId(userId);
            wsUser.setUsername(username);
            wsUser.setClientMark(clientMark);
            attributes.put("wsUser", wsUser);
        }
        return true;
    }

    /**
     * 握手之后
     *
     * @param request   ServerHttpRequest
     * @param response  ServerHttpResponse
     * @param wsHandler WebSocketHandler
     * @param exception Exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
    }
}

定义WebSocket处理类:

package com.test.springWebsocketTwo;

import com.google.gson.Gson;
import com.test.springWebsocket.WebMessage;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * 消息处理类
 */
public class ImplWebSocketHandler implements WebSocketHandler {

    private static List<WebSocketSession> sessionList = new ArrayList<>();

    private synchronized void addWebSocketSession(WebSocketSession session) {
        sessionList.add(session);
    }

    private synchronized void removeWebSocketSession(WebSocketSession session) {
        sessionList.remove(session);
    }

    /**
     * 给所有的用户发送消息
     *
     * @param webMsg WebMessage
     */
    public void sendToAllMsg(WebMessage webMsg) {
        String jsonStr = new Gson().toJson(webMsg);
        TextMessage textMsg = new TextMessage(jsonStr);
        for (WebSocketSession session : sessionList) {
            WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
            try {
                //isOpen()在线就发送
                if (session.isOpen()) {
                    session.sendMessage(textMsg);
                }
            } catch (IOException e) {
                System.out.println(String.format("用户<%s>发送消息失败|消息体<%s>", wsUser.getUsername(), jsonStr));
                e.printStackTrace();
            }
        }
    }

    /**
     * 发送消息给指定的用户
     *
     * @param webMsg WebMessage
     */
    public void sendToUserMsg(WebMessage webMsg) {
        String jsonStr = new Gson().toJson(webMsg);
        TextMessage textMsg = new TextMessage(jsonStr);
        for (WebSocketSession session : sessionList) {
            WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
            if (wsUser.getUserId().equals(webMsg.getUserId())) {
                try {
                    //isOpen()在线就发送
                    if (session.isOpen()) {
                        session.sendMessage(textMsg);
                    }
                } catch (IOException e) {
                    System.out.println(String.format("用户<%s>发送消息失败|消息体<%s>", wsUser.getUsername(), jsonStr));
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 连接建立之后
     *
     * @param session WebSocketSession
     * @throws Exception Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
        WebMessage webMsg = new WebMessage();
        webMsg.setUserId(wsUser.getUserId());
        webMsg.setUsername(wsUser.getUsername());
        webMsg.setClientMark(wsUser.getClientMark());
        webMsg.setType("3");
        sendToUserMsg(webMsg);
        addWebSocketSession(session);
        System.out.println(String.format("用户<%s>打开WebSocket连接...", wsUser.getUsername()));
    }

    /**
     * 处理消息
     *
     * @param session      WebSocketSession
     * @param webSocketMsg WebSocketMessage<?>
     * @throws Exception Exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> webSocketMsg) throws Exception {
        String jsonStr = (String) webSocketMsg.getPayload();
        WebMessage webMsg = new Gson().fromJson(jsonStr, WebMessage.class);
        if (webMsg.getUserId() == null) {
            sendToAllMsg(webMsg);
        } else {
            sendToUserMsg(webMsg);
        }
    }

    /**
     * 处理传输错误
     *
     * @param session WebSocketSession
     * @param error   Throwable
     * @throws Exception Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable error) throws Exception {
        error.printStackTrace();
    }

    /**
     * 连接关闭之后
     *
     * @param session     WebSocketSession
     * @param closeStatus CloseStatus
     * @throws Exception Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        WsUser wsUser = (WsUser) session.getAttributes().get("wsUser");
        if (session.isOpen()) {
            session.close();
        }
        removeWebSocketSession(session);
        System.out.println(String.format("用户<%s>关闭WebSocket连接...", wsUser.getUsername()));
    }

    /**
     * WebSocketHandler是否处理部分消息.
     * 如果此标志设置为 true并且基础WebSocket服务器支持部分消息,
     * 则可能会拆分大型WebSocket消息或其中一个未知大小的消息,
     * 并可能通过多次调用接收消息 handleMessage(WebSocketSession, WebSocketMessage).
     * 该标志 WebSocketMessage.isLast()指示该消息是否部分以及它是否是最后一部分.
     *
     * @return boolean
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}

定义WebSocket配置:

package com.test.springWebsocketTwo;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * @Bean 相当于spring xml里面的bean标签
 * @Configuration 等价于xml中配置beans标签
 * @EnableWebSocket 对WebSocket的支持
 */
@Configuration
@EnableWebSocket
public class ImplWebSocketConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        //WebSocket的支持
        registry.addHandler(WebSocketPushHandler(), "/web-socket-server").addInterceptors(new ImplHandshakeInterceptor());
        //sock js的支持
        registry.addHandler(WebSocketPushHandler(), "/sock-js/web-socket-server").addInterceptors(new ImplHandshakeInterceptor()).withSockJS();
    }

    @Bean
    public WebSocketHandler WebSocketPushHandler() {
        return new ImplWebSocketHandler();
    }
}

定义页面RequestMapping:

/**
     * spring WebSocket two 页面
     *
     * @param request HttpServletRequest
     * @return String
     */
    @RequestMapping("/spring-websocket-two.xhtm")
    public String springWebsocketTwo(HttpServletRequest request) {
        String clientMark = (String) request.getSession().getAttribute("clientMark");
        if (clientMark == null) {
            clientMark = GenerateUtil.getUUID();
            request.getSession().setAttribute("clientMark", clientMark);
        }
        Admin admin = (Admin) request.getSession().getAttribute("admin");
        request.setAttribute("userId", admin.getId());
        request.setAttribute("username", admin.getAdmin());
        request.setAttribute("clientMark", clientMark);
        return "springWebsocketTwo/springWebsocketTwo";
    }

定义jsp页面:

<!DOCTYPE html>

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>

<%
    String rootPath = request.getContextPath();
    String wsBasePath = "ws://" + request.getServerName() + ":" + request.getServerPort() + rootPath + "/";
    String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + rootPath + "/";
%>

<html>
    <head>
        <title>Spring WebSocket Two</title>
        <meta http-equiv="Expires" content="0"/>
        <meta http-equiv="Cache" content="no-cache"/>
        <meta http-equiv="Pragma" content="no-cache"/>
        <meta http-equiv="Cache-Control" content="no-cache"/>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
        <script type="text/javascript" src="../../../static/js/plugins/jquery/jquery-1.9.1.min.js"></script>
        <script type="text/javascript" src="../../../static/js/common/sockjs.v1.0.0.min.js"></script>
        <script type="text/javascript">
            var userId = "${userId}";
            var username = "${username}";
            var clientMark = "${clientMark}";
            var wsBasePath = "<%=wsBasePath%>";
            var basePath = "<%=basePath%>";
        </script>
        <style>
            .c_box {
                width: 800px;
                height: 300px;
                border: 1px solid;
                overflow: auto;
            }
        </style>
    </head>
    <body>
        <div>用户id:${userId}</div>
        <div>用户名:${username}</div>
        <div>
            <span>公告:</span>
            <input type="text" id="notice"/>
            <input type="button" id="sendNotice" value="发送"/>
        </div>
        <div>
            <span>用户id:</span>
            <input type="text" id="toUserId"/>
            <span>消息:</span>
            <input type="text" id="toUserMsg"/>
            <input type="button" id="sendToUser" value="发送"/>
        </div>
        <div>公告列表:</div>
        <div id="noticeList" class="c_box"></div>
        <div>消息列表:</div>
        <div id="msgList" class="c_box"></div>
        <script type="text/javascript" src="../../../static/js/springWebsocketTwo/springWebsocketTwo.js"></script>
    </body>
</html>

定义JavaScript脚本:

var ws = null;
var wsParams = "?userId=" + userId + "&username=" + username + "&clientMark=" + clientMark;

if ('WebSocket' in window) {
    ws = new WebSocket(wsBasePath + "web-socket-server" + wsParams);
} else if ('MozWebSocket' in window) {
    ws = new MozWebSocket(wsBasePath + "web-socket-server" + wsParams);
} else {
    //兼容低版本浏览器
    ws = new SockJS(basePath + "sock-js/web-socket-server" + wsParams);
}
ws.onopen = function (event) {
    console.log("WebSocket连接成功");
};
ws.onmessage = function (event) {
    showMsg(JSON.parse(event.data));
};
ws.onerror = function (event) {
    console.log("WebSocket连接错误");
};
ws.onclose = function (event) {
    console.log("WebSocket连接关闭");
};

function closeSocket() {
    if (ws == null || ws.readyState == 2 || ws.readyState == 3) {
        return true;
    }
    ws.close();
}

//监听窗口关闭事件,当窗口关闭时,主动去关闭WebSocket连接,防止连接还没断开就关闭窗口,server端会抛异常.
window.onbeforeunload = function () {
    closeSocket();
};

function showMsg(webMsg) {
    switch (webMsg['type']) {
        //公告
        case '1': {
            var noticeHtm = '<div>' + webMsg['contents'] + '</div>';
            $('#noticeList').append(noticeHtm);
            $("#noticeList").scrollTop($("#noticeList")[0].scrollHeight);
            break;
        }
        //点对点发消息
        case '2': {
            var msgHtm = '<div>' + webMsg['contents'] + '</div>';
            $('#msgList').append(msgHtm);
            $("#msgList").scrollTop($("#msgList")[0].scrollHeight);
            break;
        }
        //检查异地登录
        case '3': {
            if (webMsg['clientMark'] != clientMark) {
                closeSocket();
                alert('您的账号在另一处登录');
            }
            break;
        }
        default: {
            alert("WebSocket接收到未知消息...");
            break;
        }
    }
}

$("#sendNotice").on("click", function () {
    if (ws == null) {
        alert('WebSocket连接未打开');
        return true;
    }
    if (ws.readyState == 0) {
        alert('WebSocket正在连接中,请稍后再发送消息');
        return true;
    }
    if (ws.readyState == 2) {
        alert('WebSocket连接正在关闭中,无法发送消息');
        return true;
    }
    if (ws.readyState == 3) {
        alert('WebSocket连接已关闭,无法发送消息');
        return true;
    }
    var notice = $("#notice").val();
    if (notice.length > 0) {
        var webMsg = {
            'contents': notice,
            'type': '1'
        };
        ws.send(JSON.stringify(webMsg));
    }
});

$("#sendToUser").on("click", function () {
    if (ws == null) {
        alert('WebSocket连接未打开');
        return true;
    }
    if (ws.readyState == 0) {
        alert('WebSocket正在连接中,请稍后再发送消息');
        return true;
    }
    if (ws.readyState == 2) {
        alert('WebSocket连接正在关闭中,无法发送消息');
        return true;
    }
    if (ws.readyState == 3) {
        alert('WebSocket连接已关闭,无法发送消息');
        return true;
    }
    var toUserId = $("#toUserId").val();
    var toUserMsg = $("#toUserMsg").val();
    if (toUserId.length > 0 && toUserMsg.length > 0) {
        var webMsg = {
            'userId': toUserId,
            'username': username,
            'contents': toUserMsg,
            'type': '2'
        };
        ws.send(JSON.stringify(webMsg));
    }
});

如果想要在service发送消息,如下:

   /**
     * 发送消息
     */
    public void sendMsg() {
        WebMessage toAllMsg = new WebMessage();
        toAllMsg.setType("1");
        toAllMsg.setContents("发送公告...");
        getImplWebSocketHandler().sendToAllMsg(toAllMsg);

        WebMessage toUserMsg = new WebMessage();
        toUserMsg.setType("2");
        toUserMsg.setUserId(2L);
        toUserMsg.setUsername("Jane");
        toUserMsg.setContents("发送消息给指定用户...");
        getImplWebSocketHandler().sendToUserMsg(toUserMsg);
    }

    /**
     * 注入消息处理类
     *
     * @return ImplWebSocketHandler
     */
    @Bean
    public ImplWebSocketHandler getImplWebSocketHandler() {
        return new ImplWebSocketHandler();
    }
上一篇下一篇

猜你喜欢

热点阅读