【RabbitMQ学习-02】延迟消费问题
最近在学习RabbitMQ ,想尝试下延迟消息的问题。
总体思路,可能和网上的有点不一样,我是需要将消息先发送到ttl延迟队列内,
当消息到达过期时间后,会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的正常队列内,完成消息消费。
1:TTL(Time To Live)
消息的过期时间有两种设置方式:
- 1: 通过队列属性设置消息过期时间
x-message-ttl ,该属性是在创建队列的时候 ,在arguments的map中配置;该参数的作用是设置当前队列中所有的消息的存活时间
@Bean("ttlQueue")
public Queue queue() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 10000); // 队列中的消息未被消费 10 秒后过期
return new Queue("TTL_QUEUE", true, false, false, map);
}
-
2: x-expires 该属性也是在arguments中配置;其作用是设置当前队列在N毫秒中(不能为0,且为正整数),就删除该队列;“未使用”意味着队列没有消费者,队列尚未重新声明,并且至少在有效期内未调用basicGet (basicGet 是手动拉取指定队列中的一条消息)
-
3: 基于单条消息设置过期时间
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("4000"); // 消息的过期属性,单位 ms
Message message = new Message("这条消息 4 秒后过期".getBytes(), messageProperties);
rabbitTemplate.send("TTL_EXCHANGE", "test.ttl", message);
如果同时指定了Message TTL 和Queue TTL,则优先较小的那一个。
2:什么情况下消息会变成死信?
- 1.消息被消费者拒绝并且未设置重回队列
- 2.消息过期
- 3.队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。
3:配置队列
- 1:配置第一个正常队列。
// 配置第一个交换器
@Bean
DirectExchange msgDirectExchange(){
Exchange build = ExchangeBuilder.directExchange(QueueEnum.MSG_QUEUE.getExchangeName()).durable(true).build();
return (DirectExchange) build;
}
// 配置第一个消息队列
@Bean
public Queue msgQeue(){
return new Queue(QueueEnum.MSG_QUEUE.getQueueName());
}
// 配置交换器和 路由绑定 以及消息队列绑定
@Bean
public Binding msgBindng(){
return BindingBuilder
.bind(msgQeue())
.to(msgDirectExchange())
.with(QueueEnum.MSG_QUEUE.getRouteKey());
}
- 2:配置TTL队列和交换器
// 配置TTL交换器
@Bean
DirectExchange ttlDirectExchange(){
Exchange build = ExchangeBuilder
.directExchange(QueueEnum.MSG_TTL_QUEUE.getExchangeName())
.durable(true)
.build();
return (DirectExchange) build;
}
// 配置TTL消息队列
@Bean
public Queue ttlQueue(){
return QueueBuilder
.durable(QueueEnum.MSG_TTL_QUEUE.getQueueName())
// 配置到期后转发的交换 x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
.withArgument("x-dead-letter-exchange",QueueEnum.MSG_QUEUE.getExchangeName())
// 配置到期后转发到那个路由建上 x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
.withArgument("x-dead-letter-routing-key",QueueEnum.MSG_QUEUE.getRouteKey())
.build();
}
// 配置TTL交换器 路由key 以及消息队列 整合
@Bean
public Binding ttlBinding(){
return BindingBuilder
.bind(ttlQueue())
.to(ttlDirectExchange())
.with(QueueEnum.MSG_TTL_QUEUE.getRouteKey());
}
重点在于:
x-dead-letter-exchange、x-dead-letter-routing-key两个参数,而这两个参数就是配置队列过期后转发的Exchange、RouteKey。简单点,就是死信属性,就是配置消息到期后,转到哪一个交换器上绑定了那个路由。
死信交换(Dead Letter Exchanges 简称 DLX)
当出现"死信"的情况下 rabbitmq 可以对该"死信"进行交换到别的队列上,但是交换的前提是需要为死信配置一个交换机用于死信的交换
4:配置生产者消费者代码
- 1:发送到队列的代码
public void send(Object messageContent, String exchange, String routerKey, final long delayTimes){
/**
* Convert a Java object to an Amqp {@link Message} and send it to a specific exchange with a specific routing key.
*
* @param exchange the name of the exchange
* @param routingKey the routing key
* @param message a message to send
* @param messagePostProcessor a processor to apply to the message before it is sent
* @throws AmqpException if there is a problem
*/
if(!StringUtils.isEmpty(exchange)){
logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
rabbitMqTemplate.convertAndSend(exchange,routerKey,messageContent,message -> {
/**
* 这里是单独的设置了某一条消息的过期时间,而不是整个队列都设置了。
*/
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
}else{
logger.error("未找到队列消息:{},所属的交换机", exchange);
}
}
- 2: 消费消息的代码
本次只是为了验证消费,所以没有重试,以及确认等等
此时,监听的队列是:正常的消息队列 QueueEnum.MSG_QUEUE 而不是 延迟队列
@Component
@RabbitListener(queues ="msg02.queue")
public class MsgConsumer {
private Logger logger= LoggerFactory.getLogger(MsgConsumer.class);
@RabbitHandler
public void handler(String content) {
logger.info("消费内容:{}", content);
}
}
- 3: 测试代码
@SpringBootTest(classes = RabbitMQApp.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class RabbitmqTest {
@Autowired
MsgSend msgSend;
@Test
public void sendTest(){
msgSend.send("测试延迟消费15秒,写入时间:" + new Date(),
QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
15*1000);
msgSend.send("测试延迟消费10秒,写入时间:" + new Date(),
QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
10*1000);
}
}
BUG问题出现了
- 1:当我把10秒放在前面 15秒放在后面。
@Test
public void sendTest(){
msgSend.send("测试延迟消费10秒,写入时间:" + new Date(),
QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
10*1000);
msgSend.send("测试延迟消费15秒,写入时间:" + new Date(),
QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
15*1000);
}
如图所示的时候:
结果是完全符合预期的:
2019-10-10 21:52:26.606 INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer : 消费内容:测试延迟消费10秒,写入时间:Thu Oct 10 21:52:16 CST 2019
2019-10-10 21:52:31.605 INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer : 消费内容:测试延迟消费15秒,写入时间:Thu Oct 10 21:52:16 CST 2019
可以看到,同一秒写入,但是间隔5秒处理消息。就是说消息队列头部的是10秒。第二个是15秒的时候正常处理
- 2:当我把15秒放在前面,10秒放在后面的时候。
@Test
public void sendTest(){
msgSend.send("测试延迟消费15秒,写入时间:" + new Date(),
QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
15*1000);
msgSend.send("测试延迟消费10秒,写入时间:" + new Date(),
QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
10*1000);
}
出现BUG了。两个消息都是15秒的时候消费。
2019-10-10 21:53:09.176 INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer : 消费内容:测试延迟消费15秒,写入时间:Thu Oct 10 21:52:54 CST 2019
2019-10-10 21:53:09.181 INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer : 消费内容:测试延迟消费10秒,写入时间:Thu Oct 10 21:52:54 CST 2019
可以看到,他们都是 21:52:54 写入到消息队列中,但是 消费的时间却都是21:53:09 相隔了15秒。
就是说现在放在队尾的10秒的数据,阻塞住了,必须等对头的消息处理完毕后,才能进行消费。
不知道怎么回事啊?????
总结:(参考 https://blog.csdn.net/u011212394/article/details/100086728)
RabbitMQ 本身不支持延迟队列,总的来说有三种实现方案:
1.先存储到数据库,用定时任务扫描。
2.利用 RabbitMQ的死信队列(Dead Letter Queue)实现。
主要过程:
生产者 —> 原交换机 —> 原队列(超过 TTL 之后) —> 死信交换机 —> 死信队列 —> 最终消费者
使用死信队列实现延时消息的缺点:
(1)如果统一用队列来设置消息的 TTL,当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟......需要创建很多交换机和队列来路由消息。
(2)如果单独设置消息的 TTL,则可能会造成队列中的消息阻塞,即前一条消息没有出队(没有被消费),后面的消息无法投递。比如第一条消息过期 TTL 是 30min,第二条消息 TTL 是 10min。10 分钟后,即使第二条消息应该投递了,但是由于第一条消息 还未出队,所以无法投递。
(3)可能存在一定时间误差