04.使用SpringBoot构建一个简单的WebSocket通信
2019-06-01 本文已影响365人
哈哈大圣
使用SpringBoot构建一个简单的WebSocket通信
一、需求分析
- 浏览器终端首次打开需要输入唯一的用户名作为标识
- 浏览器终端可以查看当前在线的用户
- 浏览器终端用户可以选择用户进行沟通
- 浏览器终端用户可以给所有用户群发消息
后台使用Spring Boot,前端使用Vue.js;相关js文件可以通过CDN引入
二、项目结构一览
webSocket-01.png
三、项目代码
1). maven依赖 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- The Spring Boot parent supplies managed versions for the Spring Boot starters and Lombok below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <!-- No groupId/version are declared here, so Maven inherits both from the parent. -->
    <artifactId>b-springboot-websocket</artifactId>
    <dependencies>
        <!-- Spring MVC + embedded server; version comes from the parent, so it is not repeated here. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- WebSocket support (javax.websocket API and ServerEndpointExporter). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!--
            JSON (de)serialization for the message encoder/decoder.
            Raised from 1.2.30: early 1.2.x releases are affected by publicly reported
            autoType deserialization vulnerabilities, and this endpoint parses untrusted
            client input. 1.2.83 is the patched 1.x release.
        -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Generates getters/setters/constructors for the DTO at compile time. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
2). Java代码
- 启动类Application: 程序的入口,同时配置此环境下webSocketServer需要的bean
package com.lingting.websocketlearn;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Application entry point; also declares the bean the WebSocket endpoint needs in this setup.
 *
 * @author Liucheng
 * @date 2019/5/1 21:53
 */
@SpringBootApplication
public class Application {

    /**
     * WebSocket server-side configuration.
     *
     * <p>Per Spring's documentation, {@link ServerEndpointExporter} registers
     * {@code @ServerEndpoint}-annotated beans with the WebSocket runtime.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
- WebSocketServer: 提供webSocket连接处理服务,多例模式!
package com.lingting.websocketlearn.websocketserver;
import com.lingting.websocketlearn.framework.decoder.UserMessageDecoder;
import com.lingting.websocketlearn.framework.encoder.UserMessageEncoder;
import com.lingting.websocketlearn.dto.UserMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * WebSocket server endpoint for the chat demo.
 *
 * <p>Multi-instance: one object of this class exists per client connection, so the instance
 * fields hold per-connection state while the static fields are shared by every connection.
 *
 * @author Liucheng
 * @date 2019/5/1 22:02
 */
@ServerEndpoint(
        // Handshake URL; {username} identifies the connecting client
        value = "/websocket/{username}",
        // Encoder for outgoing messages
        encoders = {UserMessageEncoder.class},
        // Decoder for incoming messages
        decoders = {UserMessageDecoder.class}
)
@Component
public class WebSocketServer {

    /** Logger for this endpoint. */
    static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * Number of currently registered connections. Atomic so concurrent open/close callbacks
     * do not lose updates; in a distributed deployment an external store (e.g. Redis) would
     * be needed instead.
     */
    private static final AtomicInteger onlineCount = new AtomicInteger();

    /**
     * Registered connections keyed by username. This map is the single source of truth for
     * "who is online"; the user list is derived from its keys instead of being kept in a
     * separate, non-thread-safe list.
     */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketServerMap =
            new ConcurrentHashMap<>();

    /** Session of this connection; required to push data to the client. */
    private Session session;

    /** Username of this connection; stays {@code null} when the connection was rejected. */
    private String username;

    /**
     * Called when a connection has been established. Rejects the connection when the
     * username is blank or already taken, otherwise registers it.
     *
     * @param session  the new session
     * @param username the username taken from the handshake URL
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        // Set before registering so that a concurrent broadcast never sees a null session.
        this.session = session;

        boolean blank = username == null || username.trim().isEmpty();
        // putIfAbsent checks and claims the name in one atomic step; a separate get() + put()
        // would let two clients connecting at the same time both obtain the same name.
        if (blank || webSocketServerMap.putIfAbsent(username, this) != null) {
            rejectConnection(session);
            return;
        }

        this.username = username;
        int online = onlineCount.incrementAndGet();

        // Test only: show the query parameters sent with the handshake
        session.getRequestParameterMap()
                .forEach((name, values) -> LOGGER.info("{}: {}", name, values));
        LOGGER.info("有新窗口开始监听:" + this.username + ",当前在线人数为:" + online);
    }

    /**
     * Called when the connection is closed. Unregisters the connection if it had been
     * registered by {@link #onOpen}.
     *
     * @param session the session being closed
     * @param reason  why the session was closed
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        LOGGER.info("关闭原因:" + reason.getReasonPhrase());
        // A null username means onOpen rejected this connection and never registered it.
        if (this.username != null) {
            // Two-argument remove: never evict a different connection that has since
            // claimed the same name (possible after closeAll() cleared the map).
            webSocketServerMap.remove(this.username, this);
            onlineCount.decrementAndGet();
        }
        LOGGER.info("有一连接关闭!当前在线人数为:" + onlineCount.longValue());
    }

    /**
     * Called when an error occurs on this connection.
     *
     * @param error the error raised
     */
    @OnError
    public void onError(Throwable error) {
        LOGGER.error("webSocket 发生错误", error);
    }

    /**
     * Called when a message arrives from the client. Broadcasts it or forwards it to a
     * single user, depending on {@code massTexting}.
     *
     * <p>Bean-validation annotations on {@link UserMessage} are not applied automatically to
     * this callback, so the fields used here are checked explicitly.
     *
     * @param userMessage the decoded message
     * @param session     the sender's session
     * @throws IOException if delivering a direct message fails
     */
    @OnMessage
    public void onMessage(UserMessage userMessage, Session session) throws InterruptedException, IOException {
        if (userMessage == null) {
            LOGGER.warn("Ignoring empty message from {}", this.username);
            return;
        }
        // Always overwrite the sender so a client cannot impersonate someone else.
        userMessage.setFromUser(this.username);

        // Boolean.TRUE.equals avoids a NullPointerException when the flag is missing.
        if (Boolean.TRUE.equals(userMessage.getMassTexting())) {
            sendMessageGroup(userMessage);
            return;
        }

        String toUser = userMessage.getToUser();
        if (toUser == null) {
            // ConcurrentHashMap.get(null) would throw; there is nobody to deliver to anyway.
            LOGGER.warn("Ignoring direct message from {} without a recipient", this.username);
            return;
        }
        WebSocketServer recipient = webSocketServerMap.get(toUser);
        if (recipient != null) {
            try {
                sendTo(recipient, userMessage);
            } catch (EncodeException e) {
                LOGGER.error("Failed to encode message for {}", toUser, e);
            }
        }
    }

    /**
     * Sends a message to every registered client. A failure for one client is logged and
     * does not stop delivery to the remaining clients.
     *
     * @param userMessage the message to broadcast
     * @throws IOException declared for backward compatibility; per-client I/O failures are
     *                     logged instead of being propagated
     */
    public static void sendMessageGroup(UserMessage userMessage) throws IOException {
        for (WebSocketServer item : webSocketServerMap.values()) {
            if (!item.session.isOpen()) {
                continue;
            }
            try {
                sendTo(item, userMessage);
            } catch (IOException | EncodeException | IllegalStateException e) {
                LOGGER.error("Failed to deliver broadcast to {}", item.username, e);
            }
        }
    }

    /**
     * Closes every registered connection. All sessions are attempted even if closing one of
     * them fails.
     *
     * @throws IOException the first failure encountered, rethrown after all sessions have
     *                     been attempted
     */
    public static void closeAll() throws IOException {
        LOGGER.info("关闭服务端所有连接");
        IOException firstFailure = null;
        for (WebSocketServer item : webSocketServerMap.values()) {
            try {
                item.session.close();
            } catch (IOException e) {
                LOGGER.error("Failed to close session of {}", item.username, e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        webSocketServerMap.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Returns the usernames of all registered connections.
     *
     * @return a new list (snapshot) of usernames, in no particular order
     */
    public static List<String> getUserList() {
        return new ArrayList<>(webSocketServerMap.keySet());
    }

    /** Tells a client its username was refused (code 400) and closes its session. */
    private static void rejectConnection(Session session) {
        try {
            UserMessage rejection = new UserMessage();
            rejection.setCode(400);
            session.getBasicRemote().sendObject(rejection);
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "username is illegality"));
        } catch (Exception e) {
            // Best effort: the connection is being refused anyway, so only record the failure.
            LOGGER.error("Failed to reject connection cleanly", e);
        }
    }

    /**
     * Sends one message to one connection. Serialized per session because a blocking remote
     * endpoint may throw IllegalStateException when two threads send on it at the same time.
     */
    private static void sendTo(WebSocketServer target, UserMessage userMessage)
            throws IOException, EncodeException {
        synchronized (target.session) {
            target.session.getBasicRemote().sendObject(userMessage);
        }
    }
}
- UserMessageDecoder: 自定义接收消息的解码器
package com.lingting.websocketlearn.framework.decoder;
import com.alibaba.fastjson.JSON;
import com.lingting.websocketlearn.dto.UserMessage;
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
/**
 * Decodes incoming text frames (JSON) into {@link UserMessage} objects.
 *
 * @author Liucheng
 * @date 2019/6/1 16:47
 */
public class UserMessageDecoder implements Decoder.Text<UserMessage> {

    /**
     * Parses a JSON text frame.
     *
     * @param s the raw text received from the client
     * @return the decoded message, never {@code null}
     * @throws DecodeException if the text is not valid JSON for a {@link UserMessage}, or if
     *                         it decodes to nothing (empty text or JSON {@code null})
     */
    @Override
    public UserMessage decode(String s) throws DecodeException {
        final UserMessage decoded;
        try {
            decoded = JSON.parseObject(s, UserMessage.class);
        } catch (RuntimeException e) {
            // The JSON library reports bad input with unchecked exceptions; translate them
            // into the checked exception the Decoder contract declares.
            throw new DecodeException(s, "Payload is not valid UserMessage JSON", e);
        }
        if (decoded == null) {
            // Fail here rather than hand a null message to the endpoint's message handler.
            throw new DecodeException(s, "Payload is empty or JSON null");
        }
        return decoded;
    }

    /**
     * Called before {@link #decode(String)}; could pre-check the format of the text.
     * This demo accepts everything and lets {@code decode} report invalid input.
     *
     * @param s the raw text received from the client
     * @return always {@code true}
     */
    @Override
    public boolean willDecode(String s) {
        return true;
    }

    /** No initialization required. */
    @Override
    public void init(EndpointConfig endpointConfig) {
    }

    /** No resources to release. */
    @Override
    public void destroy() {
    }
}
- UserMessageEncoder: 自定义发送消息的编码器
package com.lingting.websocketlearn.framework.encoder;
import com.alibaba.fastjson.JSON;
import com.lingting.websocketlearn.dto.UserMessage;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
/**
 * Encodes outgoing {@link UserMessage} objects as JSON text.
 *
 * @author Liucheng
 * @date 2019/6/1 16:47
 */
public class UserMessageEncoder implements Encoder.Text<UserMessage> {

    /** No initialization required. */
    @Override
    public void init(EndpointConfig endpointConfig) {
    }

    /**
     * Serializes a message to its JSON representation.
     *
     * @param object the message to send
     * @return the JSON text produced for {@code object}
     * @throws EncodeException declared by the encoder contract
     */
    @Override
    public String encode(UserMessage object) throws EncodeException {
        final String json = JSON.toJSONString(object);
        return json;
    }

    /** No resources to release. */
    @Override
    public void destroy() {
    }
}
- UserMessage : 消息传递 dto (需要自行配置lombok插件)
package com.lingting.websocketlearn.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import javax.validation.constraints.NotNull;
/**
 * Message DTO exchanged between the browser and the WebSocket endpoint.
 * Accessors, constructors and {@code toString} are generated by Lombok.
 *
 * @author Liucheng
 * @date 2019/6/1 13:18
 */
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
public class UserMessage {

    /**
     * Custom status code.
     * 200: success (the default).
     * 400: the username is already taken.
     */
    private Integer code = 200;

    /** Sender of the message. */
    private String fromUser;

    /** Recipient of the message; carries a bean-validation constraint. */
    @NotNull
    private String toUser;

    /** Text of the message. */
    @NotNull
    private String messageContent;

    /** Whether the message is a broadcast to all users. */
    @NotNull
    private Boolean massTexting;
}
- 控制器:用于获取用户列表以及关闭所有连接
package com.lingting.websocketlearn.controller;
import com.lingting.websocketlearn.websocketserver.WebSocketServer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * REST endpoints for listing the connected users and for closing all WebSocket connections.
 *
 * @author Liucheng
 * @date 2019/5/2 10:59
 */
@RestController
@RequestMapping("/controller")
public class NotifyController {

    /**
     * Returns the list of all connected users.
     *
     * @return the usernames reported by {@link WebSocketServer#getUserList()}
     */
    @GetMapping("/getalluser")
    public List<String> getAllUser() {
        return WebSocketServer.getUserList();
    }

    /**
     * Closes every WebSocket connection.
     *
     * @return a map containing {@code "success": true}
     * @throws IOException if closing the connections fails
     */
    @GetMapping("/closeall")
    public Map<String, Boolean> closeAll() throws IOException {
        // Static call: the endpoint class owns the registry of open connections.
        WebSocketServer.closeAll();
        Map<String, Boolean> result = new HashMap<>();
        result.put("success", true);
        return result;
    }
}
3). 前端
- js依赖: 见index.html中的描述
- index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>webSocket</title>
    <style>
        *{margin: 0; padding: 0;}
    </style>
    <script src="js/vue.min.js"></script>
    <script src="js/axios.min.js"></script>
    <!-- CDN alternatives for vue/axios: -->
    <!--<script src="https://cdn.jsdelivr.net/npm/vue@2.5.16/dist/vue.js"></script>-->
    <!--<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.min.js"></script>-->
</head>
<body>
<div id="app">
    <button @click="getUserList()">获取用户列表</button>
    <ul>
        <li v-for="other in otherUsers" :key="other">{{ other }}</li>
    </ul>
    <form action="">
        <label for="toUser">输入需要发送的用户</label>
        <input type="text" v-model="messageSend.toUser" id="toUser"> <br>
        <label>是否群发</label>
        <!-- :value binds real booleans; a plain value="true" would store the string "true".
             The initial selection comes from messageSend.massTexting, not a checked attribute. -->
        <input type="radio" :value="true" v-model="messageSend.massTexting">是
        <input type="radio" :value="false" v-model="messageSend.massTexting">否 <br>
        <label for="messageContent">输入需要发送的信息</label>
        <textarea v-model="messageSend.messageContent" id="messageContent"></textarea>
    </form>
    <br>
    <button @click="sendMessage()">发送</button>
    <br>
    <form action="">
        <label for="messageReceivedContent">接收到的消息</label>
        <textarea v-model="messageReceived.messageContent" id="messageReceivedContent"></textarea>
    </form>
    <br>
    <button @click="close()">关闭连接</button>
</div>
<!-- Kept inside <body>, after #app, so the mount element exists when Vue starts. -->
<script type="text/javascript">
    var VM = new Vue({
        el: "#app",
        data: {
            webSocketServer: null,  // WebSocket object
            username: null,         // username of this client
            messageSend: {          // message being composed
                code: 200,
                fromUser: null,
                toUser: null,
                messageContent: null,
                massTexting: false
            },
            messageReceived: {      // last message received
                code: 200,
                fromUser: null,
                toUser: null,
                messageContent: null,
                massTexting: false
            },
            otherUsers: [],
        },
        created() {
            this.inputUsername();
        },
        methods: {
            // Keep asking until a non-blank username is entered, then connect.
            inputUsername () {
                while (this.username == null || this.username.trim() === "") {
                    this.username = prompt("请输入用户名!")
                }
                this.connection();
            },
            // Open the WebSocket and wire up its callbacks.
            connection () {
                if ("WebSocket" in window) {
                    console.log("您的浏览器支持 WebSocket!");
                    // Derive scheme/host/port from the page instead of hard-coding localhost,
                    // and escape the username so characters like "?", "#" or spaces cannot
                    // break the URL. The query parameter is optional (test only).
                    const scheme = location.protocol === "https:" ? "wss://" : "ws://";
                    const url = scheme + location.host + "/websocket/"
                        + encodeURIComponent(this.username) + "?param=var";
                    this.webSocketServer = new WebSocket(url);
                    this.webSocketServer.onopen = this.onOpen;
                    this.webSocketServer.onmessage = this.onMessage;
                    this.webSocketServer.onclose = this.onClose;
                } else {
                    // Browser has no WebSocket support
                    console.log("您的浏览器不支持 WebSocket!");
                }
            },
            onOpen(evt) {
                alert("已经连上了服务器")
            },
            onMessage(evt) {
                // evt.data arrives as a JSON string
                this.messageReceived = JSON.parse(evt.data);
                if (this.messageReceived.code !== 200) {
                    // Non-200 means the server refused the username: ask for another one.
                    console.log(this.messageReceived.code)
                    this.username = null;
                    this.inputUsername();
                }
                console.log(this.messageReceived)
            },
            onClose() {
                alert("客户端关闭了连接")
            },
            sendMessage() {
                // send() throws while the socket is still connecting, so guard on the state
                if (!this.webSocketServer || this.webSocketServer.readyState !== WebSocket.OPEN) {
                    console.warn("WebSocket is not open; message not sent")
                    return;
                }
                const payload = JSON.stringify(this.messageSend);
                // Pass the object as its own argument; "+" would log "[object Object]"
                console.log("messageSend: ", this.messageSend)
                console.log("messageSendStr: " + payload)
                this.webSocketServer.send(payload)
            },
            close() {
                // Close the connection
                this.webSocketServer.close();
            },
            getUserList () {
                // Fetch the user list
                let _this = this;
                axios.get("/controller/getalluser").then(function (response) {
                    // response.data is already parsed JSON
                    _this.otherUsers = response.data;
                    console.log(_this.otherUsers)
                }).catch(function (error) {
                    alert(error)
                })
            }
        }
    })
</script>
</body>
</html>
4). 配置文件
- application.yml
# Embedded server settings. "port" must be nested under "server" to form the
# server.port property; at column 0 it would be an unrelated top-level key.
server:
  port: 80
- logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Directory for log files; this is an absolute path -->
    <property name="LOG_HOME" value="d:/logs"/>

    <!-- Console output -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- Pattern: %d date, %thread thread name, %-5level level left-aligned to 5 characters,
                 %msg log message, %n line break -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- One log file per day -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Name pattern of the generated log files -->
            <fileNamePattern>${LOG_HOME}/xc.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Asynchronous output (defined here; the root logger below references FILE directly) -->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <!-- Do not drop events. By default, once the queue is 80% full,
             TRACE, DEBUG and INFO events are discarded -->
        <discardingThreshold>0</discardingThreshold>
        <!-- Queue depth; affects performance. The default is 256 -->
        <queueSize>512</queueSize>
        <!-- Appender to delegate to; at most one may be attached -->
        <appender-ref ref="FILE"/>
    </appender>

    <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
    <!-- The Logback level is named WARN; the unrecognized value "WARNING" would fall back
         to DEBUG and make this logger more verbose, not less -->
    <logger name="org.springframework.boot" level="WARN"/>

    <root level="info">
        <!--<appender-ref ref="ASYNC"/>-->
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
四、测试说明
运行Application,打开多个本机浏览器页面,在页面地址栏输入 localhost,按照提示操作即可