Kafka收藏Java

字节终面:说说Kakfa副本状态机的实现原理?

2022-01-11  本文已影响0人  废柴程序员

读这源码有何用?

ReplicaStateMachine是内部组件,一般用户感觉不到存在,但搞懂它,对从根本定位一些数据不一致问题大有裨益。

部署3-Broker(A、B和C)Kafka集群,版本2.0.0。在这3个Broker上创建一个单分区、双副本主题。

当关闭A、B后,zk会显示该主题的Leader是-1,ISR为空

依次关闭A、B,该主题在zk中的Leader和ISR就变成B。和上一case不符

虽非特严重问题,但毕竟是数据不一致,查看源码后,定位导致不一致原因:

不阅读这部分源码,就无法定位<typo id="typo-455" data-origin="问题" ignoretag="true">问题</typo>根因。

定义与初始化

image.png image.png image.png

ReplicaState:副本状态集合,Kafka目前共定义了7种副本状态。

ReplicaStateMachine只需接收一个ControllerContext对象实例,ControllerContext封装了Controller端保存的所有集群元数据信息。

构造一个ZKReplicaStateMachine实例,除了ControllerContext实例,比较重要的属性还有:

负责与ZooKeeper进行交互

用于给集群Broker发送控制类请求(LeaderAndIsrRequest、StopReplicaRequest和UpdateMetadataRequest)

ControllerBrokerRequestBatch,将给定Request发送给指定Broker,它是如何发送请求的呢(结合ControllerBrokerStateInfo)

在副本状态转换操作的逻辑中,关键是为Broker上的副本更新信息,而这是通过Controller给Broker发送请求实现的,因此,你最好了解下这里的请求发送逻辑。

副本状态<typo id="typo-1162" data-origin="机" ignoretag="true">机</typo>是在何时进行初始化的?

KafkaController对象在构建时,就会初始化一个ZkReplicaStateMachine实例

image.png

Yes!所有Broker在启动时,都会创建KafkaController实例,也随之创建ZKReplicaStateMachine实例。但只有在Controller所在的Broker,副本状态机才会被启动:

image.png

当Broker被成功推举为Controller后,onControllerFailover方法会被调用,进而启动该Broker早已创建好的副本状态机和分区状态机。

副本状态及状态管理流程

副本状态<typo id="typo-1493" data-origin="机" ignoretag="true">机</typo>一旦被启动,就要管理副本状态的转换

研究管理状态前,要先明白:

源码中的ReplicaState定义了如下副本状态:

image.png

ReplicaState接口及其实现对象定义了每种状态的序号,以及合法的前置状态。以OnlineReplica为例:

image.png image.png

其validPreviousStates属性是个集合类型,说明Kafka只允许副本从这4种态变更到OnlineReplica态。

其余副本状态的代码逻辑类似,关注validPreviousStates字段即可知晓每个状态合法的前置状态。

最终完整的状态转换规则:

image.png

状态管理流程

一旦开启如删除主题这样操作,状态机会将副本状态跳转到ReplicaDeletionStarted,表明副本删除已开启:

当副本对象被删除后,其状态变更为NonExistentReplica,副本状态机将移除该副本数据。

具体实现类:ZkReplicaStateMachine

副本状态<typo id="typo-2199" data-origin="机" ignoretag="true">机</typo>的具体实现类。

状态转换方法

image.png

handleStateChanges方法

handleStateChange处理状态的变更,对外提供状态转换操作的入口方法:

def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit
image.png
  1. 调用doHandleStateChanges执行副本状态转换
  2. 给集群中相应Broker批量发送请求

执行第1步时,会将replicas按Broker ID分组。<主题名,分区号,副本Broker ID>表示副本对象

image.png

假设replicas为集合:

<test, 0, 0>

<test, 0, 1>

<test, 1, 0>

<test, 1, 1>)

则调用doHandleStateChanges前,会将replicas按Broker ID分组成:

Map(

- 0 -> Set(<test, 0, 0>, <test, 1, 0>),

- 1 -> Set(<test, 0, 1>, <test, 1, 1>)

)

之后调用doHandleStateChanges

doHandleStateChanges

image.png

1.能合法转换的<typo id="typo-3076" data-origin="副本" ignoretag="true">副本</typo>对象集合

2.执行非法状态转换的副本对象集合

doHandleStateChanges为该集合类的每个副本对象记录一条错误日志

包括:

分支1:转换到NewReplica

image.png image.png

尝试从元数据缓存中,获取这些副本对象的分区信息数据,包括分区的Leader副本在哪个Broker,ISR中都有哪些副本等。

若找不到对应分区数据,直接把副本状态更新为NewReplica。否则,代码就要给该副本所在Broker发送请求,让它知道该分区的信息。还要给集群所有运行中的Broker发送请求,让它们感知到新副本加入。

分支2:转换到OnlineReplica态

副本对象正常工作时所处状态:

image.png image.png

遍历副本对象,依次执行:

分支3:转换到OfflineReplica状态

image.png image.png

把副本状态变更为OfflineReplica=停止对应副本+更新远端Broker元数据

总结

Kafka的副本状态机实现原理及源码:

上一篇下一篇

猜你喜欢

热点阅读