RabbitMQ延迟队列

2024-01-29  本文已影响0人  h2coder

应用场景

解决方案

安装RabbitMQ

下载RabbitMQ镜像

docker pull rabbitmq:3.8-management

安装RabbitMQ

docker run \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

下载插件

注意:插件的版本要和安装的RabbitMQ的版本相搭配,否则可能会有意外的问题产生!

安装插件

上传插件

docker volume inspect mq-plugins

安装插件

docker exec -it mq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

DelayExchange插件的原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

使用插件

声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true,然后声明队列与其绑定即可

声明DelayExchange交换机

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delayed.queue", durable = "true"),
    exchange = @Exchange(name = "delayed.direct",delayed = "true"),
    key = "delayed"
))
public void listenDelayedQueue(String msg){
    log.info("接收到 delayed.queue的延迟消息:{}", msg);
}
// 交换机和队列的配置
@Configuration
public class DelayedConfig {
    // 创建延迟交换机
    @Bean
    public DirectExchange delayedExchange() {
        return ExchangeBuilder.directExchange("delayed.direct")
                // 声明延迟属性
                .delayed()
                .build();
    }

    // 创建延迟队列
    @Bean
    public Queue delayedQueue() {
        return new Queue("delayed.queue");
    }

    // 绑定交换机和队列
    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed");
    }
}

// 监听器
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "delayed.queue")
    public void listenSimpleQueue(Message msg) throws Exception {
        log.info("接收到 delayed.queue的延迟消息:{}", msg);
    }
}

发送延迟消息

@GetMapping("/sendDelayed/{time}")
public ResponseEntity sendDelayed(@PathVariable("time") Integer time) {
    String exchange = "delayed.direct";
    Message message = MessageBuilder.withBody("delayed message".getBytes())
        // 设置延时时间,时间单位为毫秒值
        .setHeader("x-delay", time * 1000)
        .build();
    rabbitTemplate.send(exchange, "delayed", message);
    return ResponseEntity.ok("success" + new Date());
}

总结

上一篇 下一篇

猜你喜欢

热点阅读