RabbitMq 延迟队列

2021-03-08  本文已影响0人  子丿龙

一、延迟队列

延迟队列就是比普通队列多了一个延迟属性。
单从字面意思,可以理解为这个队列是延迟的,但我们普遍默认的,是我们要实现的功能是延迟,而和队列本身是延迟队列或不是延迟队列无关。看完本篇文章就会明白,即使队列是普通队列,单给消息设置一个延迟时间,依然可以实现延迟功能。

注:不给队列加延迟属性,单给消息设置一个延迟时间,需要延迟插件才能实现

二、RabbitMQ中的 TTL 属性

TTL(Time To Live):表示消息的存活时间(毫秒)
如果一个队列或者一条消息设置了TTL,那么如果消息没有被及时处理,则会变为死信。

有两种设置TTL方式付下:
 @Bean("delayQueueA")
    public Queue delayQueueA() {
        return QueueBuilder.durable(DELAY_QUEUEA_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)
                .ttl(6000)//设置队列TTL 6秒(底层就是设置 " x-message-ttl ")
                .build();
 @GetMapping("/delayMsg")
    public void sendMsg2(String msg, Integer delayTime) {
        log.info(delayTime/1000 + "s");
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEC_ROUTING_KEY, 
                msg+delayTime, message -> {
            //单个消息设置TTL 
            message.getMessageProperties().setExpiration(delayTime.toString());
            return message;
        });
    }

注意:如果两种方式都设置了,那么选TTL最短时间

有两种方式可实现延迟功能:

三、第一种延迟功能实现方式代码示例,给队列设置延迟

先看看消息流向图:


image.png
这种方式,就是利用死信机制,先让消息流向设置了延迟时间的 延迟队列,待消息到期后成为死信,便自动流向 死信队列,最后我们监听 死信队列 的消息,然后消费。(延迟队列、死信队列和正常队列是一样的,正常监听消费即可)
package com.zilong.mqpractice.delayqueue;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig {
    public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    public static final String DELAY_QUEUEA_NAME = "delay.queuea";
    public static final String DELAY_QUEUEB_NAME = "delay.queueb";
    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queueb.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_NAME = "deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "deadletter.queueb";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "deadletter.delay_6s.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "deadletter.delay_30s.routingkey";

    @Bean("delayExchange")
    public DirectExchange delayExchange() {
        log.info("create----> delayExchange");
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }


    @Bean("delayQueueA")
    public Queue delayQueueA() {
        return QueueBuilder.durable(DELAY_QUEUEA_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)
                .ttl(6000)
                .build();
    }

    @Bean("delayQueueB")
    public Queue delayQueueB() {
        return QueueBuilder.durable(DELAY_QUEUEB_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY)
                .ttl(30000)
                .build();
    }

    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);

    }

    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);

    }


    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME)
                .build();
    }

    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME)
                .build();
    }

    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);

    }

    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);

    }

}

package com.zilong.mqpractice.delayqueue;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/delayMsg")
public class Cotroller {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/6s/send")
    public void sendMsg(String msg) {
        log.info("6s");
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
    }

    @GetMapping("/30s/send")
    public void sendMsg2(String msg) {
        log.info("30s");
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
    }
}

package com.zilong.mqpractice.delayqueue;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class Consumer {


    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("收到6s,死信A: " + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }


    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("收到30s,死信B: " + msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

2021-03-08 23:02:11.711  INFO 83451 --- [nio-8006-exec-3] c.z.mqpractice.delayqueue.Cotroller      : 6s
2021-03-08 23:02:13.378  INFO 83451 --- [nio-8006-exec-4] c.z.mqpractice.delayqueue.Cotroller      : 30s
2021-03-08 23:02:17.845  INFO 83451 --- [ntContainer#3-1] c.zilong.mqpractice.delayqueue.Consumer  : 收到6s,死信A: 短时间
2021-03-08 23:02:43.548  INFO 83451 --- [ntContainer#2-1] c.zilong.mqpractice.delayqueue.Consumer  : 收到30s,死信B: 长时间

四、第二种延迟功能实现方式代码示例,给消息设置延迟

这种方式是最简便的方式,不需要第一种那么复杂的消息流向
先看消息就想图:


image.png
利用rabbitmq延迟插件,创建一个延迟交换机,然后消息生产时设置上延迟属性(setDelay()方法,而不是setExpiration()方法),队列也用普通队列即可,不需要设置延迟属性,消费者只需要监听该队列即可。
package com.zilong.mqpractice.delayqueue2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig3 {
    public static final String DELAY_EXCHANGE_NAME = "delay.exchange2";
    public static final String DELAY_QUEUEC_NAME = "delay.queuec";
    public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queuec.routingkey";

    @Bean("delayExchange2")
    public DirectExchange delayExchange2() {
        log.info("create----> delayExchange2");
        DirectExchange directExchange = new DirectExchange(DELAY_EXCHANGE_NAME);
        //给交换机设置延迟属性
        directExchange.setDelayed(true);
        return directExchange;
    }

    @Bean("delayQueueC")
    public Queue delayQueueC() {
        //普通的队列
        return QueueBuilder.durable(DELAY_QUEUEC_NAME)
                .build();
    }

    @Bean
    public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                 @Qualifier("delayExchange2") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);

    }


}

package com.zilong.mqpractice.delayqueue2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/delayMsg2")
public class Cotroller3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void sendMsg(String msg, Integer delayTime) {
        log.info(delayTime / 1000 + "s");
        rabbitTemplate.convertAndSend(RabbitMQConfig3.DELAY_EXCHANGE_NAME, RabbitMQConfig3.DELAY_QUEUEC_ROUTING_KEY,
                msg + delayTime, message -> {
                    //给消息设置延迟属性,不同于setExpiration()
                    message.getMessageProperties().setDelay(delayTime);
                    return message;
                });
    }
}


package com.zilong.mqpractice.delayqueue2;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class Consumer3 {


    @RabbitListener(queues = RabbitMQConfig3.DELAY_QUEUEC_NAME)
    public void receivec(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("收到死信C: " + msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

2021-03-08 23:30:12.225  INFO 96631 --- [nio-8006-exec-1] c.z.mqpractice.delayqueue2.Cotroller3    : 6s
2021-03-08 23:30:16.317  INFO 96631 --- [nio-8006-exec-2] c.z.mqpractice.delayqueue2.Cotroller3    : 30s
2021-03-08 23:30:18.368  INFO 96631 --- [ntContainer#4-1] c.z.mqpractice.delayqueue2.Consumer3     : 收到死信C: 插件延迟消息6000
2021-03-08 23:30:46.417  INFO 96631 --- [ntContainer#4-1] c.z.mqpractice.delayqueue2.Consumer3     : 收到死信C: 插件延迟消息30000

五、小结

实现延迟功能,就两种方式。
1.将队列设置为延迟队列。缺点是所有消息都固定的延迟时间,想要改变,就要重建一个延迟队列
2.利用延迟插件给消息设置延迟时间。缺点是插件会影响效率(没试过,官网说的)

疑问:用延迟队列,然后用setExpiration()给消息设置延迟时间可以么?

不可以,队列特点是先进先出,这样做,虽然有延迟时间,但是rabbitmq会按顺序检测信息是否死亡。例如:
先发送messageA:延迟60s,然后发送messageB:延迟6s,那么rabbitmq会先检测messageA是否变为死信,如果messageA变为死信,rabbitmq会将其丢到死信队列,然后rabbitmq才去检测messageB是否是死信.导致的结果就是,messageB会等待messageA60s被消费后,自己才能被检测到,然后被消费(messageB自己的延迟时间已经倒计时完毕,不会等messageA 60s过后才开始倒计时,也就是messageB不会等到66s,他已经是死信,只是rabbitmq没检测它)。所以,给消息设置延迟时间,还是需要按照官网方式,用延迟插件实现。

上一篇下一篇

猜你喜欢

热点阅读