RabbitMQ实现消息延迟推送

2020-09-21  本文已影响0人  砒霜拌辣椒

1、使用场景

在上面两种场景中,如果我们使用下面3种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

2、实现方式

RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列。这里不做介绍。

RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件rabbitmq-delayed-message-exchange

3、插件安装

  1. rabbitmq-delayed-message-exchange插件官方下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    选择对应版本下载(要和安装的RabbitMQ server版本匹配)。

  2. 找到RabbitMQ的安装路径,将下载的插件放到plugins目录中。比如:

-rw-r--r-- 1 root root   43377 9月  20 22:54 rabbitmq_delayed_message_exchange-3.8.0.ez
[root@libai plugins]# pwd
/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.14/plugins
  1. 启用插件
    使用rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令启用插件。
[root@libai plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@libai:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
 rabbitmq_delayed_message_exchange
 rabbitmq_management
 rabbitmq_management_agent
 rabbitmq_web_dispatch
Applying plugin configuration to rabbit@libai...
The following plugins have been enabled:
 rabbitmq_delayed_message_exchange
started 1 plugins.
  1. 查看管理界面


    延迟队列

查看交换机Type是否有x-delayed-message下拉选项,如果有则表示插件安装已经生效了。

4、应用

配置和依赖这里就不贴出了,可以参考以往Springboot整合RabbitMQ的文章。

4.1、队列和交换机绑定

@Configuration
public class DelayedMessageRabbitConfig {
    @Bean
    public Queue delayQueue() {
        return new Queue("delayQueue", true, false, false);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayedExchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelay() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delayRouting").noargs();
    }
}

交换机类型为CustomExchange自定义类型,这里指定为x-delayed-message

4.2、生产者

@RestController
@Slf4j
public class DelayMessageController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendDelayedMessage")
    public String sendDelayedMessage() {
        log.info(DateUtil.now());
        rabbitTemplate.convertAndSend("delayedExchange", "delayRouting", "订单取消", message -> {
            message.getMessageProperties().setDelay(5000);
            return message;
        });
        return "ok";
    }
}

这里指定延迟5秒推送消息。

4.3、消费者

@Component
@RabbitListener(queues = "delayQueue")
@Slf4j
public class DelayReceiver {
    @RabbitHandler
    public void process(String delayMessage) {
        log.info(DateUtil.now());
        log.info("延迟收到消息:{}", delayMessage);
    }
}

4.4、效果

2020-09-21 21:35:44
------------------
2020-09-21 21:35:49
延迟收到消息:订单取消

如果开启了消息确认机制,比如确认消息是否发到了交换机(publisher-confirms为true),则可能出现312、NO_ROUTE的提示,忽略即可。

另外这里的消息延迟主要发生在交换机延迟推送消息到队列中,而非队列延迟推送到消费者。

代码地址

上一篇下一篇

猜你喜欢

热点阅读