RocketMQ 与 Spring Boot整合(四、消费重试)
RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
每条消息的失败重试,是有一定的间隔时间。实际上,消费重试是基于上一篇 定时消息 来实现,第一次重试消费按照延迟级别为 3 开始。所以,默认为 16 次重试消费,也非常好理解,毕竟延迟级别最高为 18 呀。
不过要注意,只有集群消费模式下,才有消息重试。
下面,我们开始本小节的示例。
一、快速开始
1.1 Demo04Message
package com.ebadagang.springboot.rocketmq.message;
/**
* 示例 04 的 Message 消息
*/
public class Demo04Message {
public static final String TOPIC = "DEMO_04";
/**
* 编号
*/
private Integer id;
public Demo04Message setId(Integer id) {
this.id = id;
return this;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Demo04Message{" +
"id=" + id +
'}';
}
}
1.2 Demo04Producer
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.message.Demo04Message;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Demo04Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 Demo04Message 消息
Demo04Message message = new Demo04Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo04Message.TOPIC, message);
}
}
1.3 Demo04Consumer
package com.ebadagang.springboot.rocketmq.consumer;
import com.ebadagang.springboot.rocketmq.message.Demo04Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = Demo04Message.TOPIC,
consumerGroup = "demo04-consumer-group-" + Demo04Message.TOPIC
)
public class Demo04Consumer implements RocketMQListener<Demo04Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo04Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <1>注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}
}
在<1>
处,我们在消费消息时候,抛出一个 RuntimeException 异常,模拟消费失败。
1.4 简单测试
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo04ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo04Producer producer;
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
1.5执行 #testSyncSend() 方法
同步发送消息。控制台输出如下:
# Producer 同步发送消息成功
2020-08-04 17:34:14.543 INFO 4724 --- [ main] c.e.s.r.producer.Demo04ProducerTest : [testSyncSend][发送编号:[1596533654] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, offsetMsgId=6585E30D00002A9F0000000000037070, messageQueue=MessageQueue [topic=DEMO_04, brokerName=broker-a, queueId=2], queueOffset=0]]]
# Demo04Consumer 第一次消费失败,抛出 RuntimeException 异常
2020-08-04 17:34:15.195 INFO 4724 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo04Consumer : [onMessage][线程编号:172 消息内容:Demo04Message{id=1596533654}]
2020-08-04 17:34:15.210 WARN 4724 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [brokerName=broker-a, queueId=2, storeSize=308, queueOffset=0, sysFlag=0, bornTimestamp=1596533654475, bornHost=/202.99.106.26:39243, storeTimestamp=1596533654173, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000037070, commitLogOffset=225392, bodyCRC=1577825708, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1596533655179, id=cd33e825-f97e-c0ca-20cd-6fbfc58c0a19, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596533654241}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 51, 51, 54, 53, 52, 125], transactionId='null'}], error:{}
java.lang.RuntimeException: 我就是故意抛出一个异常
at com.ebadagang.springboot.rocketmq.consumer.Demo04Consumer.onMessage(Demo04Consumer.java:23) ~[classes/:na]
......
# Demo04Consumer 第一次重试消费失败,抛出 RuntimeException 异常。间隔了 10 秒,对应延迟级别 3 。
2020-08-04 17:34:25.271 INFO 4724 --- [MessageThread_2] c.e.s.rocketmq.consumer.Demo04Consumer : [onMessage][线程编号:179 消息内容:Demo04Message{id=1596533654}]
2020-08-04 17:34:25.271 WARN 4724 --- [MessageThread_2] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [brokerName=broker-a, queueId=0, storeSize=475, queueOffset=0, sysFlag=0, bornTimestamp=1596533654475, bornHost=/202.99.106.26:39243, storeTimestamp=1596533664900, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F00000000000378A3, commitLogOffset=227491, bodyCRC=1577825708, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={CONSUME_START_TIME=1596533665271, MIN_OFFSET=0, REAL_TOPIC=%RETRY%demo04-consumer-group-DEMO_04, ORIGIN_MESSAGE_ID=6585E30D00002A9F0000000000037070, RETRY_TOPIC=DEMO_04, MAX_OFFSET=1, id=cd33e825-f97e-c0ca-20cd-6fbfc58c0a19, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, DELAY=3, timestamp=1596533654241, REAL_QID=0}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 51, 51, 54, 53, 52, 125], transactionId='null'}], error:{}
java.lang.RuntimeException: 我就是故意抛出一个异常
at com.ebadagang.springboot.rocketmq.consumer.Demo04Consumer.onMessage(Demo04Consumer.java:23) ~[classes/:na]
......
# Demo04Consumer 第二次重试消费失败,抛出 RuntimeException 异常。间隔了 30 秒,对应延迟级别 4 。
2020-08-04 17:34:55.330 INFO 4724 --- [MessageThread_3] c.e.s.rocketmq.consumer.Demo04Consumer : [onMessage][线程编号:183 消息内容:Demo04Message{id=1596533654}]
2020-08-04 17:34:55.330 WARN 4724 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [brokerName=broker-a, queueId=0, storeSize=475, queueOffset=1, sysFlag=0, bornTimestamp=1596533654475, bornHost=/202.99.106.26:39243, storeTimestamp=1596533694969, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000037E95, commitLogOffset=229013, bodyCRC=1577825708, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={CONSUME_START_TIME=1596533695330, MIN_OFFSET=0, REAL_TOPIC=%RETRY%demo04-consumer-group-DEMO_04, ORIGIN_MESSAGE_ID=6585E30D00002A9F0000000000037070, RETRY_TOPIC=DEMO_04, MAX_OFFSET=2, id=cd33e825-f97e-c0ca-20cd-6fbfc58c0a19, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, DELAY=4, timestamp=1596533654241, REAL_QID=0}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 51, 51, 54, 53, 52, 125], transactionId='null'}], error:{}
java.lang.RuntimeException: 我就是故意抛出一个异常
at com.ebadagang.springboot.rocketmq.consumer.Demo04Consumer.onMessage(Demo04Consumer.java:23) ~[classes/:na]
......
从日志中,我们可以看到,消息因为消费失败后,又重试消费了多次。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。