Java高开发Java 杂谈Java

spring+stomp+webSocket+SockJS 实现

2019-05-21  本文已影响2人  java高并发

接触webSocket是因为做的项目里有一个图像报表类的监控页面,页面加载后通过ajax从后台获取数据并展示在页面上。因为监控数据实时在变化,需要体现数据的时效性,所以采用webSocket实现客户端和服务端的长连接通信,避免客户端重复发起请求获取数据。实现思路是通过定时任务定期地获取最新数据,然后通过webSocket由服务端广播出去,客户端监听这个广播消息并把最新数据展示出来。不多废话。

  1. 首先这是一个ssm项目。加入spring的webSocket包,必须是4.0以上的包,这里用了4.1.1:
    spring-messaging-4.1.1.RELEASE.jar
    spring-websocket-4.1.1.RELEASE.jar
  2. web.xml 里的filter 和servlet 都需要加上<async-supported>true</async-supported>配置。
  3. 配置webSocket配置文件,此配置文件在springmvc的配置文件里引入。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  WebSocket-related Spring configuration. Per the surrounding article this file
  is meant to be imported from the Spring MVC configuration file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/mvc
    http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/aop 
    http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/tx 
    http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/websocket  
    http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
    <!--
      Tunes the servlet container's WebSocket engine: message buffer sizes,
      session idle timeout and asynchronous send timeout.
      NOTE(review): the timeout values are presumably milliseconds (900000 = 15 min,
      5000 = 5 s) - confirm against the ServletServerContainerFactoryBean documentation.
    -->
    <bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
        <property name="maxTextMessageBufferSize" value="8192"/>
        <property name="maxBinaryMessageBufferSize" value="8192"/>
        <property name="maxSessionIdleTimeout" value="900000"/>
        <property name="asyncSendTimeout" value="5000"/>
    </bean>

    <!-- Real-time message push: the listener thread is started as soon as the Spring container instantiates this bean. -->
    <bean id="webSocketMessageUtil" class="com.lancy.webSocket.common.WebSocketMessageUtil"/>
</beans>

用注解的方式定义的webSocket的配置类

package com.lancy.webSocket.action;

import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import com.lancy.webSocket.handler.StompMessageHandshakeHandler;
import com.lancy.webSocket.handler.WebSocketHandshakeInterceptor;


/**
 * Annotation-driven STOMP-over-WebSocket configuration: registers the
 * handshake endpoint and sets up the message broker destination prefixes.
 */
@EnableWebSocketMessageBroker
@Controller
public class WebSocketMessageAction extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * Registers "/webSocket" as the STOMP endpoint.
     *
     * <p>The endpoint is not a message destination: a client must first connect
     * to it (e.g. "/applicationName/webSocket") before it can subscribe or send
     * to any destination.
     *
     * @param registry the endpoint registry supplied by Spring
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        StompMessageHandshakeHandler handshakeHandler = new StompMessageHandshakeHandler();
        WebSocketHandshakeInterceptor handshakeInterceptor = new WebSocketHandshakeInterceptor();

        // Clients reach this endpoint at ws://host:port/<application>/webSocket;
        // withSockJS() additionally allows connecting through SockJS.
        registry.addEndpoint("/webSocket")
                .setHandshakeHandler(handshakeHandler)
                .addInterceptors(handshakeInterceptor)
                .withSockJS();
    }

    /**
     * Configures the destination prefixes used by the broker.
     *
     * @param registry the broker registry supplied by Spring
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Server-to-client messages may be published under "/topic" and "/user".
        registry.enableSimpleBroker("/topic", "/user");

        // Client-to-server messages must carry the "/ws" prefix.
        registry.setApplicationDestinationPrefixes("/ws");

        // One-to-one (per-user) destinations are prefixed with "/user".
        registry.setUserDestinationPrefix("/user");
    }
}

其中StompMessageHandshakeHandler类代码如下

package com.lancy.webSocket.handler;

import java.security.Principal;
import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

/**
 * Handshake handler hook for the STOMP endpoint.
 *
 * <p>The original author notes this is the place where the client's session or
 * cookie information could be inspected during the handshake (for example to
 * authenticate the user). No custom logic is applied here: the user is resolved
 * by the parent class.
 */
public class StompMessageHandshakeHandler extends DefaultHandshakeHandler {

    /**
     * Resolves the user for the connecting client by delegating to the parent
     * implementation unchanged.
     */
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
            Map<String, Object> attributes) {
        Principal resolvedUser = super.determineUser(request, wsHandler, attributes);
        return resolvedUser;
    }
}

WebSocketHandshakeInterceptor类:

package com.lancy.webSocket.handler;

import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

/**
 * Interceptor invoked before and after the WebSocket handshake.
 *
 * <p>No custom processing is done; both callbacks fall through to the parent
 * class behaviour.
 */
public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    /**
     * Called before the handshake; returns whatever the parent implementation
     * decides.
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        boolean proceed = super.beforeHandshake(request, response, wsHandler, attributes);
        return proceed;
    }

    /**
     * Called after the handshake; delegates to the parent implementation.
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Exception ex) {
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

WebSocketHandshakeHandler类:

package com.lancy.webSocket.handler;

import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * Text WebSocket handler whose connection and message callbacks all defer to
 * {@link TextWebSocketHandler}; it exists as an extension point only.
 */
public class WebSocketHandshakeHandler extends TextWebSocketHandler {

    /** Invoked once a connection is established; delegates to the parent. */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
    }

    /** Invoked for each incoming message; delegates to the parent. */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        super.handleMessage(session, message);
    }

    /** Invoked after a connection is closed; delegates to the parent. */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
    }
}

自定义一个消息实体类,方便封装消息

package com.lancy.webSocket.common;

/**
 * Envelope describing one message to push to WebSocket clients.
 *
 * <p>Addressing convention described by the original author:
 * <ul>
 *   <li>Broadcast: destination is "/topic/*" (any name, e.g. "monitor"); the
 *       client subscribes to "/topic/monitor".</li>
 *   <li>Private: destination is "/*" (any name, e.g. "message"); the client
 *       subscribes to "/user/{custom client id}/message".</li>
 * </ul>
 * Further properties may be added as needed.
 */
public class WebSocketMessage {

    /** Destination the message is sent to (the field name keeps its original spelling). */
    private String distination;

    /** The actual payload object. */
    private Object data;

    /** When not empty, the message targets this single user instead of being broadcast. */
    private String userId;

    /** @return the destination, or {@code null} if unset */
    public String getDistination() {
        return this.distination;
    }

    /** @return the payload, or {@code null} if unset */
    public Object getData() {
        return this.data;
    }

    /** @return the target user id, or {@code null} if unset */
    public String getUserId() {
        return this.userId;
    }

    /** @param distination the destination to send to */
    public void setDistination(String distination) {
        this.distination = distination;
    }

    /** @param data the payload to send */
    public void setData(Object data) {
        this.data = data;
    }

    /** @param userId the target user id */
    public void setUserId(String userId) {
        this.userId = userId;
    }
}

webSocket消息发送的工具类

package com.lancy.webSocket.common;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import com.lancy.myBatis.common.StringUtils;

/**
 * Queue-backed pusher for WebSocket messages.
 *
 * <p>Constructing the bean starts a background thread that blocks on an
 * internal queue and sends every queued {@link WebSocketMessage} through the
 * injected {@link SimpMessagingTemplate}. Producers enqueue messages with
 * {@link #addMessage(WebSocketMessage)}.
 */
public class WebSocketMessageUtil implements Runnable{

    static Logger log = Logger.getLogger(WebSocketMessageUtil.class);

    // volatile: the template is injected by the Spring thread after the worker
    // thread has already been started, so the write must be visible across threads.
    private static volatile SimpMessagingTemplate messageingTemplate;
    private static final BlockingQueue<WebSocketMessage> wsQueue = new LinkedBlockingDeque<>();

    /**
     * Starts the worker thread that drains the queue.
     *
     * <p>The thread is named for easier diagnosis and marked as a daemon so it
     * does not keep the JVM/container alive on shutdown.
     */
    public WebSocketMessageUtil() {
        Thread worker = new Thread(this, "webSocket-message-pusher");
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Receives the messaging template from Spring and publishes it to the
     * static sender methods.
     *
     * @param t the template used to send messages
     */
    @Autowired
    public void setTemplate(SimpMessagingTemplate t){
        WebSocketMessageUtil.messageingTemplate = t;
    }

    /**
     * Queues a message for asynchronous delivery by the worker thread.
     *
     * @param msg the message to queue
     */
    public static void addMessage(WebSocketMessage msg){
        try{
            wsQueue.put(msg);
        }catch(InterruptedException e){
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            log.error("添加实时消息异常", e);
        }
    }

    /**
     * Sends a message immediately: broadcast to its destination when no user id
     * is set, otherwise to that single user.
     *
     * @param msg the message to send
     * @throws IllegalStateException if the messaging template has not been injected yet
     */
    public static void sendMessage(WebSocketMessage msg){
        SimpMessagingTemplate template = messageingTemplate;
        if(template == null){
            throw new IllegalStateException("SimpMessagingTemplate has not been injected yet");
        }
        if(StringUtils.isBlank(msg.getUserId())){
            template.convertAndSend(msg.getDistination(),msg);
        }else{
            template.convertAndSendToUser(msg.getUserId(), msg.getDistination(), msg);
        }
    }

    /**
     * Worker loop: takes messages off the queue and sends them until the
     * thread is interrupted. A failure to send one message is logged and does
     * not stop the loop.
     */
    @Override
    public void run() {
        log.info(">>>>>>>推送消息线程启动,正在监听消息。");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until a message is available and never returns null.
                WebSocketMessageUtil.sendMessage(wsQueue.take());
            } catch (InterruptedException ex) {
                // Interruption is the stop signal: restore the flag and leave the loop.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception ex) {
                // Keep the pusher alive, but make the failure visible.
                log.error("推送实时消息异常", ex);
            }
        }
    }
}

然后配置spring定时任务,在定时执行的方法里进行消息的添加,参考代码

/**
 * Example body for a scheduled task: builds one broadcast message and queues
 * it for delivery through {@code WebSocketMessageUtil}.
 */
public void testWebSocket() {
        final WebSocketMessage broadcast = new WebSocketMessage();
        broadcast.setData("test....");
        broadcast.setDistination("/topic/lancy/testWebSocket/new");

        WebSocketMessageUtil.addMessage(broadcast);
}

前台界面
首先引入两个js文件 sockjs.min.js 和 stomp.min.js

<script type="text/javascript" src="${ctx}/js/socket/sockjs.min.js"></script>
<script type="text/javascript" src="${ctx}/js/socket/stomp.min.js"></script>

具体页面如下

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<c:set var="ctx" value="${pageContext.request.contextPath}" />
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Insert title here</title>

<link rel="stylesheet" type="text/css" href="${ctx}/css/default.css" />
<link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/themes/default/easyui.css">
<link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/themes/icon.css">
<link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/demo/demo.css">
<script type="text/javascript" src="${ctx}/js/jquery.min.js"></script>
<script type="text/javascript" src="${ctx}/js/socket/sockjs.min.js"></script>
<script type="text/javascript" src="${ctx}/js/socket/stomp.min.js"></script>
<script type="text/javascript" src="${ctx}/js/app_common.js"></script>
<script type="text/javascript" src="${ctx}/js/easyui/jquery.easyui.min.js"></script>
<script type="text/javascript"> 

    var stompClient = null;  

    /**
     * Switches the page between its connected and disconnected states:
     * toggles the two buttons, shows or hides the conversation area and
     * clears previously displayed responses.
     *
     * @param connected true when the STOMP connection is up
     */
    function setConnected(connected) {
        var connectButton = document.getElementById('connect');
        connectButton.disabled = connected;

        var disconnectButton = document.getElementById('disconnect');
        disconnectButton.disabled = !connected;

        var conversation = document.getElementById('conversationDiv');
        if (connected) {
            conversation.style.visibility = 'visible';
        } else {
            conversation.style.visibility = 'hidden';
        }

        var responseArea = document.getElementById('response');
        responseArea.innerHTML = '';
    }

    /**
     * Connects to the server's STOMP endpoint over SockJS and, once connected,
     * subscribes to the broadcast topic and to the per-user destination.
     * Subscribing is only possible after the endpoint connection succeeds.
     */
    function connect() {
        // Use the page's context path rather than a hard-coded application name,
        // so the page keeps working when the web app is deployed under another name.
        var socket = new SockJS('${ctx}/webSocket');
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            setConnected(true);
            console.log('Connected: ' + frame);

            // Broadcast messages
            stompClient.subscribe('/topic/lancy/testWebSocket/new', function(greeting){
                showGreeting(greeting.body);
            });

            // Messages addressed to this user
            stompClient.subscribe('/user/1/testUser', function(greeting){
                showGreeting(greeting.body);
            });
        }, function(error) {
            // Connection failed or was lost: put the UI back into the disconnected state.
            console.log('Connection error: ' + error);
            setConnected(false);
        });
    }

    /**
     * Closes the STOMP connection, if a client has been created, and resets
     * the page to its disconnected state.
     */
    function disconnect() {
        if (stompClient == null) {
            return;
        }
        stompClient.disconnect();
        setConnected(false);
        console.log("Disconnected");
    }

    /**
     * Sends the text typed into the "name" input to the server destination
     * "/ws/webSocket/testWithServer", together with two custom headers.
     * Does nothing (apart from logging) when there is no open connection.
     */
    function sendName() {
        // The send button can be clicked before connect() has run or after a disconnect.
        if (stompClient == null || !stompClient.connected) {
            console.log("Not connected; message not sent");
            return;
        }
        var name = document.getElementById('name').value;
        stompClient.send("/ws/webSocket/testWithServer", {'name': 'xiao','syn':'wang'}, JSON.stringify({'message': name }));
    }

    /**
     * Appends a received message to the response area as a new paragraph.
     * The message is inserted as a text node, not as markup.
     *
     * @param message the text to display
     */
    function showGreeting(message) {
        var paragraph = document.createElement('p');
        paragraph.style.wordWrap = 'break-word';

        var text = document.createTextNode(message);
        paragraph.appendChild(text);

        document.getElementById('response').appendChild(paragraph);
    }

</script>
</head>
<body class="easyui-layout">
    <div>  
        <div>  
            <button id="connect" onclick="connect();">Connect</button>  
            <button id="disconnect" disabled="disabled" onclick="disconnect();">Disconnect</button>  
        </div>  
        <div id="conversationDiv">  
            <label>What is your name?</label><input type="text" id="name" />  
            <button id="sendName" onclick="sendName();">Send all</button>  
            <p id="response"></p>  
        </div>  
    </div> 


</body>
</html>

js方法里的sendName 发送到服务端的消息对应的类:

package com.lancy.webSocket.action;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

/**
 * Handles the message the page sends from its {@code sendName()} function.
 */
@Controller
public class WebSocketTestAction {

    /** Destination the reply is published to. */
    private static final String REPLY_DESTINATION = "/user/1/testUser";

    /** Text of the reply. */
    private static final String REPLY_BODY = "服务端返回消息";

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Receives messages mapped to "/webSocket/testWithServer". This pairs with
     * the page call
     * {@code stompClient.send("/ws/webSocket/testWithServer", {'name': 'xiao','syn':'wang'}, JSON.stringify({'message': name }))}.
     *
     * <p>Prints the payload, the "name" header and all headers to standard
     * output, then publishes a fixed reply.
     *
     * @param message the message payload
     * @param name    the value of the "name" header
     * @param headers all message headers
     * @return an empty string
     */
    @MessageMapping("/webSocket/testWithServer")
    public String send(String message, @Header("name") String name,
            @Headers Map<String, Object> headers) {
        System.out.println(message);
        System.out.println(name);
        System.out.println(headers);

        simpMessagingTemplate.convertAndSend(REPLY_DESTINATION, REPLY_BODY);
        return "";
    }
}

欢迎工作一到五年的Java工程师朋友们加入Java高并发QQ群:219571750,群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)。合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间"来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

41bcdf6703584fc58d197b1b04af4990.jpg 616236adc0134e2daf4b2d4c60c30360.jpg
上一篇下一篇

猜你喜欢

热点阅读