spring boot

04.使用SpringBoot构建一个简单的WebSocket通

2019-06-01  本文已影响365人  哈哈大圣

使用SpringBoot构建一个简单的WebSocket通信

一、需求分析

  1. 浏览器终端首次打开需要输入唯一的用户名作为标识
  2. 浏览器终端可以查看当前在线的用户
  3. 浏览器终端用户可以选择用户进行沟通
  4. 浏览器终端用户可以给所有用户群发消息

后台使用springBoot,前端使用vue.js;相关js可以引用相关的cdn

二、项目结构一览

webSocket-01.png

三、项目代码

1). maven依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>b-springboot-websocket</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.30</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

</project>

2). Java代码

  1. 启动类Application: 程序的入口,同时配置此环境下webSocketServer需要的bean
package com.lingting.websocketlearn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 启动类
 * @author Liucheng
 * @date 2019/5/1 21:53
 */
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * WebSocket服务端配置
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter () {
        return new ServerEndpointExporter();
    }
}
  1. WebSocketServer: 提供webSocket连接处理服务,多例模式!
package com.lingting.websocketlearn.websocketserver;

import com.lingting.websocketlearn.framework.decoder.UserMessageDecoder;
import com.lingting.websocketlearn.framework.encoder.UserMessageEncoder;
import com.lingting.websocketlearn.dto.UserMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket 服务端; 多例模式
 * 这里的WebSocketServer相当于ws协议
 * @author Liucheng
 * @date 2019/5/1 22:02
 */
@ServerEndpoint(
        // 握手的url
        value = "/websocket/{username}",
        // 消息编码器
        encoders = {UserMessageEncoder.class},
        // 消息解码器
        decoders = {UserMessageDecoder.class}
)
@Component
public class WebSocketServer {

    /**
     * 日志信息
     */
    static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 静态变量,记录当前在线连接数。设计为线程安全【CAS乐观锁机制】,分布式环境下,建议使用Redis
     */
    private static AtomicInteger onlineCount = new AtomicInteger();

    /**
     * concurrent包下的线程安全Set,用来存放每个客户端对应的MyWebSocket对象【之前的案例】
     * private static CopyOnWriteArraySet<WebSocketServer> webSocketServers = new CopyOnWriteArraySet<>();
     *
     * 使用线程安全Map存储客户端
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketServerMap = new ConcurrentHashMap<>();

    /**
     * 存放用于姓名列表
     */
    private static List<String> userList = new ArrayList<>();

    /**
     * 与某个客户端的连接会话,需要通过此为客户端发送数据
     */
    private Session session;

    /**
     * 当前连接的客户端 用户名
     */
    private String username;

    /**
     * 连接建立成功调用的方法
     * @param session
     * @param username
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {

        // 用户名为空,或者用户名不合法
        if (username == null || "".equals(username.trim()) || webSocketServerMap.get(username) != null) {
            try {
                UserMessage userMessage = new UserMessage();
                userMessage.setCode(400);
                session.getBasicRemote().sendObject(userMessage);
                session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT,"username is illegality"));
                return;
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
        }

        this.username = username;

        this.session = session;

        // 加入Map中
        webSocketServerMap.put(this.username, this);
        userList.add(this.username);

        // 在线数+1
        onlineCount.addAndGet(1);

        // 测试:获取路径中的参数
        Map<String, List<String>> requestParameterMap = session.getRequestParameterMap();
        requestParameterMap.entrySet().forEach(
                entry -> System.out.println(entry.getKey() + ": " + entry.getValue())
        );

        LOGGER.info("有新窗口开始监听:" + this.username + ",当前在线人数为:" + onlineCount.longValue());

    }

    /**
     * 关闭连接调用的方法
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {

        try {
            System.out.println("关闭原因:" + reason.getReasonPhrase());
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 从map中删除, 如果username为null,那么之前onOpen没有将其添加到map中
        if (this.username != null) {
            webSocketServerMap.remove(this.username);
            userList.remove(this.username);

            // 在线数减1
            onlineCount.decrementAndGet();
        }

        LOGGER.info("有一连接关闭!当前在线人数为:" + onlineCount.longValue());
    }

    /**
     * webSocket发生错误时触发
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        LOGGER.error("webSocket 发生错误");
        error.printStackTrace();
    }

    /**
     * 收到客户端消息后调用的方法(@Validated为spring提供的bean验证功能)
     * @param userMessage
     * @param session
     */
    @OnMessage
    public void onMessage(@Validated UserMessage userMessage, Session session) throws InterruptedException, IOException {

        userMessage.setFromUser(this.username);
        if (userMessage.getMassTexting()) {
            // 群发消息
            sendMessageGroup(userMessage);
        } else {
            // 单独给个人发送消息
            WebSocketServer webSocketServer = webSocketServerMap.get(userMessage.getToUser());

            if (webSocketServer != null) {
                try {
                    webSocketServer.session.getBasicRemote().sendObject(userMessage);
                } catch (EncodeException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 向所有客户端群发消息
     */
    public static void sendMessageGroup(UserMessage userMessage) throws IOException {
        for (WebSocketServer item : webSocketServerMap.values()) {
            try {
                item.session.getBasicRemote().sendObject(userMessage);
            } catch (EncodeException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 手动关闭webSocket连接
     * @throws IOException
     */
    public static void closeAll() throws IOException {
        LOGGER.info("关闭服务端所有连接");
        for (WebSocketServer item : webSocketServerMap.values()) {
            item.session.close();
        }
        webSocketServerMap.clear();
        userList.clear();
    }

    /**
     * 获取名户名列表
     * @return
     */
    public static List<String> getUserList () {
        return userList;
    }
}
  1. UserMessageDecoder: 自定义接收消息的解码器
package com.lingting.websocketlearn.framework.decoder;

import com.alibaba.fastjson.JSON;
import com.lingting.websocketlearn.dto.UserMessage;

import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;

/**
 * @author Liucheng
 * @date 2019/6/1 16:47
 */
public class UserMessageDecoder implements Decoder.Text<UserMessage> {

    /**
     * 解码函数
     * @param s
     * @return
     * @throws DecodeException
     */
    @Override
    public UserMessage decode(String s) throws DecodeException {
        return JSON.parseObject(s, UserMessage.class);
    }

    @Override
    public boolean willDecode(String s) {
        // 这个方法会在解码之前执行,可以再次判断字符串格式是否合法等,这里就不演示了
        return true;
    }

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {

    }
}
  1. UserMessageEncoder: 自定义发送消息的编码器
package com.lingting.websocketlearn.framework.encoder;

import com.alibaba.fastjson.JSON;
import com.lingting.websocketlearn.dto.UserMessage;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

/**
 * @author Liucheng
 * @date 2019/6/1 16:47
 */
public class UserMessageEncoder implements Encoder.Text<UserMessage> {

    /**
     * 编码函数
     * @param object
     * @return
     * @throws EncodeException
     */
    @Override
    public String encode(UserMessage object) throws EncodeException {
        return JSON.toJSONString(object);
    }

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {

    }
}
  1. UserMessage : 消息传递 dto (需要自行配置lombok插件)
package com.lingting.websocketlearn.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import javax.validation.constraints.NotNull;

/**
 * @author Liucheng
 * @date 2019/6/1 13:18
 */
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class UserMessage {

    /**
     * 自定义状态码
     * 200 成功: 默认
     * 400 用户名已经被占用
     */
    private Integer code = 200;

    /**
     * 消息来源
     */
    private String fromUser;

    /**
     * 消息目的用户
     * 使用了bean验证技术
     */
    @NotNull
    private String toUser;

    /**
     * 消息内容
     */
    @NotNull
    private String messageContent;

    /**
     * 是否群发
     */
    @NotNull
    private Boolean massTexting;
}

  1. 控制器:用于获取用户列表以及关闭所有连接
package com.lingting.websocketlearn.controller;

import com.lingting.websocketlearn.websocketserver.WebSocketServer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author Liucheng
 * @date 2019/5/2 10:59
 */
@RestController
@RequestMapping("/controller")
public class NotifyController {

    /**
     * 获取所有的用户列表
     * @return
     */
    @GetMapping("/getalluser")
    public List<String> getAllUser () {
        return WebSocketServer.getUserList();
    }

    /**
     * 关闭所有webSocket连接
     * @return
     * @throws IOException
     */
    @GetMapping("/closeall")
    public Map closeAll () throws IOException {

        // 调用静态方法,关闭所有连接
        WebSocketServer.closeAll();

        Map<String, Boolean> result = new HashMap<>();
        result.put("success", true);

        return result;
    }
}

3). 前端

  1. js依赖: 见index.xml描述

  2. index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>webSocket</title>
    <style>
        *{margin: 0; padding: 0;}
    </style>
    <script src="js/vue.min.js"></script>
    <script src="js/axios.min.js"></script>

    <!-- vue/axios的cnd如下 -->
    <!--<script src="https://cdn.jsdelivr.net/npm/vue@2.5.16/dist/vue.js"></script>-->
    <!--<script src="https://cdn.bootcss.com/axios/0.19.0-beta.1/axios.min.js"></script>-->
</head>
<body>
<div id="app">

    <button @click="getUserList()">获取用户列表</button>
    <ul>
        <li v-for="other in otherUsers">{{ other }}</li>
    </ul>
    <form action="">
        <label for="toUser">输入需要发送的用户</label>
        <input type="text" v-model="messageSend.toUser" id="toUser"> <br>
        <label>是否群发</label>
        <input type="radio" value="true" v-model="messageSend.massTexting">是
        <input type="radio" value="false" v-model="messageSend.massTexting" checked>否 <br>
        <label for="messageContent">输入需要发送的信息</label>
        <textarea v-model="messageSend.messageContent" id="messageContent"></textarea>
    </form>
    <br>

    <button @click="sendMessage()">发送</button>
    <br>
    <form action="">
        <label for="messageReceivedContent">接收到的消息</label>
        <textarea v-model="messageReceived.messageContent" id="messageReceivedContent"></textarea>
    </form>
    <br>
    <button @click="close()">关闭连接</button>
</div>
</body>
<script type="text/javascript">

var VM = new Vue({
    el: "#app",
    data: {
        webSocketServer: null, // webSocket对象
        username: null, // 本终端用户名
        messageSend: {    // 发送的消息对象
            code: 200,
            fromUser: null,
            toUser: null,
            messageContent: null,
            massTexting: false
        },
        messageReceived: {    // 接收的消息对象
            code: 200,
            fromUser: null,
            toUser: null,
            messageContent: null,
            massTexting: false
        },
        otherUsers: [],
    },
    created() {
        this.inputUsername ();
    },
    methods: {
        inputUsername () {
            while (this.username == null || this.username.trim() === "") {
                this.username = prompt("请输入用户名!")
            }
            // 建立连接
            this.connection();
        },
        connection () {
            if ("WebSocket" in window) {
                console.log("您的浏览器支持 WebSocket!");

                // 打开一个 web socket【参数可选,这里只是为了测试】
                this.webSocketServer = new WebSocket("ws://localhost:/websocket/" + this.username + "?param=var");

                this.webSocketServer.onopen = this.onOpen;

                this.webSocketServer.onmessage = this.onMessage;

                this.webSocketServer.onclose = this.onClose;

            } else {
                // 浏览器不支持 WebSocket
                console.log("您的浏览器不支持 WebSocket!");
            }
        },
        onOpen(evt) {
            alert("已经连上了服务器")
        },
        onMessage(evt) {
            // 此处获得的为字符串
            this.messageReceived = JSON.parse(evt.data);
            if (this.messageReceived.code !== 200) {
                console.log(this.messageReceived.code)
                this.username = null;
                this.inputUsername();
            }
            console.log(this.messageReceived)
        },
        onClose() {
            alert("客户端关闭了连接")
        },
        sendMessage() {
            // 发送消息
            console.log("messageSend: " + this.messageSend)
            console.log("messageSendStr: " + JSON.stringify(this.messageSend))
            this.webSocketServer.send(JSON.stringify(this.messageSend))
        },
        close() {
            // 关闭连接
            this.webSocketServer.close();
        },
        getUserList () {
            // 获取用户列表
            let _this = this;
            axios.get("/controller/getalluser").then(function (response) {
                // 此处自动转换为一个json对象
                _this.otherUsers = response.data;
                console.log(_this.otherUsers)
            }).catch(function (error) {
                alert(error)
            })
        }
    }
})

</script>
</html>

4). 配置文件

  1. application.yml
server:
  port: 80
  1. logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>

<configuration>
    <!--定义日志文件的存储地址,使用绝对路径-->
    <property name="LOG_HOME" value="d:/logs"/>

    <!-- Console 输出设置 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- 按照每天生成日志文件 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--日志文件输出的文件名-->
            <fileNamePattern>${LOG_HOME}/xc.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- 异步输出 -->
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
        <discardingThreshold>0</discardingThreshold>
        <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
        <queueSize>512</queueSize>
        <!-- 添加附加的appender,最多只能添加一个 -->
        <appender-ref ref="FILE"/>
    </appender>


    <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
    <logger name="org.springframework.boot" level="WARNING"/>
    <root level="info">
        <!--<appender-ref ref="ASYNC"/>-->
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

四、测试说明

运行Application, 打开多个本机浏览器页面,在页面地址栏输入 localhost,按照提示操作即可

上一篇 下一篇

猜你喜欢

热点阅读