14、AbstractFetcherThread:拉取消息分几步

2020-10-09  本文已影响0人  技术灭霸

我们来正式认识下 AbstractFetcherThread 吧。它的源码位于 server 包下的 AbstractFetcherThread.scala 文件中。从名字来看,它是一个抽象类,实现的功能是从 Broker 获取多个分区的消息数据,至于获取之后如何对这些数据进行处理,则交由子类来 实现。

abstract class AbstractFetcherThread(name: String,
                                     clientId: String,
                                     val sourceBroker: BrokerEndPoint,
                                     failedPartitions: FailedPartitions,
                                     fetchBackOffMs: Int = 0,
                                     isInterruptible: Boolean = true,
                                     val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
  extends ShutdownableThread(name, isInterruptible) {

我们来看一下 AbstractFetcherThread 的构造函数接收的几个重要参数的含义。

这些字段中比较重要的是 sourceBroker,因为它决定 Follower 副本从哪个 Broker 拉取 数据,也就是 Leader 副本所在的 Broker 是哪台。

分区读取状态类

AbstractFetcherThread 类封装了一个名为 PartitionStates[PartitionFetchState]类型的字段。

注意这里的状态有两个,一个是分区读取状态,一个是副本读取状态。

sealed trait ReplicaState
// 截断中
case object Truncating extends ReplicaState
// 获取中
case object Fetching extends ReplicaState

可见,副本读取状态有截断中和获取中两个:当副本执行截断操作时,副本状态被设置成 Truncating;当副本被读取时,副本状态被设置成 Fetching。

而分区读取状态有 3 个,分别是:

  1. 可获取,表明副本获取线程当前能够读取数据。
  2. 截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。
  3. 被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。

PartitionStates 类用轮询的方式来处理要读取的多个分区。那具体是怎么实 现的呢?简单来说,就是依靠 LinkedHashMap 数据结构来保存所有主题分区。 LinkedHashMap 中的元素有明确的迭代顺序,通常就是元素被插入的顺序。

假设 Kafka 要读取 5 个分区上的消息:A、B、C、D 和 E。如果插入顺序就是 ABCDE,那 么自然首先读取分区 A。一旦 A 被读取之后,为了确保各个分区都有同等机会被读取到, 代码需要将 A 插入到分区列表的最后一位,这就是 updateAndMoveToEnd 方法要做的事 情。

具体来说,就是把 A 从 map 中移除掉,然后再插回去,这样 A 自然就处于列表的最后一 位了。大体上,PartitionStates 类就是做这个用的。

processPartitionData方法

processPartitionData用于处理读取回来的消息集合。它是一个 抽象方法,因此需要子类实现它的逻辑。具体到 Follower 副本而言, 是由 ReplicaFetcherThread 类实现的。以下是它的方法签名:

  protected def processPartitionData(
                                     topicPartition: TopicPartition,// 读取哪个分区的数据
                                     fetchOffset: Long,// 读取到的最新位移值
                                     partitionData: FetchData):     // 读取到的分区消息数据                                                                     
                                     Option[LogAppendInfo]// 写入已读取消息数据前的元数据

总结

1、AbstractFetcherThread 类:拉取线程的抽象基类。它定义了公共方法来处理所有拉取 线程都要实现的逻辑,如执行截断操作,获取消息等。
2、拉取线程逻辑:循环执行截断操作和获取数据操作。
3、分区读取状态:当前,源码定义了 3 类分区读取状态。拉取线程只能拉取处于可读取状态的分区的数据。


上一篇 下一篇

猜你喜欢

热点阅读