SpringBoot WebSocket 服务器主动推送(二)
2017-09-16 本文已影响1587人
右耳朵大鼻子
本文讲解通过WebSocket的高级封装Stomp协议完成消息的互动
上篇文章SpringBoot WebSocket 服务器主动推送(一)是基于纯WebSocket
的服务器端推送,它只完成了客户端建立连接->阻塞等待服务器消息的功能。对于客户端推送消息、订阅消息,服务器推送消息的内容在本文概述。话不多说,上代码。
服务端
1.Maven 项目在pom.xml 里引入websocket 依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- Boot 启动类
@SpringBootApplication
public class Application implements WebSocketConfigurer {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).bannerMode(Banner.Mode.OFF).run(args);
}
//Socket消息模版类,用来向客户端推送消息
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
//用请求的方式模拟主动推送消息
@GetMapping("notice")
public String notice(String name) {
//这里定义了订阅消息的路径是"/queue/notice",客户端请求的路径则为:"/user/queue/notice"
simpMessagingTemplate.convertAndSendToUser(name, "/queue/notice", "当前时间是:" + new Date());
return "已发送";
}
}
3.WebSocket配置类,定义socket连接、推送、订阅路径,以及关联用户鉴权带业务
//开启WebSocket,并启用 STOMP
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageBrokerConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//将“/register”注册为STOMP端点,客户端在订阅或发布消息到目的地路径前,要连接该端点
registry.addEndpoint("/register")
//自定义每个客户端对应的标识,用于服务端精准消息推送
.setHandshakeHandler(new DefaultHandshakeHandler(){
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
//将客户端标识封装为Principal对象,从而让服务端能通过getName()方法找到指定客户端
Object o = attributes.get("name");
return new FastPrincipal(o.toString());
}
})
//添加socket拦截器,用于从请求中获取客户端标识参数
.addInterceptors(NameHandshakeInterceptor()).withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
//客户端发送消息的请求前缀
config.setApplicationDestinationPrefixes("/app");
//客户端订阅消息的请求前缀,topic一般用于广播推送,queue用于点对点推送
config.enableSimpleBroker("/topic", "/queue");
//服务端通知客户端的前缀,可以不设置,默认为user
config.setUserDestinationPrefix("/user");
}
class FastPrincipal implements Principal {
private final String name;
public FastPrincipal(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}
}
3.Socket拦截器,将客户端请求的参数存储至SocketSession中
/**
* 检查握手请求和响应, 对WebSocketHandler传递属性
*/
public static class CountHandshakeInterceptor implements HandshakeInterceptor {
/**
* 在握手之前执行该方法, 继续握手返回true, 中断握手返回false.
* 通过attributes参数设置WebSocketSession的属性
*
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String name= ((ServletServerHttpRequest) request).getServletRequest().getParameter("name");
System.out.println("======================Interceptor" + name);
//保存客户端标识
attributes.put("name", name);
return true;
}
/**
* 在握手之后执行该方法. 无论是否握手成功都指明了响应状态码和相应头.
*
* @param request
* @param response
* @param wsHandler
* @param exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
}
客户端
public class StompSocketTest {
private static Logger logger = LoggerFactory.getLogger(StompSocketTest.class);
private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
@Test
public void testStompSubscribe() throws ExecutionException, InterruptedException {
StompSocketTest helloClient = new StompSocketTest();
ListenableFuture<StompSession> f = helloClient.connect("ws://localhost:8080/register?name=1");
StompSession stompSession = f.get();
logger.info("Subscribing to greeting topic using session " + stompSession);
helloClient.subscribeGreetings("/user/queue/notice", stompSession);
Thread.sleep(600000);
}
public ListenableFuture<StompSession> connect(String url) {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
return stompClient.connect(url, headers, new MyHandler(), "localhost", 8080);
}
public void subscribeGreetings(String url, StompSession stompSession) throws ExecutionException, InterruptedException {
stompSession.subscribe(url, new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
public void handleFrame(StompHeaders stompHeaders, Object o) {
logger.info("Received greeting " + new String((byte[]) o));
}
});
}
private class MyHandler extends StompSessionHandlerAdapter {
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
logger.info("Now connected");
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
super.handleFrame(headers, payload);
logger.debug("=========================handleFrame");
}
}
}
至此便完成了主要代码逻辑。先启动服务端,然后运行客户端建立WebSocket连接,接着在浏览器地址栏输入localhost:8080/notice?name=1
,服务器便会找到对应的socketSession
对其进行推送消息。
参考资料
springmvc(18)使用WebSocket 和 STOMP 实现消息功能
Spring Websocket/STOMP 和SpringSession整合 初步