Flink Source Code: KafkaSource
For the full series, see the Flink source code analysis series index.
Preface
FLIP-27: Refactor Source Interface introduced a new Source architecture. For an analysis of that architecture, see "Flink Source Code: The New Source Architecture". For the new architecture, the Flink community released a new Kafka connector, KafkaSource. The old implementation, FlinkKafkaConsumer, is now marked as Deprecated and no longer recommended. This article walks through the source code of KafkaSource.
This article covers four parts of the source code:
- KafkaSource creation
- Data reading
- Partition discovery
- Checkpoint
KafkaSource Creation
As shown in the official documentation, to write a Flink application that consumes Kafka we can create a KafkaSource as follows:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
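For completeness, here is a minimal runnable skeleton around the snippet above. This is only a sketch; the broker address, job name and the trailing print() sink are placeholders that are not part of the original example:
// minimal sketch of a complete job using the builder snippet above
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String brokers = "localhost:9092"; // placeholder broker address
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print(); // print records, just for demonstration
        env.execute("KafkaSource demo");
    }
}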
env.fromSource produces a DataStreamSource. The DataStreamSource corresponds to a SourceTransformation, which SourceTransformationTranslator translates into a Source node of the StreamGraph; at execution time this node is backed by a SourceOperator. SourceOperator is the operator of the new Source API. It interacts directly with the SourceReader and calls sourceReader.pollNext to pull data. This chain of logic has little to do with Kafka itself, so we only mention it here without going into detail.
Finally, KafkaSourceBuilder builds a KafkaSource object according to the parameters we configured.
public KafkaSource<OUT> build() {
sanityCheck();
parseAndSetRequiredProperties();
return new KafkaSource<>(
subscriber,
startingOffsetsInitializer,
stoppingOffsetsInitializer,
boundedness,
deserializationSchema,
props);
}
KafkaSource's createReader method creates a KafkaSourceReader. The code is as follows:
@Internal
@Override
public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
throws Exception {
return createReader(readerContext, (ignore) -> {});
}
@VisibleForTesting
SourceReader<OUT, KafkaPartitionSplit> createReader(
SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
throws Exception {
// elementsQueue stores the ConsumerRecords obtained from the fetcher
// the reader consumes the buffered Kafka records from elementsQueue
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue = new FutureCompletingBlockingQueue<>();
// initialize the deserializationSchema
deserializationSchema.open(
new DeserializationSchema.InitializationContext() {
@Override
public MetricGroup getMetricGroup() {
return readerContext.metricGroup().addGroup("deserializer");
}
@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return readerContext.getUserCodeClassLoader();
}
});
// create the Kafka source reader metrics
final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
new KafkaSourceReaderMetrics(readerContext.metricGroup());
// create a supplier (factory) of KafkaPartitionSplitReader, which reads Kafka records per partition split
Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
() -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
return new KafkaSourceReader<>(
elementsQueue,
new KafkaSourceFetcherManager(
elementsQueue, splitReaderSupplier::get, splitFinishedHook),
recordEmitter,
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics);
}
Data Reading Flow
KafkaSourceFetcherManager extends SingleThreadFetcherManager. When new splits are discovered, it fetches the existing SplitFetcher and assigns the splits to it. If no fetcher is running, it creates a new one.
@Override
// called when new splits are discovered
// assigns the splits to a fetcher
public void addSplits(List<SplitT> splitsToAdd) {
SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
if (fetcher == null) {
fetcher = createSplitFetcher();
// Add the splits to the fetchers.
fetcher.addSplits(splitsToAdd);
startFetcher(fetcher);
} else {
fetcher.addSplits(splitsToAdd);
}
}
Next we look at how the fetcher pulls data. The startFetcher method above starts the SplitFetcher thread.
protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
executors.submit(fetcher);
}
SplitFetcher executes the tasks that pull data from the external system; it runs SplitFetcherTasks in a loop. SplitFetcherTask has several subclasses:
- AddSplitTask: assigns splits to the reader
- PauseOrResumeSplitsTask: pauses or resumes reading from splits
- FetchTask: fetches data into the elementsQueue
Next, let's look at SplitFetcher's run method:
@Override
public void run() {
LOG.info("Starting split fetcher {}", id);
try {
// run runOnce in a loop
while (runOnce()) {
// nothing to do, everything is inside #runOnce.
}
} catch (Throwable t) {
errorHandler.accept(t);
} finally {
try {
splitReader.close();
} catch (Exception e) {
errorHandler.accept(e);
} finally {
LOG.info("Split fetcher {} exited.", id);
// This executes after possible errorHandler.accept(t). If these operations bear
// a happens-before relation, then we can checking side effect of
// errorHandler.accept(t)
// to know whether it happened after observing side effect of shutdownHook.run().
shutdownHook.run();
}
}
}
boolean runOnce() {
// first blocking call = get next task. blocks only if there are no active splits and queued
// tasks.
SplitFetcherTask task;
lock.lock();
try {
if (closed) {
return false;
}
// the important logic is here
// take a task from the taskQueue
// if there are queued tasks, run them first
// if the taskQueue is empty, check whether any splits are assigned; if so, return a FetchTask
// the FetchTask is created in the SplitFetcher constructor
task = getNextTaskUnsafe();
if (task == null) {
// (spurious) wakeup, so just repeat
return true;
}
LOG.debug("Prepare to run {}", task);
// store task for #wakeUp
this.runningTask = task;
} finally {
lock.unlock();
}
// execute the task outside of lock, so that it can be woken up
boolean taskFinished;
try {
// execute the task's run method
taskFinished = task.run();
} catch (Exception e) {
throw new RuntimeException(
String.format(
"SplitFetcher thread %d received unexpected exception while polling the records",
id),
e);
}
// re-acquire lock as all post-processing steps, need it
lock.lock();
try {
this.runningTask = null;
processTaskResultUnsafe(task, taskFinished);
} finally {
lock.unlock();
}
return true;
}
The SplitFetcherTask subclass used to pull data is FetchTask. Its run method looks like this:
@Override
public boolean run() throws IOException {
try {
// if woken up, skip this round
if (!isWakenUp() && lastRecords == null) {
// use the splitReader to fetch a batch of records from the splits
lastRecords = splitReader.fetch();
}
if (!isWakenUp()) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic to wakeup.
// put the fetched records into the elementsQueue
if (elementsQueue.put(fetcherIndex, lastRecords)) {
// if some splits have been fully consumed
if (!lastRecords.finishedSplits().isEmpty()) {
// The callback does not throw InterruptedException.
// invoke the split-finished callback
splitFinishedCallback.accept(lastRecords.finishedSplits());
}
lastRecords = null;
}
}
} catch (InterruptedException e) {
// this should only happen on shutdown
throw new IOException("Source fetch execution was interrupted", e);
} finally {
// clean up the potential wakeup effect. It is possible that the fetcher is waken up
// after the clean up. In that case, either the wakeup flag will be set or the
// running thread will be interrupted. The next invocation of run() will see that and
// just skip.
if (isWakenUp()) {
wakeup = false;
}
}
// The return value of fetch task does not matter.
return true;
}
The splitReader.fetch() call in the snippet above corresponds to KafkaPartitionSplitReader's fetch method.
@Override
public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
ConsumerRecords<byte[], byte[]> consumerRecords;
try {
// poll a batch of records from the KafkaConsumer; POLL_TIMEOUT is 10 seconds
consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
} catch (WakeupException | IllegalStateException e) {
// IllegalStateException will be thrown if the consumer is not assigned any partitions.
// This happens if all assigned partitions are invalid or empty (starting offset >=
// stopping offset). We just mark empty partitions as finished and return an empty
// record container, and this consumer will be closed by SplitFetcherManager.
// as the comment above says, an IllegalStateException is thrown if the consumer has no partitions assigned
// this also happens when all assigned partitions are invalid or empty (starting offset >= stopping offset)
// in that case return an empty KafkaPartitionSplitRecords and mark the empty splits as finished
// this consumer will be closed later by the SplitFetcherManager
KafkaPartitionSplitRecords recordsBySplits =
new KafkaPartitionSplitRecords(
ConsumerRecords.empty(), kafkaSourceReaderMetrics);
markEmptySplitsAsFinished(recordsBySplits);
return recordsBySplits;
}
// wrap the consumerRecords in a KafkaPartitionSplitRecords and return it
// KafkaPartitionSplitRecords carries both a partition iterator and a record iterator
KafkaPartitionSplitRecords recordsBySplits =
new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
List<TopicPartition> finishedPartitions = new ArrayList<>();
// iterate over the partitions in consumerRecords
for (TopicPartition tp : consumerRecords.partitions()) {
// get the stopping offset of this partition
long stoppingOffset = getStoppingOffset(tp);
// get all records fetched for this partition
final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
consumerRecords.records(tp);
// if any records were fetched
if (recordsFromPartition.size() > 0) {
// take the last record fetched from this partition
final ConsumerRecord<byte[], byte[]> lastRecord =
recordsFromPartition.get(recordsFromPartition.size() - 1);
// After processing a record with offset of "stoppingOffset - 1", the split reader
// should not continue fetching because the record with stoppingOffset may not
// exist. Keep polling will just block forever.
// if the offset of the last record is >= stoppingOffset - 1
// (the stopping offset comes from the stopping offsets initializer, e.g. resolved via the consumer's endOffsets)
// record the stopping offset in recordsBySplits
// and mark this split as finished
if (lastRecord.offset() >= stoppingOffset - 1) {
recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
finishSplitAtRecord(
tp,
stoppingOffset,
lastRecord.offset(),
finishedPartitions,
recordsBySplits);
}
}
// Track this partition's record lag if it never appears before
// add the records-lag metric for this partition
kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
}
// mark empty splits as finished
markEmptySplitsAsFinished(recordsBySplits);
// Unassign the partitions that has finished.
// stop tracking records lag for finished partitions
// and unassign them from the consumer
if (!finishedPartitions.isEmpty()) {
finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
unassignPartitions(finishedPartitions);
}
// Update numBytesIn
// update the numBytesIn metric
kafkaSourceReaderMetrics.updateNumBytesInCounter();
return recordsBySplits;
}
So far we have covered the logic that reads records from the KafkaConsumer and puts them into the elementsQueue. Next comes the part where Flink takes the data out of the elementsQueue and sends it downstream.
SourceReaderBase reads the data from the elementsQueue and hands it to the recordEmitter.
The getNextFetch method of SourceReaderBase looks like this:
@Nullable
private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
splitFetcherManager.checkErrors();
LOG.trace("Getting next source data batch from queue");
// poll a batch of records from the elementsQueue
final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
// if no data is available and there is no next split, return null
if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
// No element available, set to available later if needed.
return null;
}
currentFetch = recordsWithSplitId;
return recordsWithSplitId;
}
getNextFetch is called from pollNext. SourceOperator keeps calling the reader's pollNext method, which pulls data and hands it to the recordEmitter to emit downstream.
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
// make sure we have a fetch we are working on, or move to the next
RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
if (recordsWithSplitId == null) {
recordsWithSplitId = getNextFetch(output);
if (recordsWithSplitId == null) {
return trace(finishedOrAvailableLater());
}
}
// we need to loop here, because we may have to go across splits
while (true) {
// Process one record.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that and have not emitted anything, yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
}
}
Finally we arrive at KafkaRecordEmitter's emitRecord method. It deserializes each received Kafka record and emits it to the output, which passes it on to the downstream operators.
@Override
public void emitRecord(
ConsumerRecord<byte[], byte[]> consumerRecord,
SourceOutput<T> output,
KafkaPartitionSplitState splitState)
throws Exception {
try {
sourceOutputWrapper.setSourceOutput(output);
sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
splitState.setCurrentOffset(consumerRecord.offset() + 1);
} catch (Exception e) {
throw new IOException("Failed to deserialize consumer record due to", e);
}
}
Partition Discovery
Flink's KafkaSource can periodically scan Kafka partitions, as a scheduled task, according to the configured subscription rule (a topic list, a topic regex pattern, or an explicit set of partitions), which enables dynamic partition discovery.
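The discovery interval corresponds to the partitionDiscoveryIntervalMs field analyzed below and is configured through the partition.discovery.interval.ms property. A small sketch of enabling it on the builder from the first section (the 10-second interval is only an example value):
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // scan for new partitions every 10 seconds (example value);
        // a non-positive value disables periodic discovery
        .setProperty("partition.discovery.interval.ms", "10000")
        .build();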
The start method of KafkaSourceEnumerator creates a Kafka AdminClient. Then, depending on partitionDiscoveryIntervalMs (the partition discovery interval), it decides whether to run the partition discovery logic periodically.
@Override
public void start() {
// create the Kafka admin client
adminClient = getKafkaAdminClient();
// if a partition discovery interval is configured
if (partitionDiscoveryIntervalMs > 0) {
LOG.info(
"Starting the KafkaSourceEnumerator for consumer group {} "
+ "with partition discovery interval of {} ms.",
consumerGroupId,
partitionDiscoveryIntervalMs);
// periodically invoke getSubscribedTopicPartitions and checkPartitionChanges
context.callAsync(
this::getSubscribedTopicPartitions,
this::checkPartitionChanges,
0,
partitionDiscoveryIntervalMs);
} else {
// otherwise call them only once at startup
LOG.info(
"Starting the KafkaSourceEnumerator for consumer group {} "
+ "without periodic partition discovery.",
consumerGroupId);
context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges);
}
}
The getSubscribedTopicPartitions method:
private Set<TopicPartition> getSubscribedTopicPartitions() {
return subscriber.getSubscribedTopicPartitions(adminClient);
}
This method delegates to the KafkaSubscriber, which resolves the subscribed partitions based on the configured rule.
KafkaSubscriber has three subclasses, one per partition discovery rule (an example of the corresponding builder calls follows the list):
- PartitionSetSubscriber: created via KafkaSourceBuilder's setPartitions method; it subscribes directly to the named partitions.
- TopicListSubscriber: resolves the subscribed partitions from a list of topics. This is the subscriber used when subscribing to a set of topics with KafkaSourceBuilder's setTopics.
- TopicPatternSubscriber: resolves the subscribed partitions by matching topic names against a regular expression. It is created when KafkaSourceBuilder's setTopicPattern is used.
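The three subscription modes map to the following builder calls. A brief sketch (the topic names, the pattern and the partition set are made up for illustration):
// TopicListSubscriber: subscribe to an explicit list of topics
KafkaSource.<String>builder().setTopics("topic-a", "topic-b");
// TopicPatternSubscriber: subscribe to all topics matching a regex
KafkaSource.<String>builder().setTopicPattern(java.util.regex.Pattern.compile("input-.*"));
// PartitionSetSubscriber: subscribe to specific partitions directly
KafkaSource.<String>builder().setPartitions(
        new java.util.HashSet<>(java.util.Arrays.asList(
                new org.apache.kafka.common.TopicPartition("input-topic", 0),
                new org.apache.kafka.common.TopicPartition("input-topic", 1))));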
Next, let's take TopicListSubscriber as an example and look at how the subscribed partitions are resolved.
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
LOG.debug("Fetching descriptions for topics: {}", topics);
// fetch topic metadata from Kafka via the admin client
// the metadata includes the partition info of the requested topics
final Map<String, TopicDescription> topicMetadata =
getTopicMetadata(adminClient, new HashSet<>(topics));
// collect every partition of every topic into the subscribedPartitions set and return it
Set<TopicPartition> subscribedPartitions = new HashSet<>();
for (TopicDescription topic : topicMetadata.values()) {
for (TopicPartitionInfo partition : topic.partitions()) {
subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition()));
}
}
return subscribedPartitions;
}
The logic for resolving subscribed partitions is not complicated; the other two subscribers are not analyzed here.
The return value of getSubscribedTopicPartitions, together with any thrown exception, is passed to the checkPartitionChanges method, which detects whether the partition set has changed. The code is as follows:
private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) {
if (t != null) {
throw new FlinkRuntimeException(
"Failed to list subscribed topic partitions due to ", t);
}
// detect partition changes
final PartitionChange partitionChange = getPartitionChange(fetchedPartitions);
// if nothing changed, return immediately
if (partitionChange.isEmpty()) {
return;
}
// if changes were detected, call initializePartitionSplits and handlePartitionSplitChanges
context.callAsync(
() -> initializePartitionSplits(partitionChange),
this::handlePartitionSplitChanges);
}
@VisibleForTesting
PartitionChange getPartitionChange(Set<TopicPartition> fetchedPartitions) {
// holds the removed partitions
final Set<TopicPartition> removedPartitions = new HashSet<>();
Consumer<TopicPartition> dedupOrMarkAsRemoved =
(tp) -> {
if (!fetchedPartitions.remove(tp)) {
removedPartitions.add(tp);
}
};
// if a partition exists in assignedPartitions (already assigned) but not in fetchedPartitions, it has been removed
// add it to removedPartitions
assignedPartitions.forEach(dedupOrMarkAsRemoved);
// pendingPartitionSplitAssignment holds partitions discovered in earlier rounds but not yet assigned to readers
// check it for removed partitions as well
pendingPartitionSplitAssignment.forEach(
(reader, splits) ->
splits.forEach(
split -> dedupOrMarkAsRemoved.accept(split.getTopicPartition())));
// any partitions left in fetchedPartitions are newly discovered
if (!fetchedPartitions.isEmpty()) {
LOG.info("Discovered new partitions: {}", fetchedPartitions);
}
if (!removedPartitions.isEmpty()) {
LOG.info("Discovered removed partitions: {}", removedPartitions);
}
// wrap the new and removed partitions into a PartitionChange and return it
return new PartitionChange(fetchedPartitions, removedPartitions);
}
After comparing the newly discovered partitions with the previously subscribed ones, the enumerator needs to react to the changes.
The initializePartitionSplits method wraps the partition changes into a PartitionSplitChange, which records the newly added and the removed partitions. Unlike PartitionChange, the new partitions in a PartitionSplitChange are of type KafkaPartitionSplit, which additionally stores the starting and stopping offsets of each partition.
private PartitionSplitChange initializePartitionSplits(PartitionChange partitionChange) {
// get the newly added partitions
Set<TopicPartition> newPartitions =
Collections.unmodifiableSet(partitionChange.getNewPartitions());
// get the partition offsets retriever
OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = getOffsetsRetriever();
// resolve the starting offsets
Map<TopicPartition, Long> startingOffsets =
startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
// resolve the stopping offsets
Map<TopicPartition, Long> stoppingOffsets =
stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever);
Set<KafkaPartitionSplit> partitionSplits = new HashSet<>(newPartitions.size());
// wrap each partition together with its starting and stopping offset
for (TopicPartition tp : newPartitions) {
Long startingOffset = startingOffsets.get(tp);
long stoppingOffset =
stoppingOffsets.getOrDefault(tp, KafkaPartitionSplit.NO_STOPPING_OFFSET);
partitionSplits.add(new KafkaPartitionSplit(tp, startingOffset, stoppingOffset));
}
// return the result
return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions());
}
The key logic of the method above is resolving each partition's starting offset (via startingOffsetInitializer) and stopping offset (via stoppingOffsetInitializer).
Both initializers are created in KafkaSourceBuilder; the default startingOffsetInitializer is OffsetsInitializer.earliest(). Its code is as follows:
static OffsetsInitializer earliest() {
return new ReaderHandledOffsetsInitializer(
KafkaPartitionSplit.EARLIEST_OFFSET, OffsetResetStrategy.EARLIEST);
}
It creates a ReaderHandledOffsetsInitializer, meaning that all newly discovered topics (partitions) are read from the earliest position.
The getPartitionOffsets method of ReaderHandledOffsetsInitializer is shown below. It sets the offset of every partition to startingOffset, which in the scenario above is KafkaPartitionSplit.EARLIEST_OFFSET.
@Override
public Map<TopicPartition, Long> getPartitionOffsets(
Collection<TopicPartition> partitions,
PartitionOffsetsRetriever partitionOffsetsRetriever) {
Map<TopicPartition, Long> initialOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
initialOffsets.put(tp, startingOffset);
}
return initialOffsets;
}
For the stoppingOffsetInitializer, the default created by KafkaSourceBuilder is NoStoppingOffsetsInitializer, meaning there is no stopping offset, i.e. an unbounded Kafka stream. Its code is trivial and not analyzed here.
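Both initializers are therefore user-configurable on the builder. A sketch of the commonly available options (the timestamp is only an example value):
KafkaSource.<String>builder()
        // where newly discovered partitions start being read
        .setStartingOffsets(OffsetsInitializer.earliest())
        // alternatives: OffsetsInitializer.latest(),
        //               OffsetsInitializer.committedOffsets(),
        //               OffsetsInitializer.timestamp(1700000000000L)
        // by default there is no stopping offset (NoStoppingOffsetsInitializer, unbounded);
        // setting a stopping offsets initializer makes the source bounded:
        .setBounded(OffsetsInitializer.latest());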
Back to handlePartitionSplitChanges, the method that reacts to partition changes. It assigns the newly discovered splits to the pending and registered readers.
private void handlePartitionSplitChanges(
PartitionSplitChange partitionSplitChange, Throwable t) {
if (t != null) {
throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t);
}
if (partitionDiscoveryIntervalMs < 0) {
LOG.debug("Partition discovery is disabled.");
noMoreNewPartitionSplits = true;
}
// TODO: Handle removed partitions.
addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
assignPendingPartitionSplits(context.registeredReaders().keySet());
}
addPartitionSplitChangeToPendingAssignments adds the new splits to the pending assignment collection.
private void addPartitionSplitChangeToPendingAssignments(
Collection<KafkaPartitionSplit> newPartitionSplits) {
int numReaders = context.currentParallelism();
for (KafkaPartitionSplit split : newPartitionSplits) {
// distribute the partitions evenly across all readers
int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders);
pendingPartitionSplitAssignment
.computeIfAbsent(ownerReader, r -> new HashSet<>())
.add(split);
}
LOG.debug(
"Assigned {} to {} readers of consumer group {}.",
newPartitionSplits,
numReaders,
consumerGroupId);
}
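The "even distribution" mentioned in the comment above is done by getSplitOwner. Roughly, it hashes the topic name to pick a start index and then spreads the partitions of that topic round-robin across the readers. A simplified sketch of the idea (not the verbatim Flink code):
// simplified sketch of the split-to-reader assignment
static int getSplitOwner(org.apache.kafka.common.TopicPartition tp, int numReaders) {
    // hash the topic name to a non-negative start index
    int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
    // partitions of the same topic are spread round-robin from that index
    return (startIndex + tp.partition()) % numReaders;
}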
The assignPendingPartitionSplits method assigns the pending splits to readers. Its logic is analyzed below:
private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();
// Check if there's any pending splits for given readers
for (int pendingReader : pendingReaders) {
// check that the reader has registered with the SourceCoordinator
checkReaderRegistered(pendingReader);
// Remove pending assignment for the reader
// take all splits pending for this reader and remove them from pendingPartitionSplitAssignment
final Set<KafkaPartitionSplit> pendingAssignmentForReader =
pendingPartitionSplitAssignment.remove(pendingReader);
// if there are splits pending for this reader, add them to incrementalAssignment
if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
// Put pending assignment into incremental assignment
incrementalAssignment
.computeIfAbsent(pendingReader, (ignored) -> new ArrayList<>())
.addAll(pendingAssignmentForReader);
// Mark pending partitions as already assigned
// mark these partitions as assigned
pendingAssignmentForReader.forEach(
split -> assignedPartitions.add(split.getTopicPartition()));
}
}
// Assign pending splits to readers
// assign these splits to the readers
if (!incrementalAssignment.isEmpty()) {
LOG.info("Assigning splits to readers {}", incrementalAssignment);
context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
}
// If periodically partition discovery is disabled and the initializing discovery has done,
// signal NoMoreSplitsEvent to pending readers
// if there will be no more new splits (partition discovery is disabled) and the source is bounded,
// send the no-more-splits signal to the pending readers (signalNoMoreSplits)
if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
LOG.debug(
"No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
+ " in consumer group {}.",
pendingReaders,
consumerGroupId);
pendingReaders.forEach(context::signalNoMoreSplits);
}
}
assignPendingPartitionSplits is called from three places:
- addSplitsBack: a reader failed, and the splits assigned to it after the last successful checkpoint have to be added back to the SplitEnumerator.
- addReader: a new reader is added and needs splits assigned to it.
- handlePartitionSplitChanges: as described above, when a partition change is detected the newly discovered splits need to be assigned to the readers.
The next question is how the splits assigned by the SplitEnumerator actually reach the readers. Let's look at the context.assignSplits call. Here the context implementation is SourceCoordinatorContext. The SourceCoordinatorContext::assignSplits method:
@Override
public void assignSplits(SplitsAssignment<SplitT> assignment) {
// Ensure the split assignment is done by the coordinator executor.
// run in the SourceCoordinator executor thread
callInCoordinatorThread(
() -> {
// Ensure all the subtasks in the assignment have registered.
// check that every reader receiving splits has registered
assignment
.assignment()
.forEach(
(id, splits) -> {
if (!registeredReaders.containsKey(id)) {
throw new IllegalArgumentException(
String.format(
"Cannot assign splits %s to subtask %d because the subtask is not registered.",
splits, id));
}
});
// record the assignment (added to the set of assignments not yet checkpointed)
assignmentTracker.recordSplitAssignment(assignment);
// dispatch the splits
assignSplitsToAttempts(assignment);
return null;
},
String.format("Failed to assign splits %s due to ", assignment));
}
assignSplitsToAttempts has several overloads. Following them to the end, it creates an AddSplitEvent and sends it to the SourceOperator through the OperatorCoordinator. The code is shown below:
private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) {
assignment.assignment().forEach((index, splits) -> assignSplitsToAttempts(index, splits));
}
private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) {
getRegisteredAttempts(subtaskIndex)
.forEach(attempt -> assignSplitsToAttempt(subtaskIndex, attempt, splits));
}
private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
if (splits.isEmpty()) {
return;
}
checkAttemptReaderReady(subtaskIndex, attemptNumber);
final AddSplitEvent<SplitT> addSplitEvent;
try {
// create the AddSplitEvent (add-split event)
addSplitEvent = new AddSplitEvent<>(splits, splitSerializer);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to serialize splits.", e);
}
final OperatorCoordinator.SubtaskGateway gateway =
subtaskGateways.getGatewayAndCheckReady(subtaskIndex, attemptNumber);
// send the event to the SourceOperator of this subtaskIndex
gateway.sendEvent(addSplitEvent);
}
The network path from gateway.sendEvent() to SourceOperator::handleOperatorEvent is not analyzed here. Let's look at handleOperatorEvent, the method with which the SourceOperator receives the event:
public void handleOperatorEvent(OperatorEvent event) {
if (event instanceof WatermarkAlignmentEvent) {
updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
checkWatermarkAlignment();
checkSplitWatermarkAlignment();
} else if (event instanceof AddSplitEvent) {
handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
} else if (event instanceof SourceEventWrapper) {
sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
} else if (event instanceof NoMoreSplitsEvent) {
sourceReader.notifyNoMoreSplits();
} else {
throw new IllegalStateException("Received unexpected operator event " + event);
}
}
If the received event is an AddSplitEvent, handleAddSplitsEvent is called. Analysis below:
private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
try {
// deserialize the splits
List<SplitT> newSplits = event.splits(splitSerializer);
numSplits += newSplits.size();
// if the main output has not been initialized yet, buffer the splits in the pending list
// otherwise create outputs for the new splits right away
if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
// For splits arrived before the main output is initialized, store them into the
// pending list. Outputs of these splits will be created once the main output is
// ready.
outputPendingSplits.addAll(newSplits);
} else {
// Create output directly for new splits if the main output is already initialized.
createOutputForSplits(newSplits);
}
// hand the splits over to the sourceReader
sourceReader.addSplits(newSplits);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
}
}
Finally we arrive at SourceReaderBase's addSplits method.
@Override
public void addSplits(List<SplitT> splits) {
LOG.info("Adding split(s) to reader: {}", splits);
// Initialize the state for each split.
splits.forEach(
s ->
splitStates.put(
s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
// Hand over the splits to the split fetcher to start fetch.
splitFetcherManager.addSplits(splits);
}
It hands the splits over to the splitFetcherManager. In the KafkaSource case, its implementation is KafkaSourceFetcherManager, whose addSplits method lives in the parent class SingleThreadFetcherManager.
At this point we are back at the addSplits method from the beginning of the "Data Reading Flow" section, which concludes the partition discovery logic of KafkaSource.
Checkpoint Logic
KafkaSourceReader's snapshotState method returns the splits to be included in the current checkpoint, i.e. the splits assigned to this reader. If commit.offsets.on.checkpoint=true is configured, the partition and offset of each split are also stored in offsetsToCommit.
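Committing offsets on checkpoints naturally requires checkpointing itself to be enabled; the behaviour is switched by the commit.offsets.on.checkpoint property. A hedged sketch of the relevant configuration (the checkpoint interval is only an example value):
// checkpointing must be enabled for offsets to be committed on checkpoints
env.enableCheckpointing(30_000); // example interval: 30 seconds
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // set to "false" to skip committing offsets on checkpoints
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();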
@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
// get the splits assigned to this reader (the checkpointId parameter is not actually used here)
List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
// the config option commit.offsets.on.checkpoint decides
// whether offsets are committed on checkpoints
if (!commitOffsetsOnCheckpoint) {
return splits;
}
// the logic below only runs when commit.offsets.on.checkpoint is enabled
// offsetsToCommit holds the offsets that need to be committed,
// a Map<checkpointId, Map<partition, offset>> structure
// if this reader has no splits and no finished splits,
// offsetsToCommit records an empty map for this checkpoint id
if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
offsetsToCommit.put(checkpointId, Collections.emptyMap());
} else {
// create an offsets map for this checkpoint id and store it in offsetsToCommit
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
// Put the offsets of the active splits.
// iterate over the splits and store each split's partition and offset in offsetsMap
for (KafkaPartitionSplit split : splits) {
// If the checkpoint is triggered before the partition starting offsets
// is retrieved, do not commit the offsets for those partitions.
if (split.getStartingOffset() >= 0) {
offsetsMap.put(
split.getTopicPartition(),
new OffsetAndMetadata(split.getStartingOffset()));
}
}
// also store the partitions and offsets of all finished splits
// Put offsets of all the finished splits.
offsetsMap.putAll(offsetsOfFinishedSplits);
}
return splits;
}
The notifyCheckpointComplete method runs when a checkpoint completes; the SourceCoordinator sends the checkpoint-complete notification. This is where the Kafka source commits the Kafka offsets.
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Committing offsets for checkpoint {}", checkpointId);
// as above, if committing offsets on checkpoint is not enabled, return without doing anything
if (!commitOffsetsOnCheckpoint) {
return;
}
// get the partition offsets to commit for this checkpoint from offsetsToCommit
Map<TopicPartition, OffsetAndMetadata> committedPartitions =
offsetsToCommit.get(checkpointId);
// if there is nothing to commit, return
if (committedPartitions == null) {
LOG.debug(
"Offsets for checkpoint {} either do not exist or have already been committed.",
checkpointId);
return;
}
// delegate to KafkaSourceFetcherManager to commit the offsets to Kafka
// analyzed later
((KafkaSourceFetcherManager) splitFetcherManager)
.commitOffsets(
committedPartitions,
(ignored, e) -> {
// The offset commit here is needed by the external monitoring. It won't
// break Flink job's correctness if we fail to commit the offset here.
// this is the offset commit callback
// on error, record the failed commit in the metrics
if (e != null) {
kafkaSourceReaderMetrics.recordFailedCommit();
LOG.warn(
"Failed to commit consumer offsets for checkpoint {}",
checkpointId,
e);
} else {
LOG.debug(
"Successfully committed offsets for checkpoint {}",
checkpointId);
// record the successful commit in the metrics
kafkaSourceReaderMetrics.recordSucceededCommit();
// If the finished topic partition has been committed, we remove it
// from the offsets of the finished splits map.
committedPartitions.forEach(
(tp, offset) ->
kafkaSourceReaderMetrics.recordCommittedOffset(
tp, offset.offset()));
// the offsets are committed, so remove them from the finished-splits map
offsetsOfFinishedSplits
.entrySet()
.removeIf(
entry ->
committedPartitions.containsKey(
entry.getKey()));
// remove the offsets of this and earlier checkpoint ids; they have been committed and no longer need to be kept
while (!offsetsToCommit.isEmpty()
&& offsetsToCommit.firstKey() <= checkpointId) {
offsetsToCommit.remove(offsetsToCommit.firstKey());
}
}
});
}
Next we look at KafkaSourceFetcherManager. This class is responsible for committing offsets through the KafkaConsumer; the logic lives in the commitOffsets method:
public void commitOffsets(
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback callback) {
LOG.debug("Committing offsets {}", offsetsToCommit);
// nothing to commit, return
if (offsetsToCommit.isEmpty()) {
return;
}
// get the running SplitFetcher
SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher =
fetchers.get(0);
if (splitFetcher != null) {
// The fetcher thread is still running. This should be the majority of the cases.
// the fetcher is still running: create an offset commit task and enqueue it
enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
} else {
// no SplitFetcher is running: create a new one,
// enqueue the commit task as above, and then start the fetcher
splitFetcher = createSplitFetcher();
enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
startFetcher(splitFetcher);
}
}
Let's continue with the method that creates the offset commit task. The code is as follows:
private void enqueueOffsetsCommitTask(
SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher,
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback callback) {
// get the KafkaPartitionSplitReader of this splitFetcher
KafkaPartitionSplitReader kafkaReader =
(KafkaPartitionSplitReader) splitFetcher.getSplitReader();
// enqueue a SplitFetcherTask for the fetcher
splitFetcher.enqueueTask(
new SplitFetcherTask() {
@Override
public boolean run() throws IOException {
kafkaReader.notifyCheckpointComplete(offsetsToCommit, callback);
return true;
}
@Override
public void wakeUp() {}
});
}
At this point a SplitFetcherTask has been added to the SplitFetcher's taskQueue. As analyzed in the "Data Reading Flow" section, the SplitFetcher's runOnce method takes the queued tasks one by one and executes them. When it picks up this SplitFetcherTask, it runs its run method, which calls kafkaReader.notifyCheckpointComplete. That method calls the KafkaConsumer's asynchronous offset commit method, commitAsync.
public void notifyCheckpointComplete(
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback offsetCommitCallback) {
consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
}
This concludes the analysis of how KafkaSource commits offsets on checkpoints.
This post is original work by the author. Comments and corrections are welcome. Please credit the source when reposting.