RocketMQ:消费消息出现异常时
2022-05-03 本文已影响0人
kele2018
最近项目组的一个服务出现了消息丢失的问题。虽然当时通过手动重发的方式解决了,但一直没有深挖丢失的原因。这几天趁着疫情隔离在家研究了一下,在此做个总结。下面是当时发生消息丢失的代码:
/**
 * Listener for the "****" topic / consumer group (names are masked in the original post).
 *
 * NOTE(review): this is the version that lost messages. The catch block below sends an
 * alert and then returns normally, so no exception escapes messageListen. The listener
 * container therefore sees a normal return and reports the message as successfully
 * consumed: no retry and no dead-letter queue.
 * Fix: rethrow after alerting (e.g. add {@code throw e;} as the last statement of the
 * catch block). This assumes AbstractMQListener (not shown here) propagates exceptions
 * thrown by messageListen up to the container; confirm.
 */
@Service
@Slf4j
@RocketMQMessageListener(topic = "****", consumerGroup = "****", ACK = ACKMode.AUTO)
public class MQFxQrCodeLogListener extends AbstractMQListener {
@Override
protected void messageListen(MQMessage message) throws Exception {
try{
// Business consumption logic (omitted in the original post)
} catch (Exception e) {
Throwable throwable = new CommonException(BootConstants.SERVICE_ERR_CODE, e.getMessage(),
JSON.toJSONString(message), e);
// DingTalk alert
notifyCollectorService.exception(throwable);
// BUG: the exception is swallowed here. Because nothing is rethrown, the message is
// acknowledged as consumed and is effectively lost.
}
}
}
1、RocketMQ 消息消费流程
(1) 负载均衡,构造PullRequest,并存入队列;
(2) 从队列中获取请求,并向broker拉取消息;
(3) 向线程池中提交任务;
/**
 * Wraps pulled messages in ConsumeRequest tasks and submits them to the consume thread pool.
 *
 * Messages are grouped into tasks of at most consumeMessageBatchMaxSize messages. If the
 * executor rejects a task (RejectedExecutionException), the task is handed to
 * submitConsumeRequestLater instead of being submitted now.
 *
 * @param msgs              messages to consume
 * @param processQueue      ProcessQueue passed through to every ConsumeRequest
 * @param messageQueue      MessageQueue passed through to every ConsumeRequest
 * @param dispatchToConsume not read anywhere in this method body
 */
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
// Everything fits in one batch: build a single task
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// Submit the task
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// The executor rejected the task: hand it to submitConsumeRequestLater
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// More messages than one batch: split them into chunks of at most consumeBatchSize
// messages and submit each chunk as its own task (one message per task only when the
// batch size is 1)
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// Append every remaining message to this rejected chunk. This also advances total
// to msgs.size(), which ends the outer loop.
// NOTE(review): this only affects the rejected request if ConsumeRequest keeps
// msgThis by reference (its constructor is not shown here); confirm.
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
(4) 使用listener消费消息;
/**
 * Consume-task body: invokes the user listener on this request's messages, then passes the
 * resulting status to processConsumeResult. Parts of the original method are elided
 * ("......") in this excerpt.
 */
public void run() {
// Skip the batch entirely if its ProcessQueue has been dropped
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
......
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
// Record the consume start time on each message
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// Consumption logic is user-defined, so it may throw
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
// Any Throwable from the listener is logged and flagged; status is not assigned here
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
// A null status is treated as a failure and converted to RECONSUME_LATER.
// NOTE(review): assumes status is initialised to null in the elided code above, so a
// listener that throws also ends up here; confirm.
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup, msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
......
// Apply the consume result only if the queue was not dropped in the meantime
if (!processQueue.isDropped()) {
// Process the consume result
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
(5) 处理消费结果;
/**
 * Applies a listener's consume status to a ConsumeRequest batch.
 *
 * ackIndex (read from the context) marks the last successfully consumed message:
 * messages 0..ackIndex count as OK and the rest as failed. For CONSUME_SUCCESS, an
 * ackIndex at or beyond the end of the batch is clamped to the last message. For
 * RECONSUME_LATER, ackIndex is forced to -1, so every message in the batch is failed.
 *
 * Failed messages are then handled per message model:
 * - BROADCASTING: failed messages are only logged and dropped.
 * - CLUSTERING: each failed message is sent back via sendMessageBack. Any that cannot be
 *   sent back get their reconsume count incremented, are removed from the batch, and are
 *   resubmitted locally through submitConsumeRequestLater.
 *
 * Finally the remaining messages are removed from the ProcessQueue. If the returned offset
 * is >= 0 and the queue is not dropped, the consume offset is updated. Messages that were
 * successfully sent back stay in the batch, so the offset also moves past them; any
 * redelivery then depends on what was sent back, not on holding the offset.
 */
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
// Whole batch counts as failed
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// Broadcasting: failed messages are not retried, only logged
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
/**
1. Only messages that failed are sent back to the broker; each sendMessageBack call
   carries the result for a single message.
2. The retry strategy can be set on the context:
Message consume retry strategy<br>
-1,no retry,put into DLQ directly<br>
0,broker control retry frequency<br>
>0,client control retry frequency
private int delayLevelWhenNextConsume = 0;
**/
boolean result = this.sendMessageBack(msg, context);
if (!result) {
// Could not send back: bump the reconsume count and retry it locally below
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
// Keep locally-retried messages out of the batch so they are not removed from the
// ProcessQueue below
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// Remove the handled messages and, if possible, advance the consume offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
2、接下来看 Spring Boot 与 RocketMQ 集成时(监听容器 DefaultRocketMQListenerContainer)具体的异常处理策略:
/**
 * Concurrent-consume callback of the Spring listener container (decompiled code).
 *
 * Converts each message and passes it to rocketMQListener.onMessage, in order. If onMessage
 * throws an Exception, the rest of the batch is not processed and RECONSUME_LATER is
 * returned. The next delay level is delayLevelWhenNextConsume when canRetry(...) is true,
 * and -1 (no retry, straight to the dead-letter queue) otherwise.
 *
 * CONSUME_SUCCESS is returned only when every onMessage call returns normally. An exception
 * that the user listener catches and does not rethrow therefore never reaches this method,
 * and the message is reported as consumed.
 */
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
Iterator var3 = msgs.iterator();
while(var3.hasNext()) {
MessageExt messageExt = (MessageExt)var3.next();
DefaultRocketMQListenerContainer.log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
DefaultRocketMQListenerContainer.this.rocketMQListener.onMessage(DefaultRocketMQListenerContainer.this.doConvertMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
DefaultRocketMQListenerContainer.log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception var9) {
DefaultRocketMQListenerContainer.log.warn("consume message failed. messageExt:{}", messageExt, var9);
// Per the author: because the listener is declared with ACK = ACKMode.AUTO, this is
// always false here. NOTE(review): canRetry is not shown in this excerpt; confirm.
if (DefaultRocketMQListenerContainer.this.canRetry(messageExt)) {
context.setDelayLevelWhenNextConsume(DefaultRocketMQListenerContainer.this.delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// -1 means no retry: the message goes directly to the dead-letter queue
context.setDelayLevelWhenNextConsume(-1);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
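回头看我们的代码:处理消费业务时抛出的异常被 catch 住后,只发送了钉钉告警,并没有继续向上抛出,异常被吞掉了。于是 onMessage 正常返回,consumeMessage 最终返回 CONSUME_SUCCESS,RocketMQ 认为该消息已成功消费:既不会重试,也不会进入死信队列,消费位点照常推进,从而造成消息丢失。修复方式是在告警之后把异常重新抛出,让监听容器返回 RECONSUME_LATER,由 RocketMQ 负责重试或投递到死信队列。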