Flink Kafka Source Code Analysis (Part 3)
Offset Commits in Checkpoint Mode
Part 2 of this series covered how offsets are committed when checkpointing is disabled. This article focuses on how the Flink Kafka consumer commits offsets once checkpointing is enabled.
Initializing offsetCommitMode
As Part 2 showed, once env.enableCheckpointing is called, offsetCommitMode is set to ON_CHECKPOINTS, and the method below forcibly disables Kafka's auto-commit feature regardless of what the user configured. This value matters: many later code paths branch on it.
/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 *
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}
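To see the override behavior in isolation, here is a minimal, self-contained sketch. It reproduces the OffsetCommitMode enum and the "enable.auto.commit" key locally so it runs without Flink or Kafka on the classpath; the class name is illustrative, not part of either project.

```java
import java.util.Properties;

public class AutoCommitOverrideSketch {

    // Local stand-in for Flink's OffsetCommitMode enum.
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Same key that ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG resolves to.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    static void adjustAutoCommitConfig(Properties props, OffsetCommitMode mode) {
        if (mode == OffsetCommitMode.ON_CHECKPOINTS || mode == OffsetCommitMode.DISABLED) {
            // Whatever the user configured is overwritten.
            props.setProperty(ENABLE_AUTO_COMMIT, "false");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ENABLE_AUTO_COMMIT, "true"); // user asked for auto commit

        adjustAutoCommitConfig(props, OffsetCommitMode.ON_CHECKPOINTS);
        System.out.println("auto commit after adjustment: " + props.getProperty(ENABLE_AUTO_COMMIT));
        // → auto commit after adjustment: false
    }
}
```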
Saving offsets
When a checkpoint is taken, FlinkKafkaConsumerBase#snapshotState is called; there, pendingOffsetsToCommit stores the offsets to be committed:
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
    // the map cannot be asynchronously updated, because only one checkpoint call can happen
    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
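The mechanism boils down to a map keyed by checkpoint ID that is written in snapshotState() and drained in notifyCheckpointComplete(). The sketch below is an illustrative stand-in (plain Java collections, simplified signatures), not Flink's actual class; in particular, the pruning of older pending checkpoints mirrors Flink's behavior of only committing the state of the checkpoint that completed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PendingOffsetsSketch {

    // Insertion-ordered map: checkpoint id -> partition offsets at snapshot time.
    final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    // Called when a checkpoint barrier triggers a snapshot.
    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, currentOffsets);
    }

    // Called when the checkpoint coordinator confirms completion.
    Map<String, Long> notifyCheckpointComplete(long checkpointId) {
        // Take the completed checkpoint's offsets and drop any older pending entries.
        Map<String, Long> toCommit = pendingOffsetsToCommit.remove(checkpointId);
        pendingOffsetsToCommit.keySet().removeIf(id -> id < checkpointId);
        return toCommit;
    }

    public static void main(String[] args) {
        PendingOffsetsSketch s = new PendingOffsetsSketch();
        s.snapshotState(1L, Map.of("topic-0", 100L));
        s.snapshotState(2L, Map.of("topic-0", 200L));
        System.out.println(s.notifyCheckpointComplete(2L)); // offsets of checkpoint 2
        System.out.println(s.pendingOffsetsToCommit.size()); // older entry pruned → 0
    }
}
```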
At the same time, the variable below is saved as part of the checkpoint so that it can be used on recovery.
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
The snapshotState method also saves the offsets themselves:
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
Committing offsets
After a checkpoint completes, the task calls the notifyCheckpointComplete method. When offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS, it invokes fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback). Ultimately, KafkaFetcher#doCommitInternalOffsetsToKafka calls consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback), which stores the offsets to commit in the nextOffsetsToCommit field of KafkaConsumerThread.java.
This guarantees that whenever there are offsets waiting to be committed, the code below calls consumer.commitAsync, completing the manual commit of offsets to Kafka.
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
    log.debug("Sending async offset commit request to Kafka broker");

    // also record that a commit is already in progress
    // the order here matters! first set the flag, then send the commit command.
    commitInProgress = true;
    consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
```
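The hand-off between the task thread (which sets the offsets in notifyCheckpointComplete) and the consumer thread (which takes them with getAndSet(null) before commitAsync) can be reduced to a single AtomicReference. The sketch below is illustrative (simplified types, hypothetical pollOffsetsToCommit name); note how a newer commit request simply supersedes an older one that was never picked up.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class NextOffsetsToCommitSketch {

    // Stand-in for KafkaConsumerThread's nextOffsetsToCommit field.
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

    // Called from the task thread when a checkpoint completes.
    void setOffsetsToCommit(Map<String, Long> offsets) {
        if (nextOffsetsToCommit.getAndSet(offsets) != null) {
            // the previous request was never picked up; it is superseded
            System.out.println("previous commit request skipped");
        }
    }

    // Called from the consumer thread's run loop before commitAsync.
    Map<String, Long> pollOffsetsToCommit() {
        // atomically take the pending request (or null if there is none)
        return nextOffsetsToCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        NextOffsetsToCommitSketch s = new NextOffsetsToCommitSketch();
        s.setOffsetsToCommit(Map.of("topic-0", 100L));
        s.setOffsetsToCommit(Map.of("topic-0", 200L)); // supersedes the first
        System.out.println(s.pollOffsetsToCommit());   // only the newest request survives
        System.out.println(s.pollOffsetsToCommit());   // → null, nothing left to commit
    }
}
```

Because getAndSet is atomic, no lock is needed between the two threads, and at most one request is ever pending.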
Summary
This article described how the Flink Kafka source commits offsets in checkpoint mode. A follow-up article will cover how the consumer determines the offsets to read from.
Note: this article is based on Flink 1.9.0 and Kafka 2.3.