Spring-Cloud RabbitMQ 用法 - 监听jso

2020-03-13  本文已影响0人  heichong

本篇主要介绍基于spring的@RabbitListener监听json对象的用法


1. 环境

spring-boot 2.1.1.RELEASE
spring-cloud Finchley.RELEASE
rabbitmq 3.7.10
spring-boot-starter-amqp

2. Json在RabbitMQ中的格式

在上一节中,我们使用两种方式来发送json对象,它们在RabbitMQ中的格式为:

  • 可以看出格式2格式1多了一个header属性:__TypeId__,其值为json对象的全限定类名。
  • 我们通过方式1发送时,由于消息是我们自定义的,所以RabbitTemplate就没有给我们增加此属性(当然我们也可以通过Message.setHeader()方式来手动设置);而通过方式2发送时,Jackson2JsonMessageConverter为我们设置了消息的__TypeId__属性
  • 这两种格式的消息监听还是有差别的,下面我们进行详细说明

2. 消息监听容器 MessageListenerContainer

要使用监听器,必须要先了解MessageListenerContainer,它通过MessageConverter把RabbitMQ中的byte[]转化为其他对象。

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        //设置json序列化 消息转换器
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
    /**
     * 消费者端配置: 重写默认的监听器工厂
     * 创建一个支持自定义json序列化监听容器工厂
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        /**
         * 系统使用的默认消息转换器为SimpleMessageConverter,可以处理文本、java序列化等
         * 我们要在默认的基础上,再添加一个处理json的消息转化器。
         * 所以这里我们使用了ContentTypeDelegatingMessageConverter,它可以代理多个消息处理器,每个消息处理器由contentType决定。
         * new ContentTypeDelegatingMessageConverter()就是指定了SimpleMessageConverter作为默认的消息处理器
         */
        ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter() ;
        messageConverter.addDelegate(MessageProperties.CONTENT_TYPE_JSON,new Jackson2JsonMessageConverter());

        //设置消息转换器
        factory.setMessageConverter(messageConverter);
        return factory;
    }

按以上配置以后,就可以使用监听器来监听json类型的数据

3. 监听器

spring中使用@RabbitListener来实现消费者监听器,它用法有两种:

@RabbitListener 标注方法

使用方式:

@Component
public class QueueJsonListenerByRabbitListener {
    Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
     * @param headers
     */
    @RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
    public void process(@Payload Department department, @Headers Map<String,Object> headers) {
        logger.info("<--- json 我收到的消息:"+department+", header="+headers);
    }
}
  • queues为要监听的队列名称
  • @Payload为消息对象本身
  • @Headers为消息对象的头部,也可以通过@Header获取单个头部属性:@Header String token

@RabbitListener 标注类,@RabbitHandler 标注方法

使用方式:

@Component
@RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
public class QueueJsonListenerByRabbitHandler {
    Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
     * @param headers
     */
    @RabbitHandler
    public void process(@Payload Department department, @Headers Map<String,Object> headers) {
        logger.info("<--- json 我收到的消息:"+department+", header="+headers);
    }
}

参考

上一篇下一篇

猜你喜欢

热点阅读