spring boot websocket 集成
2018-08-03 本文已影响100人
周六不算加班
1、引入websocket jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、启动类添加websocket相关注解
@EnableWebSocket//开启webSocket配置
3、配置文件设置
3.1、独立的容器,每次链接都会生成一个新的容器,但是websocket实现类不能注入spring其他的bean。
@ConditionalOnWebApplication
@Configuration
public class WebSocketConfig {
/**
* 配置WebSocketEndpointServer
* 如果使用独立的servlet容器,不是使用SpringBoot的内置容器
* 不需要注入ServerEndpointExporter, 它将由容器自己提供和管理
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter () {
return new ServerEndpointExporter();
}
}
3.2、集成到spring的容器,这个配置的问题就是,每次链接会生成同一个容器,新的容器会吧旧的替换掉,spring bean可以注入到实现类中去。可以根据应用场景的不同选择不同的配置。
@ConditionalOnWebApplication
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator implements ApplicationContextAware{
private static volatile BeanFactory context;
/**
* 配置WebSocketEndpointServer
* 如果使用独立的servlet容器,不是使用SpringBoot的内置容器
* 不需要注入ServerEndpointExporter, 它将由容器自己提供和管理
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter () {
return new ServerEndpointExporter();
}
@Override
public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException
{
return context.getBean(clazz);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
System.out.println("auto load"+this.hashCode());
WebSocketConfig.context = applicationContext;
}
}
4、endpoint配置(聊天室)
@Component
@ServerEndpoint("/endpoint")
public class SimpleEndpoint {
/**
* 日志
*/
private static final Logger _Logger = LoggerFactory.getLogger(SimpleEndpoint.class);
/**
* 线程安全基本数据
*/
private static AtomicInteger COUNT = new AtomicInteger();
/**
* 在线用户实例
*/
private static CopyOnWriteArraySet<SimpleEndpoint> container = new CopyOnWriteArraySet<SimpleEndpoint>();
/**
* 当前用户Session
*/
private Session session;
//获取连接时调用
@OnOpen
public void onOpen (Session session) {
this.session = session;
container.add(this);
addUser();
}
//连接关闭时调用
@OnClose
public void onClose () {
//从容器中删除用户
container.remove(this);
//减去用户数量
subUser();
}
//接受WebSocket发送的消息
@OnMessage
public void onMessage (String message, Session session) {
_Logger.info("Send all user: {}", message);
container.forEach(target -> target.sendMessage(message));
}
//发生错误时调用
@OnError
public void onError (Session session, Throwable error) {
_Logger.info("error: {}", error.getMessage());
}
/**
* 发送消息
* @param message
*/
public void sendMessage (String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 自定义发送消息给全部用户
* @param message
*/
public static void sendUsers (String message) {
_Logger.info("Send all user: {}", message);
container.forEach(target -> target.sendMessage(message));
}
/**
* 减去用户数量
*/
public void subUser () {
_Logger.info("Sub user: {}", COUNT.get());
SimpleEndpoint.COUNT.addAndGet(-1);
}
/**
* 添加用户数量
*/
public void addUser () {
_Logger.info("Add user: {}", COUNT.get());
SimpleEndpoint.COUNT.addAndGet(1);
}
}
5、websocket实现(聊天室)如果websocket容器集成到spring中去,只会生成一个websocket,最好用独立的websocket容器配置
@ServerEndpoint(value ="/websocket/{sid}/{rsid}",configurator=WebSocketConfig.class)
@Component
public class WebSocketServer {
static Logger log= LoggerFactory.getLogger(WebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//发送用户id
private String sid="";
//接受用户id
private String rsid="";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid,@PathParam("rsid") String rsid) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
this.sid=sid;
this.rsid = rsid;
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
session.getId();
//群发消息
// for (WebSocketServer item : webSocketSet) {
// try {
// item.sendMessage(message);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
/**
* 只给聊天的用户跟商家发送消息
*/
for (WebSocketServer item : webSocketSet) {
if(item.sid.equals(rsid)||item.sid.equals(sid)){
try {
item.sendMessage(sid+":"+message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
* */
public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
log.info("推送消息到窗口"+sid+",推送内容:"+message);
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定只推送给这个sid的,为null则全部推送
if(sid==null) {
item.sendMessage(message);
}else if(item.sid.equals(sid)||item.sid.equals("22")){
item.sendMessage("xx店客服:"+message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}