kafka第三章

2021-05-15  本文已影响0人  阿sir1990

如何保存消费端的消费位置

offset的定义

每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。

offset的维护位置

在kafka中,提供了一个consumer_offsets_* 的一个topic,把offset信息写入到这个topic中。 consumer_offsets——按保存了每个consumer group某一时刻提交的offset信息。 __consumer_offsets 默认有50个分区

计算公式

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 

由于默认情况下 groupMetadataTopicPartitionCount有50个分区,计算得到的结果为:x, 意味着当前的 consumer_group的位移信息保存在__consumer_offsets的第x个分区

分区的副本机制

创建一个带副本机制的topic(集群情况下操作)

通过下面的命令去创建带2个副本的topic

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic

然后我们可以在/tmp/kafka-log路径下看到对应topic的副本信息了。我们通过一个图形的方式来表达。 针对secondTopic这个topic的3个分区对应的3个副本


kafka-17.png

确定leader副本

在zookeeper服务器上,通过如下命令去获取对应分区的信息, 比如下面这个是获取secondTopic第1个分区的状态信息。get /brokers/topics/secondTopic/partitions/1/state得到{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通过这个命令sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test

需要注意的是,kafka集群中的一个broker中最多只能有一个leader副本,leader副本所在的broker节点的分区叫leader节点,follower副本所在的broker节点的分区叫follower节点

副本的leader选举

LEO

即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外, leader LEO和follower LEO的更新是有区别的。

HW

即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别.

从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有follower副本都同步完之后(当然这边也会有不同的acks 机制)才能被认为已经提交,之后才会更新分区的HW, 进而消费者可以消费 到这条消息。

kafka-21.png

副本协同机制

图解:一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为 leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有follower滞后的状态。当 producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。

ISR

ISR表示目前“可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集”。怎么去理解可用和相差不多这两个词呢?具体来说,ISR集合中的副本必须满足以下的条件

follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上了leader 副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,kafk副本管理器会启动一个副本过期检 查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtUpTimeMs的差值是否大于参数replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合

图解


kafka-19.png

副本同步数据原理

初始化状态

初始状态下,leader和follower的HW和LEO都是0,leader副本会保存remote LEO,表示所有follower LEO,也会被初始化为0,这个时候,producer没有发送消息。follower会不断地个leader发送FETCH 请求,但是因为没有数据,这个请求会被leader寄存,当在指定的时间之后会强制完成请求,这个时间 配置是(replica.fetch.wait.max.ms),如果在指定时间内producer有消息发送过来,那么kafka会唤醒 fetch请求,让leader继续处理 kafka-22.png

数据的同步处理会分两种情况,这两种情况下处理方式是不一样的
1.第一种是leader处理完producer请求之后,follower发送一个fetch请求过来
2. 第二种是follower阻塞在leader指定时间之内,leader副本收到producer的请求。

第一种情况

leader处理完producer请求之后,follower发送一个fetch请求过来 。状态图如下

kafka-22.png

leader副本收到请求以后,会做几件事情
1.把消息追加到log文件,同时更新leader副本的LEO
2.尝试更新leader HW值。这个时候由于follower副本还没有发送fetch请求,那么leader的remote LEO仍然是0。leader会比较自己的LEO以及remote LEO的值发现最小值是0,与HW的值相同,所以不会更新HW

follower fetch消息

kafka-24.png

follower 发送fetch请求,leader副本的处理逻辑是:
1.读取log数据、更新remote LEO=0(follower还没有写入这条消息,这个值是根据follower的fetch 请求中的offset来确定的)
2.尝试更新HW,因为这个时候LEO和remoteLEO还是不一致,所以仍然是HW=0
3.把消息内容和当前分区的HW值发送给follower副本

follower副本收到response以后
1.将消息写入到本地log,同时更新follower的LEO
2.更新follower HW,本地的LEO和leader返回的HW进行比较取小的值,所以仍然是0

第一次交互结束以后,HW仍然还是0,这个值会在下一次follower发起fetch请求时被更新

kafka-25.png

follower发第二次fetch请求,leader收到请求以后
1.读取log数据
2.更新remote LEO=1, 因为这次fetch携带的offset是1.
3.更新当前分区的HW,这个时候leader LEO和remote LEO都是1,所以HW的值也更新为1 4. 把数据和当前分区的HW值返回给follower副本,这个时候如果没有数据,则返回为空

follower副本收到response以后
1.如果有数据则写本地日志,并且更新LEO
2.更新follower的HW值

到目前为止,数据的同步就完成了,意味着消费端能够消费offset=1这条消息。

第二种情况

前面说过,由于leader副本暂时没有数据过来,所以follower的fetch会被阻塞,直到等待超时或者 leader接收到新的数据。当leader收到请求以后会唤醒处于阻塞的fetch请求。处理过程基本上和前面说 的一致

  1. leader将消息写入本地日志,更新Leader的LEO 2. 唤醒follower的fetch请求
  2. 更新HW
    kafka使用HW和LEO的方式来实现副本数据的同步,本身是一个好的设计,但是在这个地方会存在一个 数据丢失的问题,当然这个丢失只出现在特定的背景下。我们回想一下,HW的值是在新的一轮FETCH 中才会被更新。我们分析下这个过程为什么会出现数据丢失
上一篇下一篇

猜你喜欢

热点阅读