Kafka文字欲

无镜--kafka之服务端处理LeaderAndIsr请求

2019-01-07  本文已影响0人  绍圣

代理节点处理控制器发送的LeaderAndIsr请求,服务端会交给副本管理器来处理。

创建分区

服务端处理LeaderAndIsr请求,根据主题名称和分区编号创建分区对象(Partition),并更新副本管理器的allPartitions集合。LeaderAndIsr请求记录了分区和分区状态的映射关系。

分区是TopicPartition对象,包括:主题名称,分区编号。

分区状态是PartitionState对象,包括:分区的主副本,ISR,AR。

服务端创建的分区是一个Partition对象,包括:主题名称,分区编号,分区状态信息,副本信息(Replica)。Partition对象综合了TopicPartition对象和PartitionState对象的数据,根据分区状态的副本信息(ISR,AR),生成Replica对象。分区状态的ISR和AR在Partition对象中对应了副本对象集合:inSyncReplicas和assignedReplicaMap。assignedReplicaMap记录了副本编号到副本对象的映射关系。

Partition对象在服务端真正用于分区数据的读写,涉及副本和日志文件的读写用Replica对象。

副本管理器管理了代理节点上的所有分区。针对同一个分区,不管是主副本还是备份副本,存储在代理节点数据目录中的文件夹名称都一样:主题名称+分区编号。

来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:副本管理器管理了代理节点上的所有分区,分区以目录形式存储在代理节点上

创建主副本和备份副本

副本管理器处理LeaderAndIsr请求,具体步骤:

1,创建分区对象,如果分区已经存在,则使用LeaderAndIsr请求中的最新分区状态。

2,对成为主副本的分区调用makeLeaders()方法,为这些分区创建主副本。

3,对成为备份副本的分区调用makeFollowers()方法,为这些分区创建备份副本。

4,如果代理节点是第一次收到LeaderAndIsr请求,则启动最高水位的检查点线程。

5,移除空闲的拉取线程,并调用onLeadershipChange()回调方法。

分区在当前节点上,要么成为主副本,要么成为备份副本。请求中的分区对象TopicPartition在服务端要转化成Partition对象。每个代理节点接收的LeaderAndIsr请求包含多个分区,副本管理器会将请求的所有分区按照分区状态的主副本(leaderId)是否等于当前代理节点的编号(brokerId)分成两种:成为主副本的分区,成为备份副本的分区。

分区分组

假设集群有三个代理节点、主题有三个分区、每个分区有三个副本,每个代理节点都会管理三个分区。分区P1的状态为{leader:1,ar:[1,2,3]},分区P2的状态为{leader:1,ar:[1,2,3]},分区P2的状态为{leader:3,ar:[3,1,2]},从代理节点的角度来看,第一个代理节点将[P1,P2]作为“成为主副本的分区”,将[P3]作为“成为备份副本的分区”,其他代理节点类似。从分区的角度来看,分区P3在第三个代理节点上是主副本,在其他两个代理节点上就是备份副本。

副本管理器将LeaderAndIsr请求的分区按照主副本和备份副本划分后分别存储到对应的集合中,partitionsToBeLeader和partitionsToBeFollower两个集合是互斥的,不会存在一个分区在同一个节点同时作为主副本和备份副本的情况。并且,如果一个分区在某个节点上是主副本,在其他节点上只能是备份副本。

一个分区可以有多个备份副本,但只允许有一个主副本。对于同一个分区而言,如果LeaderAndIsr请求的主副本编号(leaderId)和当前代理节点的编号(brokerId)相等,则调用分区的makeLeader()方法,否则调用分区的makeFollower()方法(一个分区在一个代理节点上只允许存在一个副本,所以同一个分区在同一个代理节点上,不可能被同时加入partitionsToBeLeader和partitionsToBeFollower两个集合)

加入分区集合

副本管理器针对partitionsToBeLeader集合调用makeLeaders()方法,返回partitionsBecomeLeader集合。针对partitionsToBeFollower集合调用makeFollowers()方法,返回partitionsBecomeFollower集合。副本管理器在调用分区的makeLeaders()方法和makeFollowers()方法,这两个方法只有返回true的时候相对应的分区才需要加入对应的分区集合。什么情况下返回true?

1,分区对象的主副本不存在;2,分区对象的主副本已经存在,但和分区状态对象的主副本不同。满足其中一个条件都会返回true。

第一次创建分区的主副本和备份副本时,分区对象的主副本编号还没有定义。因此分区的makeLeaders()和makeFollowers()方法都返回true,都加入对应的分区集合:partitionsBecomeLeader和partitionsBecomeFollower集合。

控制器第二次发送LeaderAndIsr请求,分区的主副本没有变化,makeLeaders()和makeFollowers()方法都返回false,不需要将分区加入对应的分区集合。如果控制器多次下发LeaderAndIsr请求的内容都一样,代理节点实际上只处理一次,代理节点处理LeaderAndIsr请求,不只是通过分区状态的对象(PartitionState)更新分区对象(Partition)的信息,还需要处理分区对象的其他组件:拉取管理器管理的分区集合。

分区与拉取管理器

代理节点的副本管理器会管理所有的分区,拉取管理器会管理所有的备份副本对应的分区。调用分区的makeLeaders()和makeFollowers()方法时,拉取管理器都需要处理分区的变化。

处理主副本,备份副本的逻辑

调用makeLeaders()方法,如果分区之前是备份副本,拉取管理器有这个分区。当分区的备份副本转为主副本,拉取管理器需要将分区移除;反之拉取管理器需要添加分区。

调用makeFollowers()转为备份副本时,需要将日志文件截断到副本的最高水位。调用调用makeLeaders()转为主副本时,需要将副本的最高水位作为日志的最新偏移量。

创建副本

副本管理器创建分区对象(Partition)时,分区对象中的主副本,AR,ISR等信息都为空,当分区创建副本时(不管是主副本还是备份副本),才会开始更新分区对象的相关信息。创建或删除副本只是更新分区的assignedReplicaMap集合(分配给分区的副本集合:AR)。

makeLeaders()和makeFollowers()处理的是多个分区,针对每个分区,分别调用makeLeader()和makeFollower()方法更新分区信息。

makeLeader()方法返回值为true表示新的主副本:分区已有主副本编号与新的主副本编号是否相同。调用makeLeader()的场景:

1,原先没有主副本,调用makeLeader()方法的副本就会转为主副本 。

2,原先有主副本,而且新的主副本和原来的主副本一致。

3,原先有主副本,但是新的主副本和原来的主副本不一致,比如备份副本转为主副本 。

makeLeader()和makeFollower()方法都会设置分区的主副本和AR,创建分区的主副本时,分区对象中有ISR,创建备份副本时,分区对象没有ISR。并且并不是只是创建一个对应的副本,而是根据分区的AR集合创建多个副本。创建的多个副本按照是否在当前代理节点上分为:本地副本和远程副本。

本地副本和远程副本

创建的本地副本有日志文件,创建的远程副本没有日志文件。分区对象的makeLeader()和makeFollower()方法,首先都会从分区状态信息对象中解析出分配给分区的副本集,然后调用方法创建所有副本。

每个分区对象都会创建分区的所有副本。 分区对象从分区状态信息对象中读取所有的副本集(AR),并为每个副本编号创建一个对应的副本对象,分区创建出来的副本不一定都有日志文件。 日志是真正存储在代理节点上的物理介质,只有本地副本才有日志。

本地副本只有一个,远程副本可以有多个。 但本地副本和主副本、备份副本没有必然的联系,本地副本只有结合本地代理节点才有意义 。分区在当前代理节点上是主副本,那么本地副本就是主副本,其他远程副本就是备份副本。分区在当前代理节点上是备份副本,那么本地副本是备份副本,远程副本有一个是主副本,其他都是备份副本。

分区的每个副本(主副本和备份副本)在自己的代理节点上,都有一个对应的本地日志文件。对于同一个分区,备份副本会同步主副本的日志文件数据,并写入到备份副本自己的本地日志文件中。分区的多个副本数据保持了同步。一旦主副本挂掉,控制器会在备份副本中选举一个作为主副本。 因为备份副本的日志文件和旧的主副本已经保持数据同步,所以选举新的主副本,并不会丢失数据。

消费者元数据迁移

协调者处理消费者请求,有两种数据是以内部主题(_consumer_offsets)的形式存储在服务端的协调者节点的内存缓存中:1,消费者提交的偏移量(key:GroupTopicPartition,value:消费者提交的偏移量)。2,消费组分配的状态数据(key:消费组编号,value:分配给每个消费者的分区结果)。当节点宕机时,服务端要处理主副本的故障转移,还要在其他节点上恢复缓存数据。处理内部主题的LeaderAndIsr请求,服务端也会创建分区的主副本和备份副本。 

回顾:消费者要和协调者通信,必须首先找到消费组所属的协调者。消费者通过GROUP_COORDINATOR请求,向任意一个节点获取消费组对应的协调者,然后再和协调者联系。每个消费组都有唯一对应的协调者,协调者会保存消费组相关的元数据。消费者提交分区的偏移量给协调者,更新消费者元数据,除了追加消息到内部主题对应的本地日志文件,也会更新缓存的内容。缓存是为了更快地查询,当需要查询分区的提交偏移量,或者查询消费者的元数据,可以直接查询缓存,而不需要读取日志文件。

消费者联系的协调者节点属于内部主题某个分区的主副本。这个主副本的代理节点上就保持的有对应的消费组元数据。所以只要这个分区的主副本发生改变时(变化到其他代理节点上),消费者联系的节点也会随之变化。如果是成为主副本,消费组的元数据管理器(GroupMetadataManager)会加载分区的消费组元数据到缓存中,如果是转为备份副本,消费组的元数据管理器会移除消费组的缓存内容。

消费组的协调者收到主消费者发送的分区分配结果(同步组请求),并存储到内部主题。然后协调者会发送“同步组响应”给消费组的所有消费者,并且“尝试完成并调度下一次心跳”。最后,协调者才会将消费组的状态改为“稳定”。协调者在调用onGrouploaded()方法加载消费组的元数据时,如果数据有在内部主题中,说明消费组已经处于“稳定状态”,协调者也需要“尝试完成并调度下一次心跳” 。协调者加载完消费组元数据后,会开始监控每个消费者成员的心跳请求 。协调者调用onGroupUnloaded()方法会卸载消费组,并将消费组的状态转为“失败” 。在消费组的状态是失败的情况下,如果消费组之前的状态是“准备再平衡,则返回“加入组响应”给所有消费者,如果消费组之前的状态是“稳定”或是“等待同步”,则返回“同步组响应”给所有消费者,这两种返回响应的错误码都是:“当前节点不是消费组的协调者”。消费者在收到这样的错误码后,会重新连接正确的协调者节点,并重新发送加入组请求或同步组请求。

举例:主题test有三个分区,有三个代理节点,消费组group1的三个消费者分配到不同的分区,并向分区的主副本拉取数据。消费组对应内部主题的分区是_consumer_offset_1,内部分区的主副本在代理节点1上,内部分区保存的数据有两种:三个消费者提交的分区偏移量 、主消费者发送的分区分配结果 。代理节点1的 “消费组元数据管理器”会保存这两种数据的缓存内容(数据写到内部分区后,更新消费组元数据管理器的缓存)。其他两个代理节点因为不是消费组的协调者,所以他们的消费组元数据管理器并不会保存以上两种缓存数据,但是他们的内部分区作为备份副本,会向内部分区的主副本同步数据。如果内部分区_consumer_offset_1的主副本转移到代理节点3上,那么代理节点3的协调者会处理内部分区的数据迁移:加载内部分区的所有数据到缓存中。原先的代理节点1不是消费组的协调者,如果消费者连接到代理节点1上,服务端会返回错误码给消费者。消费者必须重新连接新的协调者,即代理节点3。

当分区的主副本发生故障转移,代理节点处理控制器发送的LeaderAndIsr请求,会对分区调用makeLeader()和makeFollower()方法更新分区的状态信息:更新主副本,AR,ISR。当消费组的协调者发生故障转移,消费组的每个消费者都需要连接新的协调者。代理节点要恢复缓存数据,当消费者需要获取分配的分区,或者根据分区读取提交偏移量,就可以直接从缓存内容中读取,减少磁盘的读取操作。

总结:

1,创建普通主题test和内部主题_consumer_offsets,控制器发送LeaderAndIsr请求,不同代理节点上的不同分区分别成为主副本和备份副本,备份副本会同步主副本的数据。

2,控制器发送UpdateMetadata请求给每个代理节点,元数据缓存保存了每个主题的元数据。

3,消费组的每个消费者向任意一个代理节点发送GROUP_COORDINATOR请求,获取消费组的协调者。

4,消费组的每个消费者从步骤2的元数据缓存获取分配分区的主副本,它向分区的主副本拉取数据。

5,消费组的每个消费者提交分区的偏移量,发送心跳给步骤3的协调者节点。

6,代理节点1出现故障,控制器选举分区test_1和_consumer_offsets_1的主副本,并发送LeaderAndIsr请求给存活的代理节点。代理节点处理请求,更新分区的状态信息。

7,控制器发送UpdateMetadata请求给每个代理节点,更新元数据缓存。

8,消费组的每个消费者原先连接的协调者是代理节点1,它会返回“不是消费组的协调者”给每个消费者。消费者重新发送GROUP_COORDINATOR请求获取新的协调者。因为步骤7中已经更新了元数据缓存,所以消费者查询到的协调者就是管理消费组的最新协调者。

9,消费者1分配分区test_1的主副本转移到代理节点2上,消费者1会从代理节点2拉取数据。

10,消费组的每个消费者连接新的协调者,并提交分区的偏移盘、发送心跳给新的协调者节点。

消费组元数据缓存:存储:内部主题和协调者节点的内存;来源:消费者提交分区的偏移量,消费者保存分区的分配结果;存储内容:偏移量缓存(OffsetAndMetadata)和消费组元数据(GroupMetadata);每个节点数据都不同。

主题元数据:存储:ZK;来源:控制器更新分区的状态信息;存储内容:主题元数据(TopicMetadata);每个节点数据都相同。

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

上一篇下一篇

猜你喜欢

热点阅读