分布式 应用中实现 即时消息推送
2018-09-14 本文已影响250人
黄大海
流程图

- 后台业务系统发送消息到公共RabbitMq的Topic
- Web应用(集群中的一台)通过routingKey获取自己需要的消息
- 收到消息的Web应用实例将消息持久化到数据库(未在线的用户可以登录时查询到)
- 收到消息的WEB应用实例广播消息给其他实例
- 每个WEB实例通过websocket推送给连接到本机的前端
- 通过STOMP子协议,只有特定的USER才能收到信息
RabbitMQ消息队列配置
- 消息的接收和订阅在Rabbit中有2个方向
- 一个是发送给多个Topic,然后让Queue连接多个感兴趣的Topic
- 另一个是发送到统一个Topic, 然后Queue通过匹配不同的routingKey来订阅消息
- 或者是以上两种的混合应用。简单起见,我们选择第二种。
- 导入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置exchange/queue/binding
@Configuration
public class RabbitConfig {
@Autowired
private AxeRabbitProperties axeRabbitProperties;
@Bean
public Queue queue() {
return new Queue(axeRabbitProperties.getSharedQueueName(), true/*durable*/, false/*exclusive*/, false/*autoDelete*/);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(axeRabbitProperties.getTopicName());
}
@Bean
public List<Binding> bindingQueues(Queue queue, TopicExchange exchange){
List<Binding> result = Lists.newArrayList();
result.add(BindingBuilder.bind(queue).to(exchange).with(TrsMsgRounting.AUDIT.getKey() + ".#"));
result.add(BindingBuilder.bind(queue).to(exchange).with(TrsMsgRounting.ACK.getKey() + ".#"));
result.add(BindingBuilder.bind(queue).to(exchange).with(TrsMsgRounting.ASSET_AUDIT.getKey() + ".#"));
result.add(BindingBuilder.bind(queue).to(exchange).with(ActMsgRounting.AUDIT.getKey() + ".#"));
return result;
}
}
- 发送端
@Service
public class MyService {
@Autowired
private AmqpTemplate rabbitTemplate;
public void doSomething(){
rabbitTemplate.convertAndSend(rabbitConfig.exchange().getName(), TrsMsgRounting.AUDIT.getKey(), JSON.toJSONString(message));
}
}
- 接收端
@Component
@RabbitListener(queues = "#{axeRabbitProperties.sharedQueueName}")
public class SharedQueueReceiver {
@Autowired
private ApplicationContext applicationContext;
@RabbitHandler
public void process(@Payload String body, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
//TODO persist locally
applicationContext.publishEvent(new UserNotificationEvent(this, applicationContext.getId(), notification))
}
}
通过SpringCloudBus广播事件
- 也可以直接用Rabbitmq广播,配置上稍多些。
- 加入maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
- 创建自己的Event
public class UserNotificationEvent extends RemoteApplicationEvent {
private Object payload;
protected UserNotificationEvent(){
// for serializers
}
...
}
- CloudBus需要类信息来区分是RemoteApplicationEvent的子类, 在配置类上加上扫描
@RemoteApplicationEventScan("package.name.of.UserNotificationEvent")
- 单节点在收到后台业务系统发来的消息,并持久化后,广播给所有节点
applicationContext.publishEvent(new UserNotificationEvent(this, applicationContext.getId(), notification))
- 各个节点收到广播消息后,根据业务上的userid,发给相关用户
@Component
public class UserNotificationListener implements ApplicationListener<UserNotificationEvent> {
// The SimpMessagingTemplate is used to send Stomp over WebSocket messages.
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Override
public void onApplicationEvent(UserNotificationEvent event) {
messagingTemplate.convertAndSendToUser(
event.getPayload().getUserid(),
"/queue/notify",
event.getPayload()
);
}
websocket配置
- 加入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- 配置websocket
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// Register the "/ws" endpoint, enabling the SockJS protocol.
// SockJS is used (both client and server side) to allow alternative
// messaging options if WebSocket is not available.
registry.addEndpoint("/ws")①
.addInterceptors(new HttpSessionHandshakeInterceptor())②
.setHandshakeHandler(new DefaultHandshakeHandler() {//userId -> principal.name
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
return new Principal() {
@Override
public String getName() {
return attributes.get(“session_user_id”);③
}
};
}
})
.setAllowedOrigins("*")④
.withSockJS();⑤
return;
}
-
该配置采用stomp子协议来实现订阅发布功能
- ① 定义websocket的入口端点
- ② Session复制拦截器,websocket握手时发送的普通http请求,这是可以获得当时的session, 该拦截器把session字段复制到websocket的属性字段上
- ③ 把复制过来的session字段中获得userid,把它作为stomp的用户标示
- ④ 跨域
- ⑤ SockJS支持。在不支持websocket浏览器的环境下降级为long-polling或其他方式
-
浏览器端
<html>
<head>
<script src="javascript/sockjs-0.3.4.min.js"></script>
<script src="javascript/stomp.min.js"></script>
</head>
<script>
/**
* Open the web socket connection and subscribe the "/notify" channel.
*/
function connect() {
// Create and init the SockJS object
var socket = new SockJS('/ws');①
var stompClient = Stomp.over(socket);②
// Subscribe the '/notify' channell
stompClient.connect({}, function(frame) {
stompClient.subscribe('/user/queue/notify', function(notification) {③
//doSomething when receive a notification
doSomething(JSON.parse(notification.body).payload);
});
});
return;
} // function connect
</script>
</html>
- ①连接websocket端点
- ②采用Stomp协议
- ③订阅destination /user/queue/notify
- 关于stomp用户订阅的细节可以参考org.springframework.messaging.simp.user.DefaultUserDestinationResolver上的注释
Spring Cloud Bus 与 Spring WebSocket 的 BUG
- Spring WebSocket的MessageHandler注册在前,导致Spring Cloud Bus的事件解析出错。
- 目前是这样处理的
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters){
BusJacksonMessageConverter busConverter = new BusJacksonMessageConverter();
busConverter.setPackagesToScan(new String[]{ClassUtils.getPackageName(UserNotificationEvent.class)});
try {
busConverter.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
messageConverters.add(busConverter);
return true;
}
}