SpringBoot集成netty-socketio

2021-03-30  本文已影响0人  Anson_1f2a

目标:前后端使用SocketIO进行通信
框架:SpringBoot 2.4.4

1. 添加依赖

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.18</version>
</dependency>

2. 添加YML配置

socketio:
  port: 8000
  boss-threads: 1
  worker-threads: 100
  is-allow-custom-requests: true
  upgrade-timeout: 1000000
  ping-timeout: 5000000
  ping-interval: 30000

3. 实现Spring配置类

SocketProperties .kt

import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(prefix = "socketio")
class SocketProperties {
    var port: Int = 8000
    var bossThreads: Int = 1
    var workerThreads: Int = 100
    var isAllowCustomRequests: Boolean = true
    var upgradeTimeout: Int = 1000000
    var pingTimeout: Int = 6000000
    var pingInterval: Int = 30000
}

SocketConfig.kt

import com.corundumstudio.socketio.SocketConfig
import com.corundumstudio.socketio.SocketIOServer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner
import org.springframework.boot.context.properties.EnableConfigurationProperties

@Configuration
@EnableConfigurationProperties(SocketProperties::class)
class SocketConfig(
        private val properties: SocketProperties
) {

    @Bean
    fun socketIOServer(): SocketIOServer {
        val socketConfig = SocketConfig()
        socketConfig.isTcpNoDelay = true
        socketConfig.soLinger = 0
        val config = com.corundumstudio.socketio.Configuration()
        config.socketConfig = socketConfig
        config.port = properties.port
        config.bossThreads = properties.bossThreads
        config.workerThreads = properties.workerThreads
        config.isAllowCustomRequests = properties.isAllowCustomRequests
        config.upgradeTimeout = properties.upgradeTimeout
        config.pingTimeout = properties.pingTimeout
        config.pingInterval = properties.pingInterval
        return SocketIOServer(config)
    }

    @Bean
    fun springAnnotationScanner(): SpringAnnotationScanner {
        return SpringAnnotationScanner(socketIOServer())
    }
}

4. 实现接收的消息体

前端发送给后端的消息格式

import java.io.Serializable

class Command : Serializable {
    val cmd: CmdEnum = ""
    val content: List<String> = listOf()
}

5. 实现服务端

import com.corundumstudio.socketio.AckRequest
import com.corundumstudio.socketio.SocketIOClient
import com.corundumstudio.socketio.SocketIOServer
import com.corundumstudio.socketio.annotation.OnConnect
import com.corundumstudio.socketio.annotation.OnDisconnect
import com.corundumstudio.socketio.annotation.OnEvent
import org.apache.commons.lang3.ObjectUtils
import org.apache.logging.log4j.LogManager
import org.springframework.data.annotation.PersistenceConstructor
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class SocketController(
        private val socketIOServer: SocketIOServer
) {
    private val log = LogManager.getLogger()

    companion object {
        val clientMap = ConcurrentHashMap<String, SocketIOClient>()
    }

    /**
     * 客户端连接的时候触发
     *
     * @param client
     */
    @OnConnect
    fun onConnect(client: SocketIOClient) {
        val token = client.handshakeData.getSingleUrlParam("oauthToken")
        if (ObjectUtils.isNotEmpty(token)) {
            //存储SocketIOClient,用于发送消息
            clientMap[token] = client
            //回发消息
            client.sendEvent("message", "已连接")
            log.info("客户端:" + client.sessionId + "已连接,token=" + token)
        }
    }

    /**
     * 客户端关闭连接时触发
     *
     * @param client
     */
    @OnDisconnect
    fun onDisconnect(client: SocketIOClient) {
        val token = client.handshakeData.getSingleUrlParam("oauthToken")
        if (ObjectUtils.isNotEmpty(token)) {
            clientMap.remove(token)
            log.info("客户端:" + client.sessionId + "断开连接")
        }
    }

    /**
     * 客户端事件
     *
     * @param client   客户端信息
     * @param request 请求信息
     * @param data     客户端发送数据
     */
    @OnEvent(value = "command")
    fun onEvent(client: SocketIOClient, request: AckRequest?, data: Command) {
        if (data.cmd != null) {
            log.debug("命令:{},内容:{}", data.cmd, data.content)
            //回发消息
            client.sendEvent("event", "我是服务器都安发送的信息")
            //广播消息
            sendBroadcast()
        }
    }

    /**
     * 广播消息
     */
    fun sendBroadcast() {
        for (client in clientMap.values) {
            if (client.isChannelOpen) {
                client.sendEvent("Broadcast", "当前时间" + System.currentTimeMillis())
            }
        }
    }
}

6. Spring应用启动类

import com.corundumstudio.socketio.SocketIOServer
import org.springframework.boot.CommandLineRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class CmsApplication(
        private val socketIOServer: SocketIOServer
) : CommandLineRunner {
    override fun run(vararg args: String?) {
        socketIOServer.start()
        println("socket.io 启动成功")
    }
}

fun main(args: Array<String>) {
    System.setProperty("es.set.netty.runtime.available.processors", "false")
    runApplication<CmsApplication>(*args)
}

7. 前端页面

比较简单的demo,所以前端直接加载socketio client的js

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta http-equiv="X-UA-Compatible" content="IE=edge" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
    <script src="./socket.js"></script>
  </head>
  <body>
    <script>
      const socket = io('ws://localhost:8000', {
        transports: ['websocket'],
        query: {
          token: '2222',
        }
      })
      socket.on('event', (data) => {
        console.log(data)
      })
      function test() {
        socket.emit('command', 'test')
      }
    </script>
    <button onclick="test()">test</button>
  </body>
</html>

注意事项

  1. 版本兼容
    一开始前端使用了最新的socket.io-client,socket是已经建立起链接,但死活就是不能正常通信(后端能往前端发送消息,前端不能emit或处理事件)。经过多次尝试及查找资料,发现是版本兼容性的问题。
    最新的netty-socketio只支持以下版本:
    image.png
    socket.io-client的版本已经去到4.0.0,看了下历史的版本,1已经是4年前的了。版本2里面最新的是2.4.0,但里面并没有写,最后测了几个版本,发现2.3.1能用,而且最后发布是6个月左右。
    image.png
  2. Json无法解析
    一开始消息体使用的是data class作为消息体.
data class Command(
    val cmd: String,
    val content: List<String>
) : Serializable

由于没有构造函数,会报以下异常错误。直接使用class就解决了。

java.io.IOException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `com.xxxx.socket.Command` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 13]
上一篇下一篇

猜你喜欢

热点阅读