RocketMQ源码解析(十二)-顺序消息
什么是顺序消息
顺序消息是RocketMQ相对于其它分布式消息队列方案一个很有特色的特性。所谓顺序就是consumer端用户Listener收到消息的顺序和这些消息的发送的顺序是一致的,也就是Producer先发的消息先收到。这个一致不是指所有消息都是按照顺序排着队到达consumer的,而是由用户指定哪些消息之间有顺序关系,只有有顺序关系的才是按序处理。
举个例子,一笔订单流程可能会有很多次状态变化,每次状态变化都可以通过消息通知其它相关服务,比如订单A在拍下、付款、发货时分别发1条消息m1,m2,m3
。对于接收端来说,肯定是希望按照顺序收到,如果使用普通消息是保证不了这一点的。而使用顺序消息只要发送端指定这3条消息是由先后顺序的,consumer收到的时候就可以先收到m1,在m1消费成功之前,m2是不会被处理的。如果这时候还有一个订单B,也同时发送了m4,m5,m6
消息,显然对于consumer来说,A和B的消息是没必要有先后顺序的,所以在发送时只需要指定4,5,6之间有顺序关系就好了,这样订单A和B的消息可以同时被处理。
消息保证顺序面临的问题
实现一个先进先出(FIFO)的队列听起来好像比较简单,但是放到分布式队列里,实现起来是有点复杂的,尤其是不可能为了一个顺序消息的功能而破坏了原来的架构。我们先看一下有哪些问题:
1)根据前面讲过的RocketMQ的架构,一条消息从Producer发到Broker的过程其实是有发送到一个Broker的集群,消息会分布到多个Broker的多个Queue上面。即使发送的时候是一条一条按顺序发的,也保证不了消息到达Broker的时间也是按照发送的顺序来的。
2)Broker之间是没有数据交互的,也就是说Broker A收到一条Producer提交的消息,它并不知道之前那条消息被发到了哪个Broker的哪个Queue上,更别提知道之前那条消息是否消费成功了,所以依赖broker来控制消息的顺序是很困难的。
3)为了提高并发能力,同一个Group下会有多个Consumer,每个consumer消费一部分queue的消息。所以,如果两条有顺序关系的消息分布在两个queue上,就有可能被push到两个consumer上,而consumer之间也没有数据交互,依赖consumer做排序也是很难实现的。
针对以上面临的问题,我们看下RocketMQ是怎么解决的。
RocketMQ顺序消息的实现原理
RocketMQ在针对顺序消息的实现,大部分逻辑依赖客户端,也就是Producer和Consumer。Broker在整个流程中不会感知到顺序消息的存在。
针对问题一,既然分散到多个broker上无法追踪顺序,RocketMQ的做法是有顺序关系的消息都发送到同一个queue上,自然他们也会存到同一个broker上。根据之前讲的broker消息的存储逻辑,同一个queue的消息,先到的肯定放在前面,所以只要客户端在发送的时候使用单线程,发完一条再发另一条,消息在broker上保存的顺序自然也是按发送的顺序。
针对问题二、既然Broker不知道消息的状态,那就把保证顺序这件事交给Consumer,因为第一步中有顺序关系的消息已经在同一个queue里了,consumer拿消息的时候本来也是按照存的顺序来的,所以Broker不需要做任何特殊逻辑。
针对问题三、Consumer做了如下几件事保证消息按顺序处理:
- 一个topic下的消息会被同一个group下的多个cosumer按queue瓜分消费。而对于顺序消息,同一个queue只允许一个consumer消费,consumer在启动后会尝试到broker获取指定queue的锁,只有持有queue锁的consumer才能消费这个queue的消息。
- 在普通消息模式下,消息到达consumer后回被放进缓存队列中,然后会有多个线程同时处理队列中的消息。而对于顺序消息,consumer增加了互斥锁,同一时间同一个queue只会有一个线程在处理。
- 普通消息会有两种情况导致消息重新返还给Broker重新投递,一种是消息在consumer的缓存中等待时间过长,还有一种就是用户代码逻辑中处理失败。而对于顺序消息,只要到达consumer会一直尝试消费,直到超过最大次数,才会返给broker,这时候broker不会再重新投递了。而且顺序消息也不会因为超时而被返还给broker。
通过以上的逻辑,RocketMQ只用了很少的代码量实现了顺序消息,应该说非常巧妙。当然有得必有失,顺序消息也存在一定的问题。
顺序消息存在的问题
- 由于需要有顺序关系的消息发送到同一个queue中,而不是使用客户端自带的负载均衡策略,所以一旦量比较大,可能会造成这个队列消息量很大,而其它队列比较空闲的情况。
- 顺序消息处理也必须在同一个consumer上,而且同一个queue的消息只能单线程处理,也存在消息堆积的可能。
- 如果业务处理消息失败,只会在consumer端重试,到达重试次数之后。会直接放入broker中的死信队列。
- 顺序消息是否100%保证消息的顺序呢?答案是否定的。有消息
m1,m2,m3
需要顺序处理,m1被发到q1中,这时候q1所在的broker宕机,Producer会另外选择一个queue来投递m2和m3,这个时候m1和m2会到达不同的consumer上。当然这种情况发生的概率是非常低的,因为producer从检测到broker宕机到切换queue需要一段时间,同时consumer要有消息堆积才会造成这种现象的出现。
有以上的问题我们可以得出结论,顺序消息在保证顺序的同时会放弃吞吐和一定的可用性。我们在选用顺序消息的时候一定是业务上必须有顺序的要求,再就是尽量把顺序维持在一个很小的范围内,比如上面的订单的例子,同一笔订单消息之间顺序,不同的订单之间不需要有顺序关系。下面对照源代码看下顺序消息的使用方式。
消息发送
我们先看下顺序消息发送的代码,来自官方demo
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//1、初始化一个 producer,使用 group name.
MQProducer producer = new DefaultMQProducer("example_group_name");
//2、启动producer
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//3、新建一条消息,指定topic,tag、key和body
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//4、提交消息,制定queue选择器和排序参数
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//5、关闭Producer
producer.shutdown();
}
}
从这个例子中我们发现,除了第4步之外,和发送普通消息没有任何区别。也就是说在发送端,初始化一个Producer后,既可以发送普通消息,也可以用来发送顺序消息,只是调用的send方法的参数不同。
这里还有一点要说明下,这个topic也没有任何不同的地方,我们回想下topic的创建参数就知道,topic的属性只有名称、queue的数量和flag。没有字段标识topic是否是用来发order消息的。这也从侧面说明了Broker无法知道消息是否是顺序消息。
第4步的参数,第2个参数是用户自定义queue选择器,第3个是排序参数。demo中的实现是直接使用排序参数对总的队列数取模,这样可以保证相同orderId的消息肯定会走同一个queue。比如mq所有broker上总共有4个队列,订单号为1的消息走q1,同时订单号为5的也会走q1,这些消息都会按发送顺序被consumer处理。下面看下接口实现:
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//使用用户自定义MessageSelector来选择Queue
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
在普通消息的发送逻辑中,queue的选择会采用系统的负载均衡策略,默认是采用轮询的方式,同时会将broker的延时参数计算进去(具体可以回顾下Producer那一章[传送门])。而从上面的代码可以看出,queue的选择直接就是回调的用户的实现,后面的逻辑就跟普通消息一模一样了。
Consumer启动
相对于Producer,Consumer的逻辑要变的更多一些,首先看下consumer的demo:
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
//1、新建一个consumer,提供group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
//2、设置消费的起始偏移量
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//3、订阅的topic和tag过滤条件
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
//4、用户自定义消息listener,实现Orderly的接口
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5、启动consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
跟普通消息的consumer唯一的不同就是第4步,这里注册的是顺序Listener,由于一个Consumer只能注册一个Listener,所以一个consumer要么按顺序消息的方式来消费,要么按普通消息的方式来消费。所以,如果一个用户进程要收两种消息,最好使用两个Consumer实例。
从实例代码可以看出,用户处理消息后返回状态跟普通消息也有所不同,失败的话是返回的SUSPEND_CURRENT_QUEUE_A_MOMENT
,而不是RECONSUME_LATER
。这是因为对于顺序消息,消费失败是不会返回给broker重新投递的(其实即使重发也还是发到这个consumer上,没必要多此一举),而是会放到本地的缓存队列中重新处理。另外两个状态ROLLBACK
和COMMIT
已经被设置成deprecated
了,我们就不关心了。
下面我们看下顺序Consumer的启动过程,还是拿Push的方式为例,我们只截取和普通消息不同的这一端代码,DefaultMQPushConsumerImpl
:
public synchronized void start() throws MQClientException {
...
//消费服务,顺序和并发消息逻辑不同,接收消息并调用listener消费,处理消费结果
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//2、启动ConsumeMessageService
this.consumeMessageService.start();
...
}
跟普通消息唯一的区别就是这里换成了ConsumeMessageOrderlyService
。我们可以先回顾下之前讲过的consumer端逻辑:
- Consumer启动后会初始化一个
RebalanceImpl
做rebalance操作,从而得到当前这个consumer负责处理哪些queue的消息。 -
RebalanceImpl
到broker拉取制定queue的消息,然后把消息按照queueId放到对应的本地的ProcessQueue
缓存中 -
ConsumeMessageService
调用listener处理消息,处理成功后清除掉
大体的逻辑就是上面了,如果记得不是很清楚可以回看下之前讲Consumer
的文章[传送门]
下面看下上面代码中consumeMessageService.start()
中都干了什么。
//定时检查锁定状态
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
这里,cosumer会周期性的发送lock queue的命令给Broker。前面提到,对于顺序消息,必须同一个queue的消息只有一个consumer来处理,所以为了保证这一点,consumer会在锁定queue成功后才开始消费,并且会一直更新这个锁。在这里broker起到了一个分布式锁的作用。consumer获取锁之后默认每20秒就会刷新一下锁,broker如果发现锁超过1分钟没有刷新,则会自动释放,这时候其它consumer就可以抢到这个锁。
下面来看下ConsumeMessageOrderlyService
中的具体消息处理逻辑
消息处理
RebalanceImpl
在从Broker获取到消息后,会调用ConsumeMessageOrderlyService
的submitConsumeRequest()
方法:
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
...
...
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
//1、获取消息Queue锁对象,加互斥锁,保证同一个MessageQueue同时只会有一个线程在处理消息
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
//2、Cluster模式,检查ProcessQueue的状态是否仍然是已锁定
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
//3、如果本批次消费用时过长,则跳出循环,防止锁占用时间过长
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//4、从ProcessQueue获取一批消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//5、获取processQueue的锁,防止处理过程中rebalanceService移除Queue
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//6、调用用户自定义Listener处理消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//7、处理listener返回结果,如果返回false,则跳出中止后续消费
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
//等待一段时间重新尝试锁定
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}
submitConsumeRequest()
方法仅仅是提交了一个异步任务,跟普通消息的不同的是,ConsumeRequest
仅接收了queue相关的参数,而对传过来的msgs直接扔掉了。具体处理过程在request的run方法中:
第1步,对要处理的queue加一个互斥锁,这样保证consumer客户端同时只会有一个线程在处理指定queue的消息
第2步,对于集群模式下,检查一下当前ProcessQueue
是否仍然持有queue的锁,只有持有锁才会处理消息。对于广播模式,锁是不需要的。
第4步,从ProcessQueue
的缓存队列中那一批消息准备处理,我们看下takeMessags()
的代码实现
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
这一步是从msgTreeMap
中移动指定数量的消息到consumingMsgOrderlyTreeMap
中,并返回这部分消息。这么做的目的不是很理解。
第5步,会给ProcessQueue
加个锁,防止在处理的时候RebalanceImpl
移除queue
第7步,处理用户Listener的返回结果,我们看下处理逻辑
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
//如果成功,则调用ProcessQueue的commit方法
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
//检查重试次数,如果没超过则放到ProcessQueue中;如果超过则直接发到broker的Dead Queue中
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
...
...
}
//更新消费进度
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
这里分两种情况:
- 成功:在调用
ProcesQueue
的commit()
方法,这里逻辑就是把前面takeMessage()
时创建的临时map清空,然后记录当前消费的offset - 失败:首先调用
checkReconsumeTimes()
检查是否已经超过了最大重试的次数,如果没超过会重新放回到ProcessQueue
的msgTreeMap
中,重新触发一次消息处理。如果超过了,则直接放入broker的死信队列,然后把本地缓存清空,继续消费后面的消息。
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
//如果已经超过最大重试次数
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
//返还给broker,会直接进入dead queue中,不会再投递
if (!sendMessageBack(msg)) {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
//没超过次数,则记录已经是第几次重试了
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}
最后,如果有commit
的动作,则会把进度同步给Broker
。
以上就是顺序消息的整个处理逻辑,这里面Broker
起到的作用仅仅是维护了queue的锁,其它操作对它都是透明的。