RocketMQ源码分析----Consumer消费进度相关
在Consumer消费的时候总有几个疑问:
- 消费完成后,这个消费进度存在哪里
- 消费完成后,还没保存消费进度就挂了,会不会导致重复消费
Consumer
消费进度保存
消费完成后,会返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS告诉MQ消费成功,以MessageListener的consumeMessage为入口分析。
消费的时候,是以ConsumeRequest类为Runnable对象,在线程池中进行处理的,即ConsumeRequest的run方法会处理这个状态
@Override
public void run() {
//....
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
// 如果这个ProcessQueue废弃了,则不处理
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}
在消费完成后,将status交给processConsumeResult处理,代码如下
public void processConsumeResult(//
final ConsumeConcurrentlyStatus status, //
final ConsumeConcurrentlyContext context, //
final ConsumeRequest consumeRequest//
) {
//....消费成功或者失败的处理
// 将这批消息从ProcessQueue中移除,代表消费完毕,并返回当前ProcessQueue中的消息最小的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新消费进度
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
在分析ProcessQueue的时候,说过removeMessage返回有两种情况:
- 如果移除这批消息之后已经没有消息了,那么返回ProcessQueue中最大的offset+1
- 如果还有消息,那么返回treeMap中最小的key,即未消费的消息中最小的offset
getOffsetStore返回RemoteBrokerOffsetStore,看下其实现
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
// 通过MessageQueue获取本地的对应的消费进度
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
//increaseOnly 为false则直接覆盖
//increaseOnly为true则会判断更新的值比老的值大才会进行更新
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
这里的increaseOnly参数根据不同的情况传入不同的值,有些情况下会出现并发修改的情况,那么需要传入true,内部会进行CAS的操作,能保证正确的赋值,而一些场景下,只需要进行直接覆盖或者说没有并发修改的问题那么传入false就行了。
消费进度持久化
offsetTable是一个Map,其保存了消费进度,这只一个内存的结构,在Consumer启动的时候,会启动一个定时任务将本地的数据同步到broker,每persistConsumerOffsetInterval(默认为5)秒进行一次操作
// mqs为需要持久化的队列集合
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (mqs != null && !mqs.isEmpty()) {
// 遍历本地的消费进度
for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
// 如果该队列在需要持久化的队列中
if (mqs.contains(mq)) {
try {
// 将消费进度发送到broker
this.updateConsumeOffsetToBroker(mq, offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {//废弃的消费进度
unusedMQ.add(mq);
}
}
}
}
// 如果有废弃的MQ,则将其消费进度废弃
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
}
}
}
传入的是当前Consumer分配的MessageQueue列表,rebalance之后,可能分配的MessageQueue已经变化,所以offsetTable里有些消费进度的队列时不需要的,所以将它的消费进度废弃
updateConsumeOffsetToBroker方法就是简单的网络请求,将offset发送给Broker
消费进度提交
除了定时提交消费进度之外,在拉取消息的时候,会顺便将本地的消费进度一起传到broker,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代码
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 集群消费模式
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 通过offsetStore获取当前消费进度
// ReadOffsetType.READ_FROM_MEMORY表示从本地获取(即offsetTable)
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {//
// 传给Broker,让其判断是否需要保存消费进度
commitOffsetEnable = true;
}
}
// 构造一些标志位,这里主要看commitOffsetEnable值
// 将commitOffsetEnable放到一个int类型的值中,让broker判断是否需要保存消费进度
int sysFlag = PullSysFlag.buildSysFlag(//
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
//....
// 通过拉取消息请求,将commitOffsetValue和sysFlag传给broker
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getSubVersion(), // 3
pullRequest.getNextOffset(), // 4
this.defaultMQPushConsumer.getPullBatchSize(), // 5
sysFlag, // 6
commitOffsetValue, // 7
BrokerSuspendMaxTimeMillis, // 8
ConsumerTimeoutMillisWhenSuspend, // 9
CommunicationMode.ASYNC, // 10
pullCallback// 11
);
具体broker对消费进度的处理看后面分析
Broker
消费进度保存
RocketMQ的网络请求都有一个RequestCode,更新消费进度的Code为UPDATE_CONSUMER_OFFSET,通过查到其使用的地方,找到对应的Processor为ClientManageProcessor,其processRequest处理对应的请求
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
更新消费进度的方法为updateConsumerOffset,里面解析了请求体之后又调用了ConsumerOffsetManager.commitOffset方法
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
}
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
clientHost, key, queueId, offset, storeOffset);
}
}
}
逻辑也很简单就不多说了,有意思的是,Broker的保存消费进度的结构和Consumer类似,Broker多了一个维度,因为Broker接收的是所有消费者的进度,而Consumer保存的是自己的
在Consumer的消费进度上报到Broker之后,Broker只是保存到内存,这并不可靠,大概也能猜出,和Consumer一样,也有一个定时任务将消费进度持久化。这时,先看下ConsumerOffsetManager这个类的继承关系,他的父类是ConfigManager,这个东西很重要,是几个重要配置信息持久化类,看下其继承关系:
image.png
分别是订阅关系管理,消费进度管理,Topic信息管理,和延迟队列信息管理,这4个配置信息都需要通过ConfigManager去持久化和加载,看下ConfigManager的几个方法
public abstract class ConfigManager {
// 将对象转换成json串
public abstract String encode();
//将文件里内容(json格式)的转换成对象
public boolean load() {
String fileName = null;
// 获取文件地址
fileName = this.configFilePath();
// 将文件里的内容读取出来
String jsonString = MixAll.file2String(fileName);
// json转换成指定对象的数据
this.decode(jsonString);
}
// 配置文件地址
public abstract String configFilePath();
// 与load类似
private boolean loadBak() {
String fileName = null;
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName + ".bak");
this.decode(jsonString);
return true;
}
// json转换成指定对象的数据
public abstract void decode(final String jsonString);
// 将对象里的数据转换成json并持久化到configFilePath()文件中
public synchronized void persist() {
String jsonString = this.encode(true);
String fileName = this.configFilePath();
MixAll.string2File(jsonString, fileName);
}
public abstract String encode(final boolean prettyFormat);
那么ConsumerOffsetManager会实现encode和decode方法并在某个地方定时调用persist方法,查看其使用的地方,找到BrokerController的initialize方法,有段定时任务如下:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
可以看到,每flushConsumerOffsetInterval(默认5000)毫秒会进行一次持久化
拉取消息的时候保存消费进度
拉取消息的Code为RequestCode.PULL_MESSAGE,对应的Processor为PullMessageProcessor,找到其中消费进度处理的地方
// 上面说的consumer传过来的commitOffsetEnable
// 当Consumer本地消费进度大于0的时候这个参数为true
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.
// brokerAllowSuspend在处理消息请求的时候为true,hold请求自己处理是false
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
// Master才需要保存进度,slave只是同步broker的消息
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getCommitOffset());//consumer传上来的offset
}
总的来说:
当broker为master的时候,且Consumer消费进度大于0则在拉取消息的时候顺便将消费进度保存到broker
问题分析
重复消费问题
在ProcessQueue的removeMessage的第二种情况有个问题,假设有如下情况:
批量拉取了4条消息ABCD,分别对应的offset为400|401|402|403,此时consumeBatchSize(批量消费数量,默认为1,即一条一条消费),那么会分4个线程去消费这几个消息,出现下面消费次序
消费D -> removeMessage -> 返回400(情况2)
消费C -> removeMessage -> 返回400(情况2)
消费B -> removeMessage -> 返回400(情况2)
消费A -> removeMessage -> 返回404(情况1)
在消费A之前,本地消费进度持久化到Broker之后,应用宕机了,那么此时Broker保存的是offset=400(准确来说,在消费完A且保存消费进度到broker之前,offset都是400)。那么会有什么问题呢?
先假设消费完DCB且消费进度上传完成宕机,然后重启应用,这时候会先从broker获取应该从哪里消费(),因为DCB消费完成后都是保存400这个消费进度,那么返回的是400,这时候consumer会请求offset为400的消费,到这里,已经重复消费了DCB。
消费进度保存在哪里
- consumer保存在内存,定时上传broker
- broker保存在内存,定时刷新到磁盘文件
注:以上没有特别声明的都是并发消费模式