Springboot+WebSocket+Reids SUBSC

2023-06-16  本文已影响0人  昵称已使用换一个吧

一、使用场景
前端实时展示Tail信息
二、WebSocket注解介绍
当使用 Spring WebSocket 来实现 WebSocket 端点时,可以使用 @ServerEndpoint 注解来标记 WebSocket 端点类,并在该类中实现相应的 WebSocket 处理方法。
1.@ServerEndpoint 注解:@ServerEndpoint 注解用于标记一个类作为 WebSocket 端点。它将指定一个 URI,客户端可以使用该 URI 连接到 WebSocket 端点。示例代码如下:

java
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {
    // ...
}

上述代码中,@ServerEndpoint("/websocket") 表示将 MyWebSocketEndpoint 类标记为一个 WebSocket 端点,并使用 /websocket 作为客户端连接的 URI。

2.@OnOpen 注解:@OnOpen 注解用于标记一个方法,在客户端与 WebSocket 端点成功建立连接后执行。该方法可以包含一个 Session 参数,用于表示客户端的 WebSocket 会话。示例代码如下:

java
Copy code
@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnOpen
    public void onOpen(Session session) {
        // 处理连接建立后的逻辑
    }

}

在上述示例中,当客户端与 WebSocket 端点建立连接后,onOpen() 方法将被调用,可以在该方法中执行连接建立后的逻辑处理。

3.@OnMessage 注解:@OnMessage 注解用于标记一个方法,在接收到客户端发送的消息时执行。该方法可以包含一个 Session 参数表示客户端的 WebSocket 会话,以及一个 String 或 byte[] 参数表示接收到的消息内容。示例代码如下:

java
Copy code
@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnMessage
    public void onMessage(Session session, String message) {
        // 处理接收到的消息
    }

}

在上述示例中,当客户端发送消息到 WebSocket 端点时,onMessage() 方法将被调用,可以在该方法中处理接收到的消息。

4.@OnClose 注解:@OnClose 注解用于标记一个方法,在客户端与 WebSocket 端点断开连接时执行。该方法可以包含一个 Session 参数,用于表示客户端的 WebSocket 会话。示例代码如下:

java
Copy code
@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnClose
    public void onClose(Session session) {
        // 处理连接关闭后的逻辑
    }

}

在上述示例中,当客户端与 WebSocket 端点断开连接时,onClose() 方法将被调用,可以在该方法中执行连接关闭后的逻辑处理。

通过使用 @ServerEndpoint 注解和相应的 WebSocket 处理方法,您可以创建一个基于 Spring WebSocket 的 WebSocket 端点,并处理与客户端的连接、消息发送和连接断开等操作。

三、需要注意的地方
1.有个地方需要注意,使用@ServerEndpoint注解后,这个Controller还需要用@Component注解吗?
当使用@ServerEndpoint("/websocket")注解标记一个类作为WebSocket端点时,该类已经被隐式地视为一个Spring组件,不需要额外添加@Component注解。

@ServerEndpoint注解本身具有@Component注解的语义,因此被标记为WebSocket端点的类将被Spring自动扫描和管理,无需显式添加@Component注解。

因此,您只需使用@ServerEndpoint("/websocket")注解来标记WebSocket端点类,无需添加其他Spring组件相关的注解。Spring将会自动识别和管理WebSocket端点。

可是我在项目中去掉在Controller中@Component注解,用postman测试就404了,wscat也不好使,这是一个让我疑惑的地方,大家有好的实践结果也请评论一下,大家一块探讨一下。

四、实践和应用:通过学到WebSocket内容结合实际的项目开发遇到的问题以及解决方案。
问题一:在SpringBoot中使用WebSocket,类中无法通过@Resource和@Autowired注解,将Spring中的Bean注入,因为WebSocket容器和 Spring 容器是两个独立的容器,各自负责不同的任务。WebSocket 容器负责管理 WebSocket端点、处理WebSocket连接等 WebSocket 相关的功能。Spring 容器负责管理Spring 管理的 Bean、依赖注入、AOP等 Spring 框架相关的功能
为了实现 WebSocket端点与Spring 容器的整合,Spring 提供了·ApplicationContextAware接口,并在 WebSocket 容器初始化时与 Spring 容器进行协作。具体流程如下:
1.WebSocket 容器检测到需要实例化的“@ServerEndpoint’类,并进行实例化
2.如果该“@ServerEndpoint'类实现了·ApplicationContextAware'接口,WebSocket容器会通过反射机制调用其setApplicationContext() 方法
3.WebSocket容器会将Spring 的应用程序上下文对象传递给·setApplicationContext()方法,以供@ServerEndpoint’类使用。这种协议的实现使得在“@ServerEndpoint'类中可以访问Spring 的应用程序上下文,以便获取Spring 管理的 Bean、进行依赖注入等操作,需要注意的是当项目启动后会进入到setApplicationContext()方法中,之后每次访问WebSocket的接口,均不在进入setApplicationContext()方法,所以ApplicationContext需要用static修饰。
问题二:在联调项目的时候,前端无法通过Hear传递Token,结合项目排查和测试,确实不能在Hear中传递Token。只能将Token当作参数传递到接口中,通过Token,反向获取Shiro中的用户信息,完成校验数据。
问题三:在本地联调完毕后,部署测试环境,发现用WS方式请求接口,前端抛出了This request has beenblocked; this endpoint must be available over WSS.的异常。WS是不安全的方式,希望通过WSS的方式请求接口。要将WebSocket(@ServerEndpoint)改为使用WSS(WebSocket over SSL / TLS),需要执行以下步骤:
1.获取SSL证书:您需要获得一个有效的SSL证书。通常,您可以从证书颁发机构(Certificate Authority,CA)获取SSL证书。
2.配置服务器:在服务器上,您需要配置SSL连接以使用您的SSL证书。这涉及到配置服务器软件(如Nginx、Apache等)来处理SSL连接。

五、代码逻辑,项目中有Shiro框架
1.pom

      <!-- Spring Boot Websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.Application

  @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

3.Controller

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;

import org.apache.shiro.session.mgt.SimpleSession;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.SerializationUtils;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;

import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Log4j2
@Component
@ServerEndpoint(value = "/data/ail/{id}/{did}/{token}")
public class TailDetailController implements ApplicationContextAware {
    private static final int ONLINE_COUNT_MAX = 5;
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger();

    private JedisCluster jedis;

    private JdbcTemplate jdbcDsp;

    private JedisPubSub sub;


    //项目启动的时候能进这个setApplicationContext中不是null,当访问这个url请求接口的时候就是null,只能把他定义成static
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        DebugDeviceTailDetailController.applicationContext = applicationContext;

    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("did") String did, @PathParam("id") Long id, @PathParam("token") String token) throws IOException {
        if (jdbcDsp == null) {
            jdbcDsp = applicationContext.getBean("jdbcDsp", JdbcTemplate.class);
        }
        if (jedis == null) {
            jedis = applicationContext.getBean("jedis", JedisCluster.class);
        }
       //shiro 存在reidis的中key固定前缀,源码有,下边是根据token从reids取出用户信息
        token = "shiro:session:" + token;
        byte[] sessionBytes = jedis.get(token.getBytes());
        SimpleSession sessions = null;
        if (sessionBytes != null) {
            sessions = (SimpleSession) SerializationUtils.deserialize(sessionBytes);
        }
        if (sessions == null) {
            session.close();
            return;
        }
        //org.apache.shiro.subject.support.DefaultSubjectContext_PRINCIPALS_SESSION_KEY
        Object attribute = sessions.getAttribute("org.apache.shiro.subject.support.DefaultSubjectContext_PRINCIPALS_SESSION_KEY");
        if (attribute == null) {
            session.close();
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(attribute.toString());
        if (jsonObject == null) {
            session.close();
            return;
        }
        User user = JSONObject.parseObject(JSON.toJSONString(jsonObject), User.class);
        log.info(JSON.toJSONString(user));

        if (dsUmUser == null) {
            session.close();
            throw new IllegalStateException("please login first");
        }

      //这里是给前端打一个最大限制信息
     /*   if (ONLINE_COUNT.incrementAndGet() > ONLINE_COUNT_MAX) {
            try {
                session.getBasicRemote().sendText("max connection limited");
            } catch (IOException e) {
                log.debug("DeviceDebug tail error {}", e.toString());
            }
        }*/

        sub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                log.info("Received message: {}, on channel: {}", message, channel);
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("send error", e);
                }
            }
        };
        //当colse后这个线程会停止
        new Thread(() -> {
            String key = String.format("aa:%d:bb:%s", id, did);
            //订阅
            jedis.subscribe(sub, key);
        }, "SubscribeDetailThread").start();

    }

    @OnClose
    public void onClose() {
        if (sub != null) {
            sub.unsubscribe();
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读