RabbitMQ实现延迟消费(延迟队列)
2020-11-15 本文已影响0人
程序员小杰
-
什么是延迟队列
- 延迟队列存储的对象是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
-
RabbitMQ如何实现延迟队列?
- AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过DLX和TTL 模拟出延迟队列的功能。
-
实现延迟队列
- 我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
- 生产者发送消息到Queue1,并且这个消息是设置有有效时间的,比如60s。消息会在Queue1中等待60s,如果没有消费者消费的话,它会被转发到Queue2,Queue2有消费者,处理延迟消息。
注:Queue1是没有消费者的
实现
首先在pom中加入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
然后配置yml文件
spring:
application:
name: ttl-queue
rabbitmq:
host: 47.105.198.54
port: 5672
virtual-host: /test-1
username: 11
password: 111
mq:
queueBinding:
queue: prod_queue_pay
dlQueue: dl-queue
exchange:
name: exchang_prod_pay
dlTopicExchange: dl-topic-exchange
type: topic
key: prod_pay
dlRoutingKey: dl-routing-key
新建配置类
创建业务队列与死信队列
@Configuration
public class RabbitConfig {
//业务队列配置
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeMame;
@Value("${mq.queueBinding.key}")
private String key;
//死信队列配置
@Value("${mq.queueBinding.exchange.dlTopicExchange}")
private String dlTopicExchange;
@Value("${mq.queueBinding.dlRoutingKey}")
private String dlRoutingKey;
@Value("${mq.queueBinding.dlQueue}")
private String dlQueue;
//创建死信交换机
@Bean
public TopicExchange dlTopicExchange(){
return new TopicExchange(dlTopicExchange,true,false);
}
//创建死信队列
@Bean
public Queue dlQueue(){
return new Queue(dlQueue,true);
}
//死信队列与死信交换机进行绑定
@Bean
public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
private final String dle = "x-dead-letter-exchange";
private final String dlk = "x-dead-letter-routing-key";
private final String ttl = "x-message-ttl";
//创建业务队列
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//设置队列的过期时间
params.put(ttl,10000);
//声明当前队列绑定的死信交换机
params.put(dle,dlTopicExchange);
//声明当前队列的死信路由键
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
//创建业务交换机
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(exchangeMame,true,false);
}
//业务队列与业务交换机进行绑定
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
}
创建生产者
/*
* 生产者
*/
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
rabbitTemplate.convertAndSend(exchangeName,key,msg);
}
}
创建消费者
该消费者是消费死信队列中的消息
/**
* 消费者
*/
@Component
@Slf4j
public class RabbitReceiver {
//消费死信队列的消息
@RabbitListener(queues = "${mq.queueBinding.dlQueue}")
public void infoConsumption(String data) throws Exception {
log.info("收到信息:{}",data);
log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
}
}
接口
@RestController
public class TestController {
@Autowired
private RabbitSender rabbitSender;
@GetMapping
public void test(@RequestParam String msg){
rabbitSender.send(msg);
}
}
启动服务之后,可以看到创建的交换机和队列
image.png
然后进行接口调用:http://localhost:8080/?msg=哈哈哈哈哈哈
2020-11-15 00:33:02.991 RabbitSender.send() msg = 哈哈哈哈哈哈
2020-11-15 00:33:12.060 收到信息:哈哈哈哈哈哈
2020-11-15 00:33:12.060 然后进行一系列逻辑处理 Thanks♪(・ω・)ノ
消息过期之后从prod_queue_pay
队列转发到dl-queue
队列。很好的实现了消息延迟消费。但我们会发现一个问题,通过给队列属性设置过期时间,如果我现在有不同的场景,比如我5s、10s、15s之后延迟消费,那需要创建三个队列。每次有一个不同的时间段的需求过来,我都需要创建一个队列,这肯定不行。
RabbitMQ插件实现延迟队列
快速入口:https://www.jianshu.com/p/78354a3e35d0
创建队列和交换机
@Configuration
public class RabbitConfig2 {
private static final String EXCHANGE_NAME = "delayed_exchange";
private static final String QUEUE_NAME = "delayed_queue";
private static final String ROUTE_KEY = "delayed_key";
/**
* 交换机
*/
@Bean
CustomExchange exchange() {
//通过x-delayed-type参数设置fanout /direct / topic / header 类型
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic");
return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args);
}
/**
* 队列
*/
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME,true,false,false);
}
/**
* 将队列绑定到交换机
*/
@Bean
public Binding binding(CustomExchange exchange,Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ROUTE_KEY)
.noargs();
}
}
创建生产者
发送消息的时候通过在header添加"x-delay"参数来控制消息的延时时间
/*
* 生产者
*/
@Component
@Slf4j
public class RabbitSender {
private static final String ROUTE_KEY = "delayed_key";
private static final String EXCHANGE_NAME = "delayed_exchange";
/**
* @param msg 消息
* @param delay 延时时间,秒
*/
public void send2(String msg,int delay){
log.info("RabbitSender.send() msg = {}",msg);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message ->{
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //消息持久化
message.getMessageProperties().setDelay(delay * 1000); // 单位为毫秒
return message;
});
}
}
点进setDelay方法
public void setDelay(Integer delay) {
if (delay != null && delay >= 0) {
this.headers.put("x-delay", delay);
} else {
this.headers.remove("x-delay");
}
}
创建消费者
/**
* 消费者
*/
@Component
@Slf4j
public class RabbitReceiver {
@RabbitListener(queues = "delayed_queue")
public void infoConsumption(String data) throws Exception {
log.info("收到信息:{}",data);
log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
}
}
对外方法
@RestController
public class TestController {
@Autowired
private RabbitSender rabbitSender;
@GetMapping("/test2/{msg}/{delay}")
public void test2(@PathVariable("msg") String msg, @PathVariable("delay")int delay){
rabbitSender.send2(msg,delay);
}
}
启动服务,登录RabbitMQ管理界面,可以看到交换机和队列都已经创建成功。
image.png
image.png
然后分别发送延迟消费时间为60s、30s、5s的消息。查看消费者的消费记录。
请求1:http://localhost:8080/test2/msg=发送时间为60s的过期时间/60
请求2:http://localhost:8080/test2/msg=发送时间为30s的过期时间/30
请求3:http://localhost:8080/test2/msg=发送时间为5s的过期时间/5
通过打印的日志可以发现:依次发送60s、30s、5s。但消费的顺序为5s、30s、60s。
2020-11-15 16:13:58.783 RabbitSender.send() msg = msg=发送时间为60s的过期时间
2020-11-15 16:14:02.653 RabbitSender.send() msg = msg=发送时间为30s的过期时间
2020-11-15 16:14:08.880 RabbitSender.send() msg = msg=发送时间为5s的过期时间
2020-11-15 16:14:13.924 收到信息:msg=发送时间为5s的过期时间
2020-11-15 16:14:13.925 然后进行一系列逻辑处理 Thanks♪(・ω・)ノ
2020-11-15 16:14:32.685 收到信息:msg=发送时间为30s的过期时间
2020-11-15 16:14:32.687 然后进行一系列逻辑处理 Thanks♪(・ω・)ノ
2020-11-15 16:14:58.814 收到信息:msg=发送时间为60s的过期时间
2020-11-15 16:14:58.814 然后进行一系列逻辑处理 Thanks♪(・ω・)ノ
该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。这些判断和操作导致效率不如普通的Exchange,所以如果不需要的话,就不要用插件类型的延迟队列。