14、AbstractFetcherThread:拉取消息分几步
我们来正式认识下 AbstractFetcherThread 吧。它的源码位于 server 包下的 AbstractFetcherThread.scala 文件中。从名字来看,它是一个抽象类,实现的功能是从 Broker 获取多个分区的消息数据,至于获取之后如何对这些数据进行处理,则交由子类来 实现。
abstract class AbstractFetcherThread(name: String,
clientId: String,
val sourceBroker: BrokerEndPoint,
failedPartitions: FailedPartitions,
fetchBackOffMs: Int = 0,
isInterruptible: Boolean = true,
val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
extends ShutdownableThread(name, isInterruptible) {
我们来看一下 AbstractFetcherThread 的构造函数接收的几个重要参数的含义。
- name: 线程名字。
- sourceBroker: 源 Broker 节点信息。源 Broker 是指此线程要从哪个 Broker 上读取数 据。
- failedPartitions: 线程处理过程报错的分区集合。
- fetchBackOffMs: 当获取分区数据出错后的等待重试间隔,默认是 Broker 端参数 replica.fetch.backoff.ms 值。
- brokerTopicStats: Broker 端主题的各类监控指标,常见的有 MessagesInPerSec、 BytesInPerSec 等。
这些字段中比较重要的是 sourceBroker,因为它决定 Follower 副本从哪个 Broker 拉取 数据,也就是 Leader 副本所在的 Broker 是哪台。
分区读取状态类
AbstractFetcherThread 类封装了一个名为 PartitionStates[PartitionFetchState]类型的字段。
注意这里的状态有两个,一个是分区读取状态,一个是副本读取状态。
sealed trait ReplicaState
// 截断中
case object Truncating extends ReplicaState
// 获取中
case object Fetching extends ReplicaState
可见,副本读取状态有截断中和获取中两个:当副本执行截断操作时,副本状态被设置成 Truncating;当副本被读取时,副本状态被设置成 Fetching。
而分区读取状态有 3 个,分别是:
- 可获取,表明副本获取线程当前能够读取数据。
- 截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。
- 被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。
PartitionStates 类用轮询的方式来处理要读取的多个分区。那具体是怎么实 现的呢?简单来说,就是依靠 LinkedHashMap 数据结构来保存所有主题分区。 LinkedHashMap 中的元素有明确的迭代顺序,通常就是元素被插入的顺序。
假设 Kafka 要读取 5 个分区上的消息:A、B、C、D 和 E。如果插入顺序就是 ABCDE,那 么自然首先读取分区 A。一旦 A 被读取之后,为了确保各个分区都有同等机会被读取到, 代码需要将 A 插入到分区列表的最后一位,这就是 updateAndMoveToEnd 方法要做的事 情。
具体来说,就是把 A 从 map 中移除掉,然后再插回去,这样 A 自然就处于列表的最后一 位了。大体上,PartitionStates 类就是做这个用的。
processPartitionData方法
processPartitionData用于处理读取回来的消息集合。它是一个 抽象方法,因此需要子类实现它的逻辑。具体到 Follower 副本而言, 是由 ReplicaFetcherThread 类实现的。以下是它的方法签名:
protected def processPartitionData(
topicPartition: TopicPartition,// 读取哪个分区的数据
fetchOffset: Long,// 读取到的最新位移值
partitionData: FetchData): // 读取到的分区消息数据
Option[LogAppendInfo]// 写入已读取消息数据前的元数据
总结
1、AbstractFetcherThread 类:拉取线程的抽象基类。它定义了公共方法来处理所有拉取 线程都要实现的逻辑,如执行截断操作,获取消息等。
2、拉取线程逻辑:循环执行截断操作和获取数据操作。
3、分区读取状态:当前,源码定义了 3 类分区读取状态。拉取线程只能拉取处于可读取状态的分区的数据。
