Java 杂谈程序员微服务 -实战系列

分布式 应用中实现 即时消息推送

2018-09-14  本文已影响250人  黄大海

流程图

实时推送.png

RabbitMQ消息队列配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Configuration
public class RabbitConfig {
    @Autowired
    private AxeRabbitProperties axeRabbitProperties;
    
    @Bean
    public Queue queue() {
        return new Queue(axeRabbitProperties.getSharedQueueName(), true/*durable*/, false/*exclusive*/, false/*autoDelete*/);
    }
    
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(axeRabbitProperties.getTopicName());
    }
    
    @Bean
    public List<Binding> bindingQueues(Queue queue, TopicExchange exchange){
        List<Binding> result = Lists.newArrayList();
        result.add(BindingBuilder.bind(queue).to(exchange).with(TrsMsgRounting.AUDIT.getKey() + ".#"));
        result.add(BindingBuilder.bind(queue).to(exchange).with(TrsMsgRounting.ACK.getKey() + ".#"));
        result.add(BindingBuilder.bind(queue).to(exchange).with(TrsMsgRounting.ASSET_AUDIT.getKey() + ".#"));
        result.add(BindingBuilder.bind(queue).to(exchange).with(ActMsgRounting.AUDIT.getKey() + ".#"));
        return result;
    }
}
@Service
public class MyService {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void doSomething(){
        rabbitTemplate.convertAndSend(rabbitConfig.exchange().getName(), TrsMsgRounting.AUDIT.getKey(), JSON.toJSONString(message));
    }
}
@Component
@RabbitListener(queues = "#{axeRabbitProperties.sharedQueueName}")
public class SharedQueueReceiver {
    @Autowired 
    private ApplicationContext applicationContext;

    @RabbitHandler
    public void process(@Payload String body, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
        //TODO persist locally
        applicationContext.publishEvent(new UserNotificationEvent(this, applicationContext.getId(), notification))
    }
}

通过SpringCloudBus广播事件

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
public class UserNotificationEvent extends RemoteApplicationEvent {
    private Object payload;

    protected UserNotificationEvent(){
        // for serializers
    }
    ...
}
@RemoteApplicationEventScan("package.name.of.UserNotificationEvent")
applicationContext.publishEvent(new UserNotificationEvent(this, applicationContext.getId(), notification))
@Component
public class UserNotificationListener implements ApplicationListener<UserNotificationEvent> {
  
  // The SimpMessagingTemplate is used to send Stomp over WebSocket messages.
  @Autowired
  private SimpMessagingTemplate messagingTemplate;
  
  @Override
  public void onApplicationEvent(UserNotificationEvent event) {
        messagingTemplate.convertAndSendToUser(
            event.getPayload().getUserid(), 
            "/queue/notify", 
            event.getPayload()
        );
  }

websocket配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Register the "/ws" endpoint, enabling the SockJS protocol.
        // SockJS is used (both client and server side) to allow alternative
        // messaging options if WebSocket is not available.
        registry.addEndpoint("/ws")①
            .addInterceptors(new HttpSessionHandshakeInterceptor())②
            .setHandshakeHandler(new DefaultHandshakeHandler() {//userId -> principal.name
                @Override
                protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                    return new Principal() {
                        @Override
                        public String getName() {
                            return attributes.get(“session_user_id”);③
                        }
                    };
                }
            })
            .setAllowedOrigins("*")④
            .withSockJS();⑤
        return;
}
<html>
  <head>
    <script src="javascript/sockjs-0.3.4.min.js"></script>
    <script src="javascript/stomp.min.js"></script>
  </head>
  
  <script>

      /**
       * Open the web socket connection and subscribe the "/notify" channel.
       */
      function connect() {

        // Create and init the SockJS object
        var socket = new SockJS('/ws');①
        var stompClient = Stomp.over(socket);②

        // Subscribe the '/notify' channell
        stompClient.connect({}, function(frame) {
          stompClient.subscribe('/user/queue/notify', function(notification) {③

            //doSomething when receive a notification
            doSomething(JSON.parse(notification.body).payload);

          });
        });
        
        return;
      } // function connect
  </script>
</html>

Spring Cloud Bus 与 Spring WebSocket 的 BUG

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters){
        BusJacksonMessageConverter busConverter = new BusJacksonMessageConverter();
        busConverter.setPackagesToScan(new String[]{ClassUtils.getPackageName(UserNotificationEvent.class)});
        try {
            busConverter.afterPropertiesSet();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        messageConverters.add(busConverter);
        return true;
    }
}

关于RabbitMQ和SpringCloudBus的细节,可以参考spring-boot-starter-amqp 那些文档里没告诉你的事情, Spring-cloud-bus 原理

上一篇 下一篇

猜你喜欢

热点阅读