spring boot websocket 集成

2018-08-03  本文已影响100人  周六不算加班

1、引入websocket jar包

  <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

2、启动类添加websocket相关注解

  @EnableWebSocket//开启webSocket配置

3、配置文件设置
3.1、独立的容器,每次链接都会生成一个新的容器,但是websocket实现类不能注入spring其他的bean。

@ConditionalOnWebApplication
@Configuration
public class WebSocketConfig {
/**
 * 配置WebSocketEndpointServer
 * 如果使用独立的servlet容器,不是使用SpringBoot的内置容器
 * 不需要注入ServerEndpointExporter, 它将由容器自己提供和管理
 * @return
 */
    @Bean
    public ServerEndpointExporter serverEndpointExporter () {
       return new ServerEndpointExporter();
   }
}

3.2、集成到spring的容器,这个配置的问题就是,每次链接会生成同一个容器,新的容器会吧旧的替换掉,spring bean可以注入到实现类中去。可以根据应用场景的不同选择不同的配置。

@ConditionalOnWebApplication
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator implements ApplicationContextAware{
    
    private static volatile BeanFactory context;

    /**
    * 配置WebSocketEndpointServer
    * 如果使用独立的servlet容器,不是使用SpringBoot的内置容器
    * 不需要注入ServerEndpointExporter, 它将由容器自己提供和管理
    * @return
    */
    @Bean
    public ServerEndpointExporter serverEndpointExporter () {
        return new ServerEndpointExporter();
    }

   @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException
    {
        return context.getBean(clazz);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        System.out.println("auto load"+this.hashCode());
        WebSocketConfig.context = applicationContext;
   }
}

4、endpoint配置(聊天室)

@Component
@ServerEndpoint("/endpoint")
public class SimpleEndpoint {
      /**
       * 日志
       */
       private static final Logger _Logger = LoggerFactory.getLogger(SimpleEndpoint.class);
      /**
       * 线程安全基本数据
       */
       private static AtomicInteger COUNT = new AtomicInteger();
       /**
        * 在线用户实例
        */
        private static CopyOnWriteArraySet<SimpleEndpoint> container = new CopyOnWriteArraySet<SimpleEndpoint>();
        /**
        * 当前用户Session
        */
        private Session session;
       
       //获取连接时调用
      @OnOpen
      public void onOpen (Session session) {
         this.session = session;
        container.add(this);
        addUser();
     }

      //连接关闭时调用
      @OnClose
      public void onClose () {
           //从容器中删除用户
          container.remove(this);
          //减去用户数量
          subUser();
      }

  //接受WebSocket发送的消息
@OnMessage
public void onMessage (String message, Session session) {
    _Logger.info("Send all user: {}", message);
    container.forEach(target -> target.sendMessage(message));
}

//发生错误时调用
@OnError
public void onError (Session session, Throwable error) {
    _Logger.info("error: {}", error.getMessage());
}


/**
 * 发送消息
 * @param message
 */
public void sendMessage (String message) {
    try {
        this.session.getBasicRemote().sendText(message);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

/**
 * 自定义发送消息给全部用户
 * @param message
 */
public static void sendUsers (String message) {
    _Logger.info("Send all user: {}", message);
    container.forEach(target -> target.sendMessage(message));
}


/**
 * 减去用户数量
 */
public void subUser () {
    _Logger.info("Sub user: {}", COUNT.get());
    SimpleEndpoint.COUNT.addAndGet(-1);
}

/**
 * 添加用户数量
 */
public void addUser () {
    _Logger.info("Add user: {}", COUNT.get());
    SimpleEndpoint.COUNT.addAndGet(1);
}

}

5、websocket实现(聊天室)如果websocket容器集成到spring中去,只会生成一个websocket,最好用独立的websocket容器配置

@ServerEndpoint(value ="/websocket/{sid}/{rsid}",configurator=WebSocketConfig.class)
@Component
public class WebSocketServer {

static Logger log= LoggerFactory.getLogger(WebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;

//发送用户id
private String sid="";

//接受用户id
private String rsid="";

/**
 * 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid,@PathParam("rsid") String rsid) {
    this.session = session;
    webSocketSet.add(this);     //加入set中
    addOnlineCount();           //在线数加1
    log.info("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
    this.sid=sid;
    this.rsid = rsid;
    try {
        sendMessage("连接成功");
    } catch (IOException e) {
        log.error("websocket IO异常");
    }
}

/**
 * 连接关闭调用的方法
 */
@OnClose
public void onClose() {
    webSocketSet.remove(this);  //从set中删除
    subOnlineCount();           //在线数减1
    log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}

/**
 * 收到客户端消息后调用的方法
 *
 * @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
    session.getId();
    //群发消息
   //        for (WebSocketServer item : webSocketSet) {
   //            try {
   //                item.sendMessage(message);
   //            } catch (IOException e) {
  //                e.printStackTrace();
  //            }
  //        }

    /**
     * 只给聊天的用户跟商家发送消息
     */
    for (WebSocketServer item : webSocketSet) {
        if(item.sid.equals(rsid)||item.sid.equals(sid)){
            try {
                item.sendMessage(sid+":"+message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

/**
 *
 * @param session
 * @param error
 */
@OnError
public void onError(Session session, Throwable error) {
    log.error("发生错误");
    error.printStackTrace();
}
/**
 * 实现服务器主动推送
 */
public void sendMessage(String message) throws IOException {
    this.session.getBasicRemote().sendText(message);
}


/**
 * 群发自定义消息
 * */
public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
    log.info("推送消息到窗口"+sid+",推送内容:"+message);
    for (WebSocketServer item : webSocketSet) {
        try {
            //这里可以设定只推送给这个sid的,为null则全部推送
            if(sid==null) {
                item.sendMessage(message);
            }else if(item.sid.equals(sid)||item.sid.equals("22")){
                item.sendMessage("xx店客服:"+message);
            }
        } catch (IOException e) {
            continue;
        }
    }
}

public static synchronized int getOnlineCount() {
    return onlineCount;
}

public static synchronized void addOnlineCount() {
    WebSocketServer.onlineCount++;
}

public static synchronized void subOnlineCount() {
    WebSocketServer.onlineCount--;
}
}
上一篇下一篇

猜你喜欢

热点阅读