【RabbitMQ-11】MQ消费端监听消息的两种方式

2020-11-24  本文已影响0人  小胖学编程

注解式:

【RabbitMQ-10】@RabbitListener注解生效的源码分析中,bean在初始化的时候,解析@RabbitListener注解,根据注解配置和SimpleRabbitListenerContainerFactory创建SimpleMessageListenerContainer对象。

注解式,注解配置的信息优先级高于配置文件的配置,当然也可以在代码中创建SimpleRabbitListenerContainerFactory类并放入Spring容器中。

@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true) 
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}

若使用自己在代码中配置的监听工厂,那么@RabbitListener中要进行声明:

@Component
@Slf4j
public class CustomerRev {
   //声明使用的的监听器工厂(不声明使用默认的工厂)
    @RabbitListener(containerFactory = "singleListenerContainer", queues = {"kinson2"})
    public void receiver5(Message msg, Channel channel) throws IOException, InterruptedException {
        //打印数据
        String message = new String(msg.getBody(), StandardCharsets.UTF_8);
        log.info("队列消费消息{}"+message);
        channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
    }
}

配置式:

也可以创建SimpleMessageListenerContainer类并放入到Spring容器中。在创建SimpleMessageListenerContainer时,会配置监听的队列集合监听方法

@Slf4j
@Configuration
public class RabbitConfig {
    //这种配置也可以去监听队列的消息
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new MySimpleMessageListenerContainer(connectionFactory);
        //同时监听多个队列
        container.setQueues(new Queue("kinson2"));
        //设置当前的消费者数量
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(2);
        //设置是否重回队列
        container.setDefaultRequeueRejected(false);
        //设置自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置监听外露
        container.setExposeListenerChannel(true);
        //设置消息监听
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody(), "utf-8");
                log.info("队列2—消费消息:" + msg);
            }
        });
        return container;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读