使用RabbitMQ实现订单在30分钟后自动过期
延迟队列可以实现消息在投递到
Exchange
之后,经过一定的时间之后再投递到相应的Queue
。再被消费者监听消费。
即:生产者投递的消息经过一段时间之后再被消费者消费。
- 业务场景:订单在30分钟内还未支付则自动取消。
该业务的其他实现方案:
- 使用
Redis
,设置过期时间,监听过期事件。 - 使用RabbitMQ的过期队列与死信队列,设置消息的存活时间,在设置的时间内未被消费,即会投递到死信队列,我们监听死信队列即可。可参考上一篇文章RabbitMQ死信队列在SpringBoot中的使用。
使用RabbitMQ延迟队列实现:
# 安装延迟队列插件:
- RabbitMQ插件列表: https://www.rabbitmq.com/community-plugins.html
- 延迟队列插件下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
-
安装并启用
plugin
- 重启RabbitMQ
# 业务相关代码编写
- 订单实体(仅保留相关字段)

- 订单状态枚举(仅保留相关状态)

- OrderMapper
/**
* @author futao
* @date 2020/4/14.
*/
public interface OrderMapper extends BaseMapper<Order> {
}
- 模拟下定的接口
OrderController
为了简单起见,省略了Service层.

# RabbitMQ相关代码编写
- 配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: futao
password: 123456789
virtual-host: delay-vh
connection-timeout: 15000
# 发送确认
publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
mandatory: true
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 手动签收ACK
acknowledge-mode: manual
app:
rabbitmq:
# 延迟时长设置
delay:
order: 10S
# 队列定义
queue:
order:
delay: order-delay-queue
# 交换机定义
exchange:
order:
delay: order-delay-exchange
- 延迟交换机,队列定义与绑定
/**
* 队列,交换机定义与绑定
* 延迟队列插件`rabbitmq-delayed-message-exchange`下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
*
* @author futao
* @date 2020/4/10.
*/
@Configuration
public class Declare {
/**
* 订单队列 - 接收延迟投递的订单
*
* @param orderQueue 订单队列名称
* @return
*/
@Bean
public Queue orderDelayQueue(@Value("${app.rabbitmq.queue.order.delay}") String orderQueue) {
return QueueBuilder
.durable(orderQueue)
.build();
}
/**
* 订单交换机-延迟交换机 - 消息延迟一定时间之后再投递到绑定的队列
*
* @param orderExchange 订单延迟交换机
* @return
*/
@Bean
public Exchange orderDelayExchange(@Value("${app.rabbitmq.exchange.order.delay}") String orderExchange) {
Map<String, Object> args = new HashMap<>(1);
args.put("x-delayed-type", "topic");
return new CustomExchange(orderExchange, "x-delayed-message", true, false, args);
}
/**
* 订单队列-交换机 绑定
*
* @param orderQueue 订单队列
* @param orderDelayExchange 订单交换机
* @return
*/
@Bean
public Binding orderBinding(Queue orderDelayQueue, Exchange orderDelayExchange) {
return BindingBuilder
.bind(orderDelayQueue)
.to(orderDelayExchange)
.with("order.delay.*")
.noargs();
}
}
可以看出队列就是普通的队列。重点在交换机的设定上。声明延迟交换机需要设置参数x-delayed-type
,值为交换机类型,可以是fanout
,topic
,direct
。并且设置交换机的type为x-delayed-message
。
- 定义完成后可以启动SpringBoot应用程序,在RabbitMQ管理后台查看Exchange和Queue。

可以看到,除了默认的交换机,SpringBoot已经帮我们创建好了延迟交换机order-delay-exchange
,并且此时messages delayed为0,因为我们还未向交换机投递消息。
- 可以继续查看交换机的路由类型与绑定的队列

-
队列为普通的队列
Queue
-
回到代码中,定义消息生产者

在消息投递之前为每条消息都设置了延迟时长setDelay()
。
调用消费者的代码在上面OrderController
中,下定之后,订单数据落库,并且向MQ中投递延迟消息。可以回头看看。
- 消费者-监听过期的订单信息,并且将DB中相应的订单设置为已过期。

为了方便查看到延迟投递的效果,我在消息投递和接收处都打印了日志,测试时可以看到消息投递和消息的时间间隔。
# 测试
- 把订单过期时长设置为10S
app:
rabbitmq:
delay:
order: 10S
- 下定


可以看到,打印出了投递日志,订单主键为666ae86aabe2a1b3120b34bb5f447bbe
的订单在2020-04-14 22:22:04.307
进行了投递,此时数据库中该订单的状态为100
,待支付。
- 此时查看Exchange详情可以发现,
messages delayed:1
,即目前有一条消息处于投递状态。

- 等待10S后。

可以看到OrderConsumer
在10S后2020-04-14 22:22:14.320
接收到了主键为666ae86aabe2a1b3120b34bb5f447bbe
的订单消息。距离投递时间2020-04-14 22:22:04.307
为10S。此时查看DB中订单状态:

订单状态为200
已过期,且过期时间为2020-04-14 22:22:14
- 达到了订单在我们指定的时间后过期。
- 再测试几条一分钟的场景
app:
rabbitmq:
delay:
order: 1M


消息都在延迟1分钟后投递到了队列-消费者。
/usr/local/Cellar/rabbitmq/3.7.15/plugins
建议收藏,当然我只是建议。
严重风险: 消息在消费者处阻塞,会导致后面的消息虽然已经过期,但是无法被及时消费。