Websocket通过Redis实现Session共享

2019-12-23  本文已影响0人  普普通通的小斌
架构图
(架构图:原文此处为图片,抓取时丢失)
测试代码搭建

pom依赖

     <!-- Redis: Spring Boot starter for Spring Data Redis; supplies the org.springframework.data.redis classes used by the Redis listener configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- WebSocket: Spring Boot starter for WebSocket support; used by the WebSocket configuration and endpoint classes -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

开启Websocket配置

/**
 * Spring configuration that switches on WebSocket support for the application.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * Publishes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * <p>Per Spring's documentation the exporter detects beans annotated with
     * {@code @ServerEndpoint} and registers them with the standard Java WebSocket runtime.
     *
     * @return a newly created exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

WebsocketPool类

package com.chainter.rmblc.messaging.net;

import lombok.extern.java.Log;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.websocket.Session;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Static, in-memory registry of open WebSocket sessions together with a counter of
 * currently connected clients.
 *
 * <p>All state lives in static fields of this class, so it is local to the running JVM.
 * The session map is a {@link ConcurrentHashMap} and the counter an {@link AtomicInteger}.
 *
 * @author ningbin 2019/12/18 13:45
 */
@Log
public class WebSocketPool {
    /** Number of connections currently counted as online. */
    private static final AtomicInteger atomicNumber = new AtomicInteger(0);
    /** Open sessions, keyed by the identifier passed to {@link #createOnlineSession}. */
    private static final Map<String, Session> onlineSession = new ConcurrentHashMap<>();

    /**
     * Increments the online counter.
     *
     * @return the counter value after the increment
     */
    public static Integer addAtomicNumber() {
        return atomicNumber.incrementAndGet();
    }

    /**
     * Decrements the online counter.
     *
     * @return the counter value after the decrement
     */
    public static Integer decrementNumber() {
        return atomicNumber.decrementAndGet();
    }

    /**
     * @return the current value of the online counter
     */
    public static Integer getNumber() {
        return atomicNumber.get();
    }

    /**
     * Registers a session, replacing any session previously stored under the same key.
     *
     * @param userId  key to store the session under
     * @param session the open session
     */
    public static void createOnlineSession(String userId, Session session) {
        onlineSession.put(userId, session);
    }

    /**
     * Returns the live session map itself, not a copy; changes made by the caller
     * are visible to this class.
     *
     * @return the backing map of registered sessions
     */
    public static Map<String, Session> getOnlineSession() {
        return onlineSession;
    }

    /**
     * Looks up a registered session.
     *
     * @param userId key the session was registered under
     * @return the session, or {@code null} when nothing is registered for the key
     */
    public static Session getSesssionByUserId(String userId) {
        return onlineSession.get(userId);
    }

    /**
     * Unregisters the session stored under the given key and closes it.
     *
     * <p>The entry is removed <em>before</em> closing so that a failing {@code close()}
     * cannot leave a dead session behind in the map. Does nothing when the key is unknown.
     *
     * @param userId key the session was registered under
     */
    public static void removeSession(String userId) {
        Session session = onlineSession.remove(userId);
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (IOException e) {
            log.log(java.util.logging.Level.WARNING, "关闭连接出现错误", e);
        }
    }

    /**
     * Sends the current time (formatted text plus epoch milliseconds) to every
     * registered session. A failure on one session is logged and does not stop
     * delivery to the remaining sessions.
     */
    public static void send() {
        // SimpleDateFormat is not thread-safe, so it stays local to this call instead of being shared.
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (Session session : onlineSession.values()) {
            try {
                // The timestamp is taken per session so each message carries its own send time.
                Date date = new Date();
                long time = date.getTime();
                String dateString = formatter.format(date);
                session.getBasicRemote().sendText("时间:" + dateString + ",毫秒:" + time);
            } catch (Exception e) {
                // Broad catch is deliberate: the broadcast is best-effort per session.
                log.log(java.util.logging.Level.WARNING, "推送消息失败, sessionId=" + session.getId(), e);
            }
        }
    }

}

WebSocket连接类

/**
 * WebSocket endpoint mapped to {@code /WebSocketTest/{userId}}.
 *
 * <p>Lifecycle callbacks keep {@link WebSocketPool} in sync: a session is registered
 * when it opens and unregistered when it closes or reports an error.
 */
@Component
@ServerEndpoint(value = "/WebSocketTest/{userId}")
@Log
public class WebsocketTest {

    /**
     * Registers the new session in the pool and bumps the online counter.
     *
     * @param userId  value of the {@code userId} path segment
     * @param session the session that was just opened
     */
    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {
        Integer number = WebSocketPool.addAtomicNumber();
        // NOTE(review): the pool key is the container's session id, not userId, so
        // WebSocketPool.getSesssionByUserId(userId) will not find this session — confirm intent.
        WebSocketPool.createOnlineSession(session.getId(), session);
        // A server-initiated push task could be started here if required.
        log.info("建立连接,当前人数:" + number);
    }

    /**
     * Unregisters the session from the pool and lowers the online counter.
     *
     * @param userId  value of the {@code userId} path segment
     * @param session the session that is closing
     */
    @OnClose
    public void onClose(@PathParam("userId") String userId, Session session) {
        WebSocketPool.removeSession(session.getId());
        Integer number = WebSocketPool.decrementNumber();
        log.info("用户" + userId + "关闭连接,当前人数:" + number);
    }

    /**
     * Unregisters the failing session and logs the error together with its cause.
     *
     * @param userId    value of the {@code userId} path segment
     * @param session   the session on which the error occurred
     * @param throwable the error reported by the container
     */
    @OnError
    public void onError(@PathParam("userId") String userId, Session session, Throwable throwable) {
        WebSocketPool.removeSession(session.getId());
        // Attach the throwable so the stack trace reaches the log instead of being dropped.
        log.log(java.util.logging.Level.WARNING, "WebSocket连接出现异常", throwable);
    }

}

添加redis监听类

package com.chainter.rmblc.messaging.config;

import com.chainter.rmblc.messaging.handle.RedisListenerHandle;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * Declares the Spring beans that subscribe this application to a Redis pub/sub topic.
 *
 * @author ningbin 2019/12/19 18:20
 */
@Component
@Log
public class RedisListenerBean {

    /** Topic to subscribe to, injected from the {@code sub.channel.allWSName} property (set in application.yml). */
    @Value("${sub.channel.allWSName}")
    private String allWSName;

    /**
     * Builds the Redis message listener container.
     *
     * <p>Several listeners for different topics can be added to one container; each
     * listener only needs to be bound to its message handler. The adapter then calls
     * the configured handler method reflectively.
     *
     * @param connectionFactory connection factory the container uses to talk to Redis
     * @param listenerAdapter   adapter that forwards received messages to the handler
     * @return the container, subscribed to the topic named by {@code allWSName}
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        // Subscribe the adapter to the configured topic.
        final PatternTopic topic = new PatternTopic(allWSName);
        listenerContainer.addMessageListener(listenerAdapter, topic);

        log.info("Subscribed Redis channel: " + allWSName);
        return listenerContainer;
    }

    /**
     * Wraps the handler so that incoming messages are delivered to its
     * {@code receiveMessage} method.
     *
     * @param redisListenerHandle handler object that processes received messages
     * @return adapter delegating to {@code redisListenerHandle.receiveMessage}
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisListenerHandle redisListenerHandle){
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(redisListenerHandle, handlerMethod);
    }
}

创建RedisListenerHandle监听消息处理类

package com.chainter.rmblc.messaging.handle;

import org.springframework.stereotype.Component;

/**
 * Handler for messages received from Redis; {@code receiveMessage} is the method
 * name given to the {@code MessageListenerAdapter} in {@code RedisListenerBean}.
 *
 * @author ningbin 2019/12/20 10:07
 */
@Component
public class RedisListenerHandle {

    /**
     * Prints the received message to standard output.
     *
     * @param message the message text as delivered to this handler
     */
    public void receiveMessage(String message) {
        final String line = "接收消息:" + message;
        System.out.println(line);
    }

}

个人联系方式 QQ:944484545,欢迎大家加入,分享学习是一件开心的事。
上一篇 下一篇

猜你喜欢

热点阅读