SpringBoot极简教程 · Spring Boot SpringBoot精选Springboot整合

使用RabbitMQ实现订单在30分钟后自动过期

2020-04-14  本文已影响0人  垃圾简书_吃枣药丸

延迟队列可以实现消息在投递到Exchange之后,经过一定的时间之后再投递到相应的Queue。再被消费者监听消费。
即:生产者投递的消息经过一段时间之后再被消费者消费。

该业务的其他实现方案:

使用RabbitMQ延迟队列实现:

# 安装延迟队列插件:

# 业务相关代码编写

OrderEntity image.png
/**
 * @author futao
 * @date 2020/4/14.
 */
public interface OrderMapper extends BaseMapper<Order> {
}
OrderController

# RabbitMQ相关代码编写

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: delay-vh
    connection-timeout: 15000
    # 发送确认
    publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 手动签收ACK
        acknowledge-mode: manual

app:
  rabbitmq:
    # 延迟时长设置
    delay:
      order: 10S
    # 队列定义
    queue:
      order:
        delay: order-delay-queue
    # 交换机定义
    exchange:
      order:
        delay: order-delay-exchange

/**
 * 队列,交换机定义与绑定
 * 延迟队列插件`rabbitmq-delayed-message-exchange`下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
 *
 * @author futao
 * @date 2020/4/10.
 */
@Configuration
public class Declare {

    /**
     * 订单队列 - 接收延迟投递的订单
     *
     * @param orderQueue 订单队列名称
     * @return
     */
    @Bean
    public Queue orderDelayQueue(@Value("${app.rabbitmq.queue.order.delay}") String orderQueue) {
        return QueueBuilder
                .durable(orderQueue)
                .build();
    }

    /**
     * 订单交换机-延迟交换机 - 消息延迟一定时间之后再投递到绑定的队列
     *
     * @param orderExchange 订单延迟交换机
     * @return
     */
    @Bean
    public Exchange orderDelayExchange(@Value("${app.rabbitmq.exchange.order.delay}") String orderExchange) {
        Map<String, Object> args = new HashMap<>(1);
        args.put("x-delayed-type", "topic");
        return new CustomExchange(orderExchange, "x-delayed-message", true, false, args);
    }

    /**
     * 订单队列-交换机 绑定
     *
     * @param orderQueue         订单队列
     * @param orderDelayExchange 订单交换机
     * @return
     */
    @Bean
    public Binding orderBinding(Queue orderDelayQueue, Exchange orderDelayExchange) {
        return BindingBuilder
                .bind(orderDelayQueue)
                .to(orderDelayExchange)
                .with("order.delay.*")
                .noargs();
    }
}

可以看出队列就是普通的队列。重点在交换机的设定上。声明延迟交换机需要设置参数x-delayed-type,值为交换机类型,可以是fanout,topic,direct。并且设置交换机的type为x-delayed-message

delay-exchange

可以看到,除了默认的交换机,SpringBoot已经帮我们创建好了延迟交换机order-delay-exchange,并且此时messages delayed为0,因为我们还未向交换机投递消息。

ExchangeDetail Orderender

在消息投递之前为每条消息都设置了延迟时长setDelay()
调用消费者的代码在上面OrderController中,下定之后,订单数据落库,并且向MQ中投递延迟消息。可以回头看看。

OrderConsumer

为了方便查看到延迟投递的效果,我在消息投递和接收处都打印了日志,测试时可以看到消息投递和消息的时间间隔。

# 测试

app:
  rabbitmq:
    delay:
      order: 10S
log DB100

可以看到,打印出了投递日志,订单主键为666ae86aabe2a1b3120b34bb5f447bbe的订单在2020-04-14 22:22:04.307进行了投递,此时数据库中该订单的状态为100,待支付。

ExchangeDetail log

可以看到OrderConsumer在10S后2020-04-14 22:22:14.320接收到了主键为666ae86aabe2a1b3120b34bb5f447bbe的订单消息。距离投递时间2020-04-14 22:22:04.307为10S。此时查看DB中订单状态:

DB

订单状态为200已过期,且过期时间为2020-04-14 22:22:14


app:
  rabbitmq:
    delay:
      order: 1M
Exchange log

消息都在延迟1分钟后投递到了队列-消费者。

/usr/local/Cellar/rabbitmq/3.7.15/plugins
建议收藏,当然我只是建议。


严重风险: 消息在消费者处阻塞,会导致后面的消息虽然已经过期,但是无法被及时消费。

上一篇下一篇

猜你喜欢

热点阅读