MQ

RocketMQ 与 Spring Boot整合(四、消费重试)

2020-08-18  本文已影响0人  梅西爱骑车

RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

每条消息的失败重试,是有一定的间隔时间。实际上,消费重试是基于上一篇 定时消息 来实现,第一次重试消费按照延迟级别为 3 开始。所以,默认为 16 次重试消费,也非常好理解,毕竟延迟级别最高为 18 呀。

不过要注意,只有集群消费模式下,才有消息重试。

下面,我们开始本小节的示例。

一、快速开始

1.1 Demo04Message

package com.ebadagang.springboot.rocketmq.message;

/**
 * 示例 04 的 Message 消息
 */
public class Demo04Message {

    public static final String TOPIC = "DEMO_04";

    /**
     * 编号
     */
    private Integer id;

    public Demo04Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo04Message{" +
                "id=" + id +
                '}';
    }

}

1.2 Demo04Producer

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.message.Demo04Message;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo04Producer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult syncSend(Integer id) {
        // 创建 Demo04Message 消息
        Demo04Message message = new Demo04Message();
        message.setId(id);
        // 同步发送消息
        return rocketMQTemplate.syncSend(Demo04Message.TOPIC, message);
    }

}

1.3 Demo04Consumer

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo04Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = Demo04Message.TOPIC,
        consumerGroup = "demo04-consumer-group-" + Demo04Message.TOPIC
)
public class Demo04Consumer implements RocketMQListener<Demo04Message> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Demo04Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
        // <1>注意,此处抛出一个 RuntimeException 异常,模拟消费失败
        throw new RuntimeException("我就是故意抛出一个异常");
    }

}

<1>处,我们在消费消息时候,抛出一个 RuntimeException 异常,模拟消费失败。

1.4 简单测试

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo04ProducerTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Demo04Producer producer;

    @Test
    public void testSyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.syncSend(id);
        logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

}

1.5执行 #testSyncSend() 方法

同步发送消息。控制台输出如下:

# Producer 同步发送消息成功
2020-08-04 17:34:14.543  INFO 4724 --- [           main] c.e.s.r.producer.Demo04ProducerTest      : [testSyncSend][发送编号:[1596533654] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, offsetMsgId=6585E30D00002A9F0000000000037070, messageQueue=MessageQueue [topic=DEMO_04, brokerName=broker-a, queueId=2], queueOffset=0]]]

# Demo04Consumer 第一次消费失败,抛出 RuntimeException 异常
2020-08-04 17:34:15.195  INFO 4724 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo04Consumer   : [onMessage][线程编号:172 消息内容:Demo04Message{id=1596533654}]
2020-08-04 17:34:15.210  WARN 4724 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [brokerName=broker-a, queueId=2, storeSize=308, queueOffset=0, sysFlag=0, bornTimestamp=1596533654475, bornHost=/202.99.106.26:39243, storeTimestamp=1596533654173, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000037070, commitLogOffset=225392, bodyCRC=1577825708, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1596533655179, id=cd33e825-f97e-c0ca-20cd-6fbfc58c0a19, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596533654241}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 51, 51, 54, 53, 52, 125], transactionId='null'}], error:{}

java.lang.RuntimeException: 我就是故意抛出一个异常
    at com.ebadagang.springboot.rocketmq.consumer.Demo04Consumer.onMessage(Demo04Consumer.java:23) ~[classes/:na]
    ......
    
# Demo04Consumer 第一次重试消费失败,抛出 RuntimeException 异常。间隔了 10 秒,对应延迟级别 3 。
2020-08-04 17:34:25.271  INFO 4724 --- [MessageThread_2] c.e.s.rocketmq.consumer.Demo04Consumer   : [onMessage][线程编号:179 消息内容:Demo04Message{id=1596533654}]
2020-08-04 17:34:25.271  WARN 4724 --- [MessageThread_2] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [brokerName=broker-a, queueId=0, storeSize=475, queueOffset=0, sysFlag=0, bornTimestamp=1596533654475, bornHost=/202.99.106.26:39243, storeTimestamp=1596533664900, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F00000000000378A3, commitLogOffset=227491, bodyCRC=1577825708, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={CONSUME_START_TIME=1596533665271, MIN_OFFSET=0, REAL_TOPIC=%RETRY%demo04-consumer-group-DEMO_04, ORIGIN_MESSAGE_ID=6585E30D00002A9F0000000000037070, RETRY_TOPIC=DEMO_04, MAX_OFFSET=1, id=cd33e825-f97e-c0ca-20cd-6fbfc58c0a19, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, DELAY=3, timestamp=1596533654241, REAL_QID=0}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 51, 51, 54, 53, 52, 125], transactionId='null'}], error:{}

java.lang.RuntimeException: 我就是故意抛出一个异常
    at com.ebadagang.springboot.rocketmq.consumer.Demo04Consumer.onMessage(Demo04Consumer.java:23) ~[classes/:na]
    ......
    
# Demo04Consumer 第二次重试消费失败,抛出 RuntimeException 异常。间隔了 30 秒,对应延迟级别 4 。
2020-08-04 17:34:55.330  INFO 4724 --- [MessageThread_3] c.e.s.rocketmq.consumer.Demo04Consumer   : [onMessage][线程编号:183 消息内容:Demo04Message{id=1596533654}]
2020-08-04 17:34:55.330  WARN 4724 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [brokerName=broker-a, queueId=0, storeSize=475, queueOffset=1, sysFlag=0, bornTimestamp=1596533654475, bornHost=/202.99.106.26:39243, storeTimestamp=1596533694969, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000037E95, commitLogOffset=229013, bodyCRC=1577825708, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={CONSUME_START_TIME=1596533695330, MIN_OFFSET=0, REAL_TOPIC=%RETRY%demo04-consumer-group-DEMO_04, ORIGIN_MESSAGE_ID=6585E30D00002A9F0000000000037070, RETRY_TOPIC=DEMO_04, MAX_OFFSET=2, id=cd33e825-f97e-c0ca-20cd-6fbfc58c0a19, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A127418B4AAC21338438C0000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, DELAY=4, timestamp=1596533654241, REAL_QID=0}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 51, 51, 54, 53, 52, 125], transactionId='null'}], error:{}

java.lang.RuntimeException: 我就是故意抛出一个异常
    at com.ebadagang.springboot.rocketmq.consumer.Demo04Consumer.onMessage(Demo04Consumer.java:23) ~[classes/:na]
    ......

从日志中,我们可以看到,消息因为消费失败后,又重试消费了多次。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

上一篇 下一篇

猜你喜欢

热点阅读