kafka本地镜像写入成功编排服务写入失败报错Message c
2019-06-20 本文已影响0人
funkol2007
背景
在开发重构pubsubv2的过程中测试发现连接本地kafka镜像写入正常,而连接我们测试服务器的devops里镜像失败。报错信息:
kafka: Failed to produce message to topic Asys.i.bench.D_Asys.i.bench.D: Message contents does not match its CRC.
追查过程
- 这个错误曾经我遇到过,问题是因为client在设置config的时候没有吧kafka的version设置正确。所以一开始就针对版本设置进行多次更改,均无效果。
- 觉得可能是编排的镜像跟本地镜像有所不同,所以把编排镜像拉到本地启动并请求,发现可以写入成功。更奇怪,同样的镜像启动本地却可以正常写入,远程就不行了。
- 怀疑是网络问题。但是ping过延迟只有几毫秒,而且是所有写请求全部错误,不应是网络延迟导致。
- 求助同事后找到解决方案。
问题原因
本地写入成功而远程测试机写入不成功的原因是在镜像启动时的参数不同。我编排的kafka镜像启动环境变量设置如下:
KAFKA_PORT='23310'
KAFKA_ADVERTISED_HOST_NAME='KSSCS40000H31904120007'
KAFKA_ZOOKEEPER_CONNECT='KSSCS40000H31904120007:23010'
KAFKA_BROKER_ID='1'
KAFKA_LOG_DIRS='/kafka'
KAFKA_AUTO_CREATE_TOPICS_ENABLE='false'
KAFKA_LOG_CLEANUP_POLICY='compact'
KAFKA_LOG_CLEANER_MIN_COMPACTION_LAG_MS='604800000'
KAFKA_HEAP_OPTS='-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
JMX_PORT='23311'
KAFKA_JMX_OPTS='-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=23311'
其中引起问题的是 KAFKA_LOG_CLEANUP_POLICY='compact' 这个环境变量。 在kafka默认的配置中 KAFKA_LOG_CLEANUP_POLICY='delete' 而我本地启动时用的就是默认配置。
发送代码片段是这么写的:
kafkaTopic := getKafkaTopicName(sub.TopicName, sub.Name)
for _, message := range messages {
if !publishFilter(sub, message) {
continue
}
value, err := proto.Marshal(message)
if err != nil {
continue
}
msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value))}
select {
case producer.Input() <- msg:
counterPublishMsgOfSubcription.With(prometheus.Labels{topicLabelName: sub.TopicName, subscriptionLabelName: sub.Name}).Inc()
case <-ctx.Done():
break
}
}
发送不成功的原因在于
msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value))}
中ProducerMessage没有填写Key这个字段。
由于我们发送message是采用随机的形式而不是key的hash形式所以认为这个字段可以不用填写,一是不知道填写什么,二是觉得也没有用到。 恰恰是这个没有用到就是踩到的坑。
在 KAFKA_LOG_CLEANUP_POLICY='compact' 时kafka的删除策略采用的是按key保留来做压缩。因此如果写入的message没有Key这个字段就会报错(这个报错信息跟原因不太一致,网上也有相关讨论说CRC报错并不一定与描述相符)。
解决方法
知道问题所在后发送代码相应改成
msg := &sarama.ProducerMessage{Topic: kafkaTopic, Value: sarama.StringEncoder(string(value)), Key: sarama.StringEncoder(message.Id)}
然后发送成功,问题解决。