数客联盟

Flink Kafka Source: Source Code Analysis (Part 3)

2019-11-01  Woople

Committing offsets in checkpoint mode

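Flink Kafka Source: Source Code Analysis (Part 2) described how offsets are committed when checkpointing is not enabled. This article focuses on how the Flink Kafka consumer commits offsets once checkpointing is enabled.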

Initializing offsetCommitMode

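As described in Flink Kafka Source: Source Code Analysis (Part 2), once env.enableCheckpointing has been called, offsetCommitMode is ON_CHECKPOINTS, provided that committing on checkpoints has not been turned off with setCommitOffsetsOnCheckpoints(false) (it is on by default). In that case the method below forcibly disables Kafka's auto-commit, overriding whatever the user configured; the same happens when the mode is DISABLED. The value of offsetCommitMode matters because many later code paths branch on it.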

/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   }
}
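
To make the relationship between the three switches and the resulting mode concrete, here is a minimal, dependency-free sketch. It is not the Flink source: the class name OffsetCommitModeSketch and the nested enum are written for illustration only, the decision logic in fromConfiguration is a simplified model of the rule described above, and the property key is spelled out as the literal "enable.auto.commit" instead of using ConsumerConfig.

```java
import java.util.Properties;

/** Illustrative model only; OffsetCommitModeSketch is a hypothetical name, not a Flink class. */
final class OffsetCommitModeSketch {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Checkpointing takes precedence over Kafka's own auto-commit setting.
    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            return enableCommitOnCheckpoint
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        }
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    // Same rule as the method shown above: overwrite the user's auto-commit setting.
    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode mode) {
        if (mode == OffsetCommitMode.ON_CHECKPOINTS || mode == OffsetCommitMode.DISABLED) {
            properties.setProperty("enable.auto.commit", "false");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true"); // user asked for auto-commit
        OffsetCommitMode mode = fromConfiguration(true, true, true);
        adjustAutoCommitConfig(props, mode);
        System.out.println(mode + " -> enable.auto.commit=" + props.getProperty("enable.auto.commit"));
    }
}
```

The point of the sketch is that the user's enable.auto.commit value only takes effect when checkpointing is off; with checkpointing on, it is overwritten in either branch.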

Saving offsets

When a checkpoint is taken, FlinkKafkaConsumerBase#snapshotState is called, and pendingOffsetsToCommit records the offsets to be committed, keyed by checkpoint ID:

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
   // the map cannot be asynchronously updated, because only one checkpoint call can happen
   // on this function at a time: either snapshotState() or notifyCheckpointComplete()
   pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}

At the same time, the following field is saved as part of the checkpoint so that the offsets can be restored on recovery.

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

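snapshotState also writes the offsets into this state. The loop below appears to be the branch taken when the fetcher has not been initialized yet, where the start offsets of the subscribed partitions are written; once the fetcher is running, the current offsets it reports are added to unionOffsetStates in the same way.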

for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
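
The bookkeeping done at snapshot time can be pictured with a small stand-alone model. This is a sketch under stated assumptions, not the Flink implementation: PendingOffsetsSketch is a hypothetical class, partitions are represented by plain strings instead of KafkaTopicPartition, a LinkedHashMap stands in for the ordered map used by the connector, and the cap on pending checkpoints (maxPending) is a parameter chosen for the example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model only; PendingOffsetsSketch is a hypothetical name. */
final class PendingOffsetsSketch {

    // checkpoint id -> (partition name -> offset), kept in insertion order
    private final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    private final int maxPending;

    PendingOffsetsSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    /** Called when a checkpoint is taken: remember a copy of the offsets under the checkpoint id. */
    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
        // Drop the oldest entries so the map cannot grow without bound
        // if checkpoints are taken but never confirmed.
        while (pendingOffsetsToCommit.size() > maxPending) {
            Long oldest = pendingOffsetsToCommit.keySet().iterator().next();
            pendingOffsetsToCommit.remove(oldest);
        }
    }

    int size() {
        return pendingOffsetsToCommit.size();
    }

    boolean contains(long checkpointId) {
        return pendingOffsetsToCommit.containsKey(checkpointId);
    }

    Map<String, Long> get(long checkpointId) {
        return pendingOffsetsToCommit.get(checkpointId);
    }

    public static void main(String[] args) {
        PendingOffsetsSketch sketch = new PendingOffsetsSketch(2);
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 10L);
        sketch.snapshotState(1L, offsets);
        offsets.put("topic-0", 20L);
        sketch.snapshotState(2L, offsets);
        System.out.println("pending checkpoints: " + sketch.size());
    }
}
```

Copying the offsets at snapshot time matters in this model: the consumer keeps advancing after the snapshot, and the entry stored under a checkpoint ID must keep reflecting the positions at the moment that checkpoint was taken.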

Committing offsets

After a checkpoint completes, the task calls notifyCheckpointComplete. When offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS, that method calls fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);. Eventually KafkaFetcher#doCommitInternalOffsetsToKafka hands the offsets over through consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);, which stores them in the nextOffsetsToCommit member of KafkaConsumerThread.java.
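
The lookup that happens when a checkpoint is confirmed can be sketched as follows. This is a simplified model and not the Flink source: CheckpointCompleteSketch is a hypothetical class, partitions are plain strings, and the method returns the offsets that would be handed to the fetcher instead of calling it. The sketch assumes that entries for checkpoints older than the confirmed one are discarded together with it.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model only; CheckpointCompleteSketch is a hypothetical name. */
final class CheckpointCompleteSketch {

    // checkpoint id -> offsets recorded when that checkpoint was taken, in insertion order
    private final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    void addPending(long checkpointId, Map<String, Long> offsets) {
        pendingOffsetsToCommit.put(checkpointId, offsets);
    }

    /**
     * Returns the offsets recorded for the completed checkpoint, or null if the
     * checkpoint id is unknown. Older pending entries are discarded as well.
     */
    Map<String, Long> notifyCheckpointComplete(long checkpointId) {
        if (!pendingOffsetsToCommit.containsKey(checkpointId)) {
            return null; // nothing recorded for this checkpoint, so nothing to commit
        }
        Iterator<Map.Entry<Long, Map<String, Long>>> it = pendingOffsetsToCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            long id = entry.getKey();
            Map<String, Long> offsets = entry.getValue();
            it.remove(); // drop this entry: it is either older or the one being committed
            if (id == checkpointId) {
                return offsets;
            }
        }
        return null; // not reachable because of the containsKey check above
    }

    int pendingCount() {
        return pendingOffsetsToCommit.size();
    }

    public static void main(String[] args) {
        CheckpointCompleteSketch sketch = new CheckpointCompleteSketch();
        sketch.addPending(1L, Collections.singletonMap("topic-0", 10L));
        sketch.addPending(2L, Collections.singletonMap("topic-0", 20L));
        sketch.addPending(3L, Collections.singletonMap("topic-0", 30L));
        System.out.println("to commit: " + sketch.notifyCheckpointComplete(2L));
        System.out.println("still pending: " + sketch.pendingCount());
    }
}
```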

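As a result, whenever nextOffsetsToCommit holds offsets waiting to be committed, the following code in KafkaConsumerThread calls consumer.commitAsync, which completes the manual offset commit to Kafka.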

final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
  log.debug("Sending async offset commit request to Kafka broker");

  // also record that a commit is already in progress
  // the order here matters! first set the flag, then send the commit command.
  commitInProgress = true;
  consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
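
The handoff between the thread that confirms the checkpoint and the consumer thread relies on a single atomic slot. The sketch below models only that slot: CommitHandoffSketch and pollOffsetsToCommit are hypothetical names, partitions are plain strings, the commit callback is left out, and the "newer request replaces an unsent older one" behavior is an assumption of this model based on how getAndSet works.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative model only; CommitHandoffSketch is a hypothetical name. */
final class CommitHandoffSketch {

    // Single slot shared by the checkpoint thread (writer) and the consumer thread (reader).
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

    /**
     * Called from the checkpoint side. Returns true if an earlier request that the
     * consumer thread had not picked up yet was replaced by this one.
     */
    boolean setOffsetsToCommit(Map<String, Long> offsets) {
        return nextOffsetsToCommit.getAndSet(offsets) != null;
    }

    /**
     * One step of the consumer loop: take the pending request, if any, and clear the
     * slot in the same atomic operation so that it is committed at most once.
     */
    Map<String, Long> pollOffsetsToCommit() {
        return nextOffsetsToCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        CommitHandoffSketch handoff = new CommitHandoffSketch();
        handoff.setOffsetsToCommit(Collections.singletonMap("topic-0", 10L));
        boolean replaced = handoff.setOffsetsToCommit(Collections.singletonMap("topic-0", 20L));
        System.out.println("older request replaced: " + replaced);
        System.out.println("consumer thread commits: " + handoff.pollOffsetsToCommit());
        System.out.println("next loop iteration sees: " + handoff.pollOffsetsToCommit());
    }
}
```

Because getAndSet(null) reads and clears the slot in one step, the consumer loop never commits the same request twice, and the writer never blocks waiting for the consumer thread.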

Summary

This article described how the Flink Kafka source commits offsets in checkpoint mode. A later article will cover how the consumer reads offsets.

Note: this article is based on Flink 1.9.0 and Kafka 2.3.

References

Flink Kafka Source: Source Code Analysis (Part 1)
Flink Kafka Source: Source Code Analysis (Part 2)
