工作日志——kafka相关
背景
首先,交代一下背景。
团队最近在做kafka预发环境与生产环境隔离,同时由于原来是基于原生的kafka client自行封装来支持多套kafka集群且多线程并行消费的逻辑,以及切换集群IP到对应的L5上,所以除了切换集群外,还需要更换SDK
1、kafka 环境隔离是非常简单的事,切换集群IP即可。
2、更换SDK涉及很多代码改造,且SDK还需要支持L5,兼容IP的问题。由于这是一个共性问题,所以SDK由支撑团队来开发。
以下简单过一下这个过程遇到的几个问题。由于个人觉得比较有代表性,故而记录一下。
问题1、SpringBoot版本不一
这是一个极其容易被忽视,但是其实非常容易出现的问题,目前部门有5-6个小组在做研发,如果没有在最开始就做版本统一管理,及其容易出现版本不一致的问题。引包之后,本地运行出现如下情况:
image.png
看图我并没有第一时间怀疑版本问题,还以为是不是本地Maven又有点毛病,倒腾了几次实在没招,点开同事提供的SDK包代码翻起来,翻看时才发现问题。
低版本的(2.0.3)该接口没有default实现,而在高版本(2.2.5)上是有默认default的实现,而SDK由于是基于高版本开发所以只实现了必要的方法。此时回头去看的异常日志时,确实可以对应起来。
image.png
问题二、发现相同的消费者与topic在不同的环境下居然使用不同的group.id
在发现这个问题时,难以置信。
由于同事提供的SDK相关使用文档上是使用@KafkaListener(topics='xxx', groupId='yyy', containerFactory='zzz')的方式,联想到注解是一种编译期元数据,第一个错误的想法是排除掉可配置问题,于是想到的解决方案是采用新的groupId来统一不同环境的差异。(习惯性思维带来的可怕后果是绕过了正确的方式)
更换groupId会带来什么问题?
允许更换groupId的前置条件必须是不能丢失未消费的消息。那么必须了解一下kafka的消费进度管理的基本原理,消费进度与groupId具备什么相关性吗?这里会涉及到几个概念性的问题,比如consumer group、offset、__consumer_offset topic等等,这些概念很轻易可以在网络上搜到,自行补充。
如下图,很好的说明了groupId 与 offset的关联关系。
image.png
那么,如果更换groupId 会出现什么问题?意味着该consumer client并没有相关的消费进度,客户端更换groupId重新启动后,从服务端获取不到对应的offset时,有一个参数会指导服务端如何分配offset到客户端auto.offset.reset,值包括earliest、lastest。丢失了offset,如果采用earliest,那就相当于把消息都捞出来重新消费,这肯定不行,那么lastest会拿到一个什么参数呢?从网络上去找,很多说会消费最新的消息,那么到底什么是最新的消息,又没有提及,后来看到一篇文档说会从hw处开始消费,翻了一会源码确实找到一个看起来有点意思的地方。
def legacyFetchOffsetsForTimestamp(timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = inReadLock(leaderIsrUpdateLock) {
val localLog = localLogWithEpochOrException(Optional.empty(), fetchOnlyFromLeader)
val allOffsets = localLog.legacyFetchOffsetsBefore(timestamp, maxNumOffsets)
if (!isFromConsumer) {
allOffsets
} else {
val hw = localLog.highWatermark // 高水位
if (allOffsets.exists(_ > hw)) // 由于对scale相关语法并不了解,所以不确定最终返回的offset具体是多少,只能说猜测也许是高水位
hw +: allOffsets.dropWhile(_ > hw)
else
allOffsets
}
}
那么这个highWatermark又是如何更新的?
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
inReadLock(leaderIsrUpdateLock) {
// maybeIncrementLeaderHW is in the hot path, the following code is written to
// avoid unnecessary collection generation
var newHighWatermark = leaderLog.logEndOffsetMetadata
remoteReplicasMap.values.foreach { replica =>
// flower副本的LEO 小于 leader副本LEO && (上一次拉取的时间距现在还在最大拉取时间之内 或者 这是一个ISR副本)
if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
(curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicaIds.contains(replica.brokerId))) {
newHighWatermark = replica.logEndOffsetMetadata // 新水位为flower副本LEO
}
} // 由于newHighWatermark在循环中更新,那么最终newHighWatermark = min(replica.logEndOffsetMetadata.messageOffset)
// ....
}
}
可以理解为hw指的是:多个isr flower副本已经更新到的最新消息位置中最小的一个,即min(replica.logEndOffsetMetadata.messageOffset)
高位水hw是客户端消费者可见的最新的消息,那么假定原来的groupId 消费者的进度记录为offset1,如果更换了groupId后消费的 offset2 = hw,那么潜在丢失的消息包括offset_missing = hw - offset1.。所以更换groupId是风险很高的操作。
GroupId 可配置问题
峰回路转,发现虽然 @KafakListener是注解,但是依旧可以配合配置项来完成消费者的初始化。
1、可以通过@KafakListener(groupId = '${xxx}')来完成注入(是尝试行为,结果发现可行)
2、通过 spring.kafka.consumer.group-id来实现配置注入(看配置猜测也许可以)
注意,使用第二种方式时,需要在@KafakListener去掉 id、groupId两个字段的配置,为什么id会影响groupId,多少有点出乎意料。
首先看代码KafkaListenerAnnotationBeanPostProcessor
private String getEndpointGroupId(KafkaListener kafkaListener, String id) {
String groupId = null;
if (StringUtils.hasText(kafkaListener.groupId())) {
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
}
// 没有配置groupId,但是配置了id时,groupId = id, idIsGroup默认是true,原来没注意居然还有这么个参数
if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
groupId = id;
}
return groupId;
}
在KafkaMessageListenerContainer中有如下判断
private final String consumerGroupId = this.containerProperties.getGroupId() == null // this.containerProperties.getGroupId() 就是从注解中获取的数据
? (String) KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()
.get(ConsumerConfig.GROUP_ID_CONFIG) // 从配置文件中读取到的配置
: this.containerProperties.getGroupId();
总结
为什么前文说这是比较有代表性的问题呢?
首先我觉得版本不一致这个问题,有时候因为种种原因(特别是大家一个团队内)你真的很难说第一眼就发现这种不一致,然后你花费了很多时间去排查最终得出一个没多少价值的结果,因为它也真的是很容易避免。其次后边遇到这种groupId问题。如果想当然的认为配置会一样,于是最终上线了发现消息丢失了那真的是粗心大意就把你埋了。如果从一开始就去翻对应的文档,也许就很好的明白可配置,以及idIsGroup的细节了,压根不需要去看源代码。当然,groupId问题进而引导开发同学去排查整个思路,对于开发同学掌握中间件有挺大帮助的,无目的性的看代码容易遗忘,但是与实践相结合才能让你明白的更加深刻。