
Flink Kafka Source: Source Code Walkthrough (Part 2)

2019-11-01  Woople

Offset commit modes (without checkpointing)

The most important part of consuming a Kafka topic is offset management. For Kafka's own offset commit mechanism, see the official Kafka website.

The Flink Kafka source has three offset commit modes:

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}

Initializing offsetCommitMode

offsetCommitMode is initialized in the FlinkKafkaConsumerBase#open method:

// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
        getIsAutoCommitEnabled(),
        enableCommitOnCheckpoints,
        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

The following code decides which commit mode is actually used:

/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}

This article sets the checkpointing case aside for now, so only the branch (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED; is relevant.

In other words, if the client sets enable.auto.commit=true the mode is KAFKA_PERIODIC; otherwise it is DISABLED.
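To make the decision rule easier to check, here is a minimal standalone sketch that copies the enum and the fromConfiguration logic quoted above into a small class and prints the resulting mode for every combination of the three flags. The class name OffsetCommitModeDemo is made up for illustration and is not part of Flink.

```java
public class OffsetCommitModeDemo {

    // Same three constants as Flink's OffsetCommitMode enum quoted above.
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Same decision logic as OffsetCommitModes.fromConfiguration quoted above.
    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // with checkpointing, the Kafka auto-commit flag is ignored
            return enableCommitOnCheckpoint
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        }
        // without checkpointing, only the Kafka auto-commit flag matters
        return enableAutoCommit
                ? OffsetCommitMode.KAFKA_PERIODIC
                : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf(
                            "checkpointing=%b commitOnCheckpoint=%b autoCommit=%b -> %s%n",
                            checkpointing, commitOnCheckpoint, autoCommit,
                            fromConfiguration(autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

With checkpointing disabled, the commitOnCheckpoint flag has no effect on the result, which is why the rest of this article only looks at enable.auto.commit.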

Committing offsets

Automatic commit

This mode relies entirely on Kafka's own auto-commit feature. Setting the following properties is enough:

Properties properties = new Properties();
// let the Kafka client commit offsets automatically, once per second
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties);
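The enableAutoCommit flag passed to fromConfiguration has to be derived from these properties. The sketch below shows one plausible way to do that using only java.util.Properties. The helper isAutoCommitEnabled and the class name are hypothetical, and the rule it applies (auto commit counts as enabled when enable.auto.commit is true and auto.commit.interval.ms is positive, falling back to the Kafka client defaults of true and 5000 when a property is absent) is an assumption about what getIsAutoCommitEnabled() does, so it is worth checking against the Flink source for your version.

```java
import java.util.Properties;

public class AutoCommitFlagDemo {

    // Hypothetical helper: an assumed approximation of how the connector
    // could derive the enableAutoCommit flag from the Kafka properties.
    static boolean isAutoCommitEnabled(Properties props) {
        // Kafka client defaults: enable.auto.commit=true, auto.commit.interval.ms=5000
        boolean enabled =
                Boolean.parseBoolean(props.getProperty("enable.auto.commit", "true"));
        long intervalMs =
                Long.parseLong(props.getProperty("auto.commit.interval.ms", "5000"));
        return enabled && intervalMs > 0;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        System.out.println("explicit true:  " + isAutoCommitEnabled(props));

        props.setProperty("enable.auto.commit", "false");
        System.out.println("explicit false: " + isAutoCommitEnabled(props));

        // nothing set: the assumed defaults apply
        System.out.println("no properties:  " + isAutoCommitEnabled(new Properties()));
    }
}
```

If this assumption holds, leaving enable.auto.commit unset behaves like setting it to true, so reaching the DISABLED mode without checkpointing requires setting it to false explicitly.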

Non-automatic commit

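From the analysis above, if enable.auto.commit=false, then offsetCommitMode is DISABLED.
The official Kafka documentation states that when enable.auto.commit=false the offsets must be committed manually, that is, by calling consumer.commitSync();.
In Flink, however, consumer.commitSync(); is never called when checkpointing is not in use. Once auto commit is turned off, Kafka no longer knows how far the current consumer group has consumed.
This can be confirmed from two angles: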

Summary

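This article described how the Flink Kafka source commits offsets when checkpointing is not used. A follow-up article will focus on the offset commit flow when checkpointing is enabled.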

Note: this article is based on Flink 1.9.0 and Kafka 2.3.

References

Flink Kafka Source: Source Code Walkthrough (Part 1)
