Kafka文字欲

无镜--kafka之控制器(二)

2018-12-26  本文已影响0人  绍圣

分区状态机和副本状态机

分区状态机和副本状态机需要获取集群中的所有分区和所有副本,控制器上下文会从ZK中读取集群的所有分区和副本,所以初始化控制器上下文后,才能启动状态机。

分区包含了多个副本,只有集群中的所有副本的状态都初始化完成后,才可以初始化分区的状态。所以控制器会先启动副本状态机,再去启动分区状态机。

分区状态机和副本状态机分别维护集群中所有分区和副本的状态。分区和副本都有四种状态:新建,在线,下线,不存在。分区和副本从一个状态转变为另一个状态都对应了不同的事件。

副本状态机中的副本状态和状态转移

1,NewReplica:新创建主题或者重新分配分区时创建新副本,副本状态为“新建”。该状态下副本只能收到“成为备份副本”的请求。

2,OnlineReplica:当副本启动,并且是分区副本集的一部分,副本状态为“在线”。该状态下可以收到“成为备份副本”和“成为主副本”的请求。

3,OfflineReplica:代理节点宕机后,节点上的所有副本状态为“下线”。

4,NonExistentReplica:副本被成功删除后,状态为“不存在”。

分区状态机中的分区状态和状态转移

1,NonExistentPartition:分区没有创建或者创建后立即被删除,状态为“不存在”。

2,NewPartition:新创建分区,状态为“新建”。

3,OnlinePartition:分区选举出主副本,状态为“上线”。

4,OfflinePartition:分区有主副本,但是主副本所在的节点宕机了,状态为“下线”。

状态机的状态转换都有前置条件。正常的状态机流程是:不存在状态-->新建状态-->在线状态-->下线状态;新建状态-->下线状态。无效的状态转换:不存在状态-->在线状态;不存在状态-->下线状态。状态机在处理无效状态转换的方式是检查前置条件,如果前置条件不满足,就不执行任何操作。

外部事件调用状态机状态更改方法,需要传递一个副本或者分区,以及一个目标状态,表示外部事件希望分区或者副本的状态更改为目标状态。分区状态机在处理从“下线状态”转化为“上线状态”还需要传递一个分区主副本的选举器。因为分区要更改为上线状态必须是选举出主副本。分区状态从“新建”到“上线”不需要传递选举器,是因为选举的算法都是固定的:选择第一个加入的副本作为分区的主副本。

状态转换与处理

新创建主题时,ZK节点/brokers/topics的更改主题监听器捕获到创建主题事件,会调用控制器的创建主题方法。新创建分区时,ZK节点/brokers/topics/[topic]的更改分区监听器捕获到创建分区事件,会调用控制器的创建分区方法。控制器的这两个方法会将分区从“不存在状态”到“新建状态”再到“上线状态”。分区状态的变化必然有副本的状态变化,副本也会从“不存在状态”到“新建状态”再到“上线状态”。

新建分区会调用两次分区状态机的状态改变处理方法:

1,分区从“不存在状态”到“新建状态”,更改分区的状态为NewPartition。这个状态前,没有分区,转换后分区有副本。

2,分区从“新建状态”到“上线状态”,更改分区的状态为OnlinePartition。分区初始化主副本(选举AR集合中的第一个副本作为主副本)和ISR。如果是已经“下线状态”的分区要上线,必须重新选举主副本;如果是“上线状态”到“上线状态”,也要重新选举主副本。

新创建分区时,分区的副本也一定都是新创建的。

1,分区的所有副本从“不存在状态”到“新建状态”,会判断分区是否有主副本,但由于是新创建的分区,分区还没有主副本,因此更改副本的状态为NewReplica。

2,分区的所有副本从“新建状态”到“上线状态”。这时分区有主副本,更改副本的状态为OnlinePartition。

选举分区的主副本

如果分区当前处于“上线状态”或“下线状态”,要转变为“上线状态”,需要重新选举分区的主副本。分区当前为“下线状态”表示:分区有主副本,但是主副本挂掉了。分区当前为“上线状态”表示:分区有主副本,虽然主副本还存活,但是控制器要选举其他的副本作为分区的主副本。

分区的主副本选举接口有几个场景的实现类:1,主副本不可用后,选举主副本;2,重新分配分区时,选举主副本;3,使用最优的副本作为主副本;4,代理节点挂掉后,重新选举主副本。

分区状态机为分区选举主副本

1,从ZK的分区状态节点读取分区当前的主副本,ISR集合。

2,调用“选举分区主副本”具体实现类的选举方法,为分区选举最新的主副本。

3,将最新的主副本和ISR信息更新到ZK的分区状态节点,这一步必须成功后才会继续执行步骤4和步骤5。

4,更新控制器上下文中的分区缓存信息。

5,发送最新的LeaderAndIsr请求给分区的所有副本,更新其他副本上的缓存信息。

为分区选举主副本时,优先从ISR中选择第一个副本作为主副本。如果能从ISR集合中选举主副本,可以保证数据不会丢失。如果ISR都挂了,则选择AR中第一个存活的副本作为主副本。这种情况下丢失数据可能性比较大了,因为AR中的副本不保证同步。分区状态机中只有在“下线状态”和“上线状态”转换“上线状态”时,才会用到选举类。

最优副本选举类:选择分区的第一个副本作为主副本。什么情况下分区的第一个副本会不是主副本喃?一旦主副本所在的节点挂掉,控制器会为分区选举新的主副本。当原来主副本的代理节点重新上线后,会成为分区的备份副本,这样就出现了第一个副本不是主副本的情况。最优副本选举的目的是平衡分区。kafka的分区算法保证了:分区的主副本会均匀分布在所有的代理节点上。如果频繁出现第一个副本不是主副本的情况,有可能某些节点上的分区主副本太集中了,会对客户端的读写带来影响(多个客户端的读写都在同一个节点),最优副本选举类作为一个后台进程,会定时检查第一个副本是不是主副本。如果不是,在重新平衡分区时,控制器将第一个副本作为分区的最新的主副本。

代理节点下线

代理节点发送变化时,BrokerChangeListener监听器会读取/brokers/ids中最新的代理节点列表,并与控制器上下文中的保存的代理节点变量中保存的值进行比较,得出需要上线或者下线的节点。然后控制器对需要上下线的节点分别调用onBrokerStartup()和onBrokerFailure()方法。代理节点下线时,代理节点上面的所有副本状态都会被更改为“下线状态”。

假设:代理节点下线之前,分区的状态为上线状态,如果分区的主副本在下线的代理节点上,控制器针对没有主副本的分区,处理步骤如下:

1,将没有主副本的分区从上线转换为下线。

2,通过选举器为分区选举新的主副本(继续提供读写服务)。

3,为分区选举新的主副本后,发送LeaderAndIsr请求给分区的所有存活的副本。

4,如果为分区选举出主副本,将分区的状态从下线状态改为上线状态(因为为分区选举出了主副本所以分区的状态要变为上线,这时主副本就是在其他的代理节点上)。

5,代理节点上的所有副本的状态更改为下线状态(因为副本所在的代理节点已经宕机,所以副本自然要是下线状态)。

代理节点上线

代理节点下线时,控制器针对没有主副本的分区,通过选举新的主副本,分区状态从下线到上线。如果没有成功选举出新的主副本,分区状态依旧是下线。所以在代理节点上线时,还会触发一次分区的状态转换,将下线状态的分区转化为上线状态,如果分区的状态是上线的情况下,控制器将不会为分区重新选举主副本。对于副本状态机,代理节点下线时,副本的状态是下线;代理节点上线时,控制器都要处理副本状态从下线到上线的状态转换事件。

初始化状态机

控制器在故障转移时,会分别启动副本状态机和分区状态机,并根据控制器上下文初始化分区和副本的状态。

启动副本状态机时,只针对存活的副本,触发上线到上线的状态转换。如果不存活,状态为删除失败。

启动分区状态机时,将初始化状态为新建状态和下线状态的分区转为上线状态,这时会通过分区选举器为分区选出一个主副本。

来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:启动状态机时的初始化和状态转换

kafka控制器管理操作的步骤:

1,控制器在与管理操作相关的ZK节点上,注册不同事件类型的监听器。

2,管理操作往ZK节点添加数据,触发监听器对注册的事件作出响应。

3,监听器的事件处理方法最终会调用控制器定义的处理方法。

删除主题

删除主题相关的状态有:开始删除,删除成功,删除失败,不存在。

状态的约束条件:

1,开始删除的前置条件只能是下线状态,不存在的前置条件只能是删除成功。

2,开始删除可以到删除成功和删除失败,不能直接从下线状态到删除成功和删除失败。

3,删除失败可以回到下线,但是不能自己到开始删除,不存在或者删除失败。

4,副本状态转为下线或者开始删除,控制器都会发送StopReplica请求给代理节点。

当副本状态转换为删除失败时,删除主题的动作是无效的。以下情况删除主题动作是无效的:

1,副本所在的代理节点挂掉了。

2,主题正在执行重新分配分区。

3,主题正在执行最优副本选举。

与之相反就可以执行删除主题的操作。

删除主题的流程

先在/admin/delete_topics/下新建要删除的主题。删除主题的监听器监听到ZK节点的改变事件,通过删除主题管理器,删除主题的线程执行删除主题的处理。删除主题时,需要查询副本状态机的状态,只有允许删除时,删除主题的线程才会调用回调方法。

删除主题监听器触发时,将需要删除的的主题加入管理器的“待删除集合”,只有主题被成功删除,它才会从“待删除集合”中移除。如果删除未开始,未完成或删除失败,主题都会一直存在于“待删除集合”中。管理器允许删除主题,必须同时满足三个条件:

1,主题还没有删除完成,还在待删除集合中。

2,还没有开始删除主题,既不存在任何一个副本的状态是开始删除。

3,删除主题有效,不在无效的主题集合中。

当副本状态机中的所有副本的状态是删除成功时,删除主题的整个过程就算完成了。主题删除完成时,删除主题线程会执行清理工作:

1,主题的所有副本状态转换为“不存在”。

2,主题的所有分区状态转换为“下线”。

3,主题的所有分区状态转换为“不存在”。

4,将主题从管理器的集合中移除。

5,删除ZK节点/brokers/topics/[topic]。

6,删除ZK节点/admin/delete_topics/[topic]。

7,删除上下问对象中与主题相关的所有数据。

删除主题会将这个主题在各个代理节点上的副本全部删除。步骤如下:

1,将挂掉的副本(代理节点宕机)的状态转换为“删除失败”,主题只要有一个副本状态为删除失败,删除线程会重试删除操作。

2,将挂掉的副本对应的主题加入无效集合,只有删除主题是有效的,才允许被删除线程删除。

3,将删除的副本状态转换为下线,控制器发送不带删除标记的StopReplica请求给代理节点。

4,将删除的副本状态转为“开始删除”,控制器发送带有删除标记的StopReplica请求给代理节点。

来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:控制器删除主题,发送StopReplica请求给代理节点

删除主题管理器在处理StopReplica的回调方法中,如果响应结果有错误,副本状态转化为“删除失败”,对应的主题会加入“删除无效集合”。

副本状态为“删除失败”的原因:1,代理节点挂掉,副本对应的主题需要删除。2,开始删除主题的副本时,待删除的副本不在活动的副本集中。3,处理停止副本响应的回调方法中,状态码有错误。

停止副本

副本状态机处理副本转为“下线”“开始删除”的状态转换事件,都会向当前副本发送“停止副本”的请求(StopReplica)。副本状态转为“下线”时,如果副本在分区的ISR集合中,控制器会先将下线的副本从分区的ISR集合中移除,更新ZK节点中分区的ISR。副本状态从“下线”转为“开始删除”,并不需要执行从ISR集合移除的操作。控制器向状态为下线或者开始删除的副本发送StopReplica请求,这些副本所在的副本管理器会先停止分区的拉取线程,在开始删除状态下需要删除分区,副本管理器通过分区对象删除副本对应的本地日志文件。

停止副本参与的组件:ZK节点,控制器,状态机,副本管理器。

重新分配分区

重新分配分区指的是将分区的副本重新分配到不同的代理节点上。如果ZK节点中分区的新副本集合和当前分区的副本集合相同,这个分区就不需要重新分配了。重新分配分区的触发条件是ZK节点/admin/reassign_partitions修改了数据,对应的监听器(PartitionReassignedListener)注册了数据改变的监听事件。控制器上下文中partitionsBeingReassigned变量保存被分配的分区信息。

OAR:分区的原始副本集合;RAR:重新分配的副本集合;OAR-RAR:在OAR中但不在RAR中;RAR-OAR:在RAR中但不在OAR中。

重新分配分区过程中,RAR不会变化,AR和ISR会变化。从ZK中读取AR和ISR在不同阶段的数据是不同的。

重新分配分区流程

根据OAR和RAR是否有重叠,重新分配分区有两种情况:

1,OAR和RAR数据没有重叠,则OAR中的副本都会下线,RAR中的副本都会上线。重新分配分区并不会保留旧的副本,而是以新的副本全部替换所有旧的副本。

2,RAR中的副本有一部分在OAR中,在OAR中的已有副本状态不变。

重新分配分区之前,分区的AR和ISR等于OAR,重新分配分区之后,分区的AR和ISR等于RAR。RAR要能够作为分区的ISR,RAR里面的每个副本都需要赶上分区原来的主副本。RAR要先加入分区的AR中,然后在确保RAR都加入ISR后,才将OAR分别从AR和ISR中移除,才能保证重新分配分区后,分区的AR和ISR等于RAR。步骤:

1,将RAR加入AR,AR=OAR+RAR。

2,将RAR加入ISR,ISR=OAR+RAR。

3,ISR不变,分区的主副本从RAR中选举。

4,将OAR从ISR中移除,ISR=RAR。

5,将OAR从AR中移除,AR=RAR。

如果OAR与RAR有叠加,那么步骤4和5不会全部移除OAR,只会移除RAR-OAR的副本。增加和修改AR和ISR的顺序是:增加AR-->增加ISR-->减少ISR-->减少AR。因为副本在AR中,不一定在ISR中,而副本在ISR中一定在AR中。如果顺序打乱就会出现在ISR中,不在AR中。

管理监听器在执行“重新分配分区”之前,会在分区对应的状态节点(/topics/[topic]/[partition]/state)上注册ReassignedPartitionIsrChangeListener监听器。因为RAR的所有副本都要和分区旧的主副本同步,并加入到ISR中,分区的数据同步过程比较慢。如果想要一次性执行完成分区的重新分配,这个过程会持续比较长的时间。通过在状态节点上注册“分区ISR改变的监听器”,整个重新分配分区的工作会被分成两个阶段。第一个阶段是通过管理器操作的监听器,第二个阶段通过分区状态的监听器。步骤:

1,将ZK中的AR更新为OAR+RAR,也更新上下文中变量partitionAR为OAR+RAR。

2,发送LeaderAndIsr请求给AR集合的每个副本(OAR+RAR)。

3,将RAR-OAR的新副本转换为“新建状态”。

以上都是有PartitionReassignedListener监听器触发

4,等待RAR的所有副本都与旧的主副本同步,并加入ISR。

5,将RAR的所有副本状态转换为“上线状态”。

6,将上下文中的AR设置成RAR,在这之前,上下文的AR等于OAR+RAR。

7,如果主副本不在RAR,从RAR中选举新的主副本,并发送LeaderAndIsr请求给RAR。

8,将OAR-RAR的旧副本转换为下线状态,更新ZK的ISR,发送StopReplica请求。

9,将OAR-RAR的旧副本转换为不存在状态,并将副本从磁盘上删除。

10,将ZK中的AR设置为RAR。

11,删除/admin/reassign_partitions节点中分区对应的数据。

12,选举新主副本后,AR和ISR都变化,重新发送UpdateMetadata请求。

以上是ReassignedPartitionIsrChangeListener监听器触发。

分区状态节点上注册的ISR改变监听器会确保只有在分区的ISR包含所有的新副本,才会再次调用控制器的分区分配方法。

重新选举主副本

在“重新分配分区的第一阶段” 分区的AR更新为OAR+RAR,并且新的副本(RAR-OAR)状态会转为“新建”。控制器会发送LeaderAndIsr请求给新副本。新副本只会作为分区的备份副本,并同步主副本(OAR中旧的主副本)的数据。当新副本赶上主副本,就会被加入到ISR中 。当RAR中的所有副本都加入到ISR中,就会进入“重新分配分区的第二个阶段”。进入第二阶段后,RAR所有副本的状态转为“上线”,然后分区的AR更新为RAR,并且从RAR中选举主副本 。如果OAR中的主副本在RAR中, 就不需要重新选举主副本。如果OAR中的主副本不在RAR中,或者在RAR中,但是副本没有存活,都需要通过选举器,重新在RAR中选举第一个副本作为分区新的主副本。保证重新分配分区后,分区的主副本一定会在RAR中。

重新分配分区的不同阶段需要改变的数据有:上下文的分区信息,ZK节点分区的AR信息(/brokers/topics/[topic]),ZK节点的主副本于ISR信息。分区的AR与ISR都会从OAR依次转化为OAR+RAR,RAR,分区的主副本从OAR转到RAR中。

来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:重新分配分区过程中,更新上下文和ZK节点的数据

当分区的主副本信息发生变化,控制器需要发送LeaderAndIsr请求给分区副本对应的代理节点。

1,控制器选举出主副本为1,发送LeaderAndIsr请求:P1:{leader:1,isr:[1,2,3],ar:[1,2,3]},给分区的所有副本,即发送给1,2,3三个代理节点。

2,代理节点3宕机,控制器将副本3从分区的ISR中移除,并发送LeaderAndIsr请求:P1:{leader:1,isr:[1,2],ar:[1,2,3]}给分区存活的副本,即发送给1,2三个代理节点。

3,代理节点1宕机,控制器选举新的主副本为2,并发送LeaderAndIsr请求:P1:{leader:2,isr:[2,3],ar:[1,2,3]}给分区存活的副本,即发送给2,3三个代理节点。

控制器的网络通道管理器

控制器发送请求给代理节点,首先需要建立和服务端的目标代理节点的网络连接。在更改代理节点的监听器中,控制器在处理代理节点的上下线事件之前,直接更新控制器的网络通道管理器:对于需要新增的代理节点,创建控制器到新代理节点的网络连接;对于需要删除的代理节点,取消控制器到旧代理节点的网络连接。

控制器的网络通道管理器用brokerStateInfo保存了代理节点编号(brokerId)到代理节点状态(ControllerBrokerStateInfo)的映射关系。代理节点状态包括:网络连接对象(NetworkClient),请求队列(BlockingQueue),请求发送线程(RequestSendThread)。

控制器在发送请求之前,会先组织好属于每个目标节点的请求,最后在所有需要的请求都准备完毕时,才向目标节点发送批量请求。控制器发送给代理节点的请求有三种类型:分区信息(LeaderAndIsr),停止副本(StopReplica),更新元数据(UpdateMetadata)。

控制器使用批量请求会有三个步骤:

1,batch.newBatch():创建新的批量请求,必须确保前一批的请求全部发送完毕。

2,batch.addXXXXXX():为目标代理节点添加请求。

3,batch.sendRequestsToBrokers():根据步骤2的数据将请求发送给多个代理节点。

“分区存在主副本”的逻辑在控制器中完成:控制器为分区选举主副本,更新分区状态机为“上线状态”,并发送LeaderAndIsr请求给分区的每个副本。每个副本所在的代理节点处理LeaderAndIsr请求,最后必须确定分区在当前代理节点上是主副本还是备份副本 。

1,管理员创建主题,为主题分配分区,每个分区可以有多个副本,分布在不同的代理节点上 。

2,控制器为分区选举主副本,更新分区状态机中分区的状态为“上线” 。

3,控制器发送LeaderAndIsr请求给分区的每个副本,分区的状态信息包括主副本,ISR,AR。

4,每个代理节点的副本管理器确定“分区在当前代理节点上是主副本还是备份副本” 。

5,生产者开始追加消息到分区的主副本;消费者或备份副本向分区的主副本拉取消息 。

6,当分区的主副本发生变化(比如节点若机),如果分区的主副本不可用,与主副本相关的客户端操作都会失败。控制器为分区重新选举主副本后,还要再次发送LeaderAndIsr请求给存活的副本 。

7,副本管理器处理新的LeaderAndIsr请求,更新成为分区的主副本或备份副本(原先的主副本可能要转为备份副本,比如发生选举最优副本。而原先的备份副本可能会转为主副本,比如发生主副本下线)。

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

上一篇下一篇

猜你喜欢

热点阅读