【kafka】kafka topic 分区 无法被消费问题分析处
一、问题背景
kafka 3节点的集群中的其中一台kafka 因磁盘空间不足宕机重启后,业务会受到影响,无法生产与消费消息。
kafka&zookeeper版本:
zookeeper-3.4.13.jar
kafka_2.11-2.0.0.jar
消费者工程日志报错:
WARN [Consumer clientId=consumer-1, groupId=console-consumer-55928] 1 partitions have leader brokers without a matching listener, including [baidd-0] (org.apache.kafka.clients.NetworkClient)
org.apache.kafka.clients.NetworkClient :
[Consumer clientId=consumer-1, groupId=xxxx-center] 1 partitions have leader brokers without a matching listener, including [xxxx-xxxx-xxxx-message-0]
二、 问题排查分析
从字面意思来看,当前分区所对应的的broker失去监听,为什么监听不到?怀疑是Kafka某个节点有问题-失联-假死?
从这个表象来看,某台机器有过宕机事件,宕机原因因环境而异,但Kafka的高可用性HA我们是耳熟能详的,为啥我们搭建的Kafka集群由多个节点组成,但其中某个节点宕掉,整个分区就不能正常使用-消费者端无法订阅到消息。
我们来查看 kafka的配置:
cat config/server.properties
kafka-zookeeper01:
broker.id=0
listeners=PLAINTEXT://192.168.1.101:9092
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=192.168.1.101:2189,192.168.1.102:2189,192.168.1.103:2189
##################################################################
kafka-zookeeper02:
broker.id=1
listeners=PLAINTEXT://192.168.1.102:9092
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=192.168.1.101:2189,192.168.1.102:2189,192.168.1.103:2189
#################################################################
kafka-zookeeper03:
broker.id=2
listeners=PLAINTEXT://192.168.1.103:9092
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=192.168.1.101:2189,192.168.1.102:2189,192.168.1.103:2189
这里使用了默认的topic分区副本数量:offsets.topic.replication.factor=1,当分区副本数量为1,则副本信息只会存在某一个broker节点,Isr即其自身。
这很容易出现单点故障,当当前节点挂了的时候,选举不出新的leader,导致分区不可用。
在生产环境的话,可设置多个副本因子来保证高可用性(比如三个节点组成一个集群,副本数量为2,这样当任意一台节点丢失,kafka集群仍会正常工作。
image.png从数据组织形式来说,Kafka有三层形式,Kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。
从逻辑模型来说,如下图所示,Kafka有这么四个逻辑概念:Broker(节点),Topic(主题),Partition(分区),Offset(偏移量)。
image.pngKafka在0.8版本之后引入了多副本冗余机制,很好地解决宕机后数据丢失的问题。
副本是以Topic中每个Partition的数据为单位,每个Partition的数据会同步到其他物理节点上,形成多个副本。
如上图所示,每个Partition的副本都包括一个Leader副本和多个Follower副本,Leader由所有的副本共同选举得出,其他副本则都为Follower副本。
在生产者写或者消费者读的时候,都只会与Leader打交道,在写入数据后,Follower就会来拉取数据。
事后排查时发现,因为 "__consumer_offsets" Topic是Kafka内置的队列,所以它们被默认自动创建且副本数默认为1。
那么,这就有问题了,如果它们所在的服务器宕机,那么这个节点上的所有Partition的数据就无法再被消费,等于说发送到这三个Topic的数据就丢失了,而"__consumer_offsets" 是Kafka用来存储消费者消费的offset(偏移量)信息的,它没有了,集群的消息监控自然也就没数据了,看上去就好像消息消费都delay了一样。
三、根本问题原因
大家会问,这三个内置队列为什么会副本数为1,造成单点故障的隐患呢?
其实,配置文件里它们配置的副本数为3,如下图所示。
image.png其原因就在于,如果Kafka第一次创建的主题是1个副本的话,即使后来手动修改配置文件server.properties将default.replication.factor或者topic的这个属性(如上图7所示)改为3,Broker上也是不会自动调整副本数的,所以必须手动调整(最好在kafka集群第一次启动前就配置好),不然这个单点故障就会延续。
注意,不要设置了default.replication.factor=3,又设置offsets.topic.replication.factor=1,这样offsets.topic.replication.factor的值会覆盖default.replication.factor的值。
四、解决办法
“手动调整”指的是手动增大分区副本数。
这是通过Kafka自带的 kafka-reassign-partitions.sh 脚本工具来完成的。
1.编写一个JSON文件,定义 topic 的各个 partition 所属的 replicas broker id
$ cat reassign.json
{
"version": 1,
"partitions": [
{
"topic": "__consumer_offsets",
"partition": 0,
"replicas": [
0,
1,
2
]
}
]
}
- 通过 --reassignment-json-file 参数,指定上述JSON文件,通过 --execute 参数,执行分区副本分配策略
$ bin//kafka-reassign-partitions.sh --zookeeper xx.xx.xx.xx:2181 --reassignment-json-file reassign.json --execute
- 命令执行后提示Successfully,就表示扩容成功了
Successfully started partition reassignment for __consumer_offsets-0
- 通过查看 topic 的情况来确认是否扩容成功
bin/kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --topic __consumer_offsets --describe
这样的话,假如 broker0 挂了的话,__consumer_offsets 分区 0 的 Leader replica 可能会变为 1。
- 对 __consumer_offsets 的其他分区同样操作
四、 经验教训
不仅仅是SA,还包括大数据集群的开发者,加强对 Kafka 的工作机制的学习,缩短排查问题的时间。
要熟练使用 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具来完成对 topic 的分区分配、分区副本增加等操作。
该脚本有三个参数:
--generate:配合着 --topics-to-move-json-file 可以生成分区分配策略,该参数适用于分区多的情况。
--execute:配合着 --reassignment-json-file 可以执行分区分配策略。
--verify:配合着 --reassignment-json-file 可以检查分区分配进度。
通过以上命令,既可以分配分区,也可以增加分区副本数,非常方便。
五、参考
kafka 主题、分区check脚本
https://www.jianshu.com/p/d524a84d505a
Kafka突然宕机了?稳住,莫慌!
https://baijiahao.baidu.com/s?id=1679188013511739797&wfr=spider&for=pc
Kafka集群消费告警排障手记
https://mp.weixin.qq.com/s/gMre9OxQzZ7-WER0O79zng
https://mp.weixin.qq.com/s/Xl-0GJKZ4IMHfY6fLSAOUg
apache kafka系列之在zookeeper中存储结构
https://blog.csdn.net/lizhitao/article/details/23744675
Kafka动态增加Topic的副本
https://blog.csdn.net/shykevin/article/details/90103360
kafka添加修改topic分区副本
https://blog.csdn.net/qq_50009899/article/details/115704412
partitions have leader brokers without a matching listener
https://blog.csdn.net/yabingshi_tech/article/details/120670096
kafka:topic为什么要进行分区?副本机制是如何做的?
https://blog.csdn.net/weixin_38750084/article/details/82942564
教你如何重新分布kafka分区、增加分区副本数
https://cloud.tencent.com/developer/article/1755177
Kafka动态增加Topic的副本
https://www.cnblogs.com/xiao987334176/p/10315176.html
kafka修改分区、副本数、副本迁移
https://sukbeta.github.io/kafka-Modify-Partitions-and-ReplicationFactor/