
Flink Kafka Source: Source Code Analysis (Part 1)

2019-11-01  Woople

Main Flow

The typical code for creating a Kafka source in Flink looks like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// KafkaEventSchema is a custom class that parses the record fields
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties));
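
For completeness, here is a minimal runnable job sketch. The broker address, the group id, and the use of the bundled SimpleStringSchema in place of the custom KafkaEventSchema are assumptions for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Minimal consumer configuration; the values are placeholders.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-demo");

        // SimpleStringSchema stands in for the custom KafkaEventSchema above.
        env.addSource(new FlinkKafkaConsumer<>("foo", new SimpleStringSchema(), properties))
           .print();

        env.execute("kafka-source-demo");
    }
}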

On the Kafka side, the KafkaConsumer API consumes a topic via the poll method:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("foo"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

This article walks through how Flink gets from env.addSource all the way down to consumer.poll.

Source Code Analysis

Initialization

When env.addSource is executed during initialization, a StreamSource object is created, i.e. final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);. Here function is the FlinkKafkaConsumer object that was passed in. The StreamSource constructor hands this object to the userFunction field of its parent class AbstractUdfStreamOperator. Source:

StreamSource.java

public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}

AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}
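
To connect the dots, here is a simplified paraphrase of how addSource wraps the FlinkKafkaConsumer into the source operator (this is a sketch, not the exact Flink 1.9 source; type extraction and error handling are omitted):

// Simplified paraphrase of StreamExecutionEnvironment#addSource (Flink 1.9).
private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
                                              String sourceName,
                                              TypeInformation<OUT> typeInfo) {
    boolean isParallel = function instanceof ParallelSourceFunction;
    clean(function);
    // This is where the FlinkKafkaConsumer becomes the head operator.
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

At this point nothing runs yet; the job graph only records that this operator is the source. Execution starts when the task is deployed, as the next section shows.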

Task Execution

After the task starts, it ends up in the performDefaultAction() method of SourceStreamTask, which starts a thread via sourceThread.start();. Partial source:

private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}
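
The thread started here is a LegacySourceFunctionThread. Roughly paraphrased (a simplified sketch, not the exact Flink 1.9 source; error forwarding and completion handling are omitted), its run method drives the head operator:

// Simplified paraphrase of SourceStreamTask.LegacySourceFunctionThread (Flink 1.9).
private class LegacySourceFunctionThread extends Thread {
    @Override
    public void run() {
        try {
            // headOperator is the StreamSource created during initialization;
            // this call eventually reaches userFunction.run(ctx).
            headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
        } catch (Throwable t) {
            // errors are forwarded to the owning task (omitted here)
        }
    }
}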

In the run method of LegacySourceFunctionThread, the call to headOperator.run eventually reaches the run method of StreamSource. Partial source:

public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector,
                final OperatorChain<?, ?> operatorChain) throws Exception {

  // some code omitted
  this.ctx = StreamSourceContexts.getSourceContext(
    timeCharacteristic,
    getProcessingTimeService(),
    lockingObject,
    streamStatusMaintainer,
    collector,
    watermarkInterval,
    -1);

  try {
    userFunction.run(ctx);
    // some code omitted
  } finally {
    // make sure that the context is closed in any case
    ctx.close();
    if (latencyEmitter != null) {
      latencyEmitter.close();
    }
  }
}

The crucial call here is userFunction.run(ctx);. This userFunction is the FlinkKafkaConsumer object passed in during initialization, so this line actually invokes the run method of FlinkKafkaConsumer, whose concrete implementation lives in its parent class FlinkKafkaConsumerBase. At this point we have entered the actual Kafka consumption phase.

Kafka Consumption Phase

FlinkKafkaConsumerBase#run creates a KafkaFetcher object and eventually calls kafkaFetcher.runFetchLoop(). A snippet of that method:

/** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
  try {
    final Handover handover = this.handover;

    // kick off the actual Kafka consumer
    consumerThread.start();
    
    // some code omitted
}
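
Note the handover field: the KafkaConsumerThread polls record batches and hands each one to the fetcher through this Handover object, which conceptually behaves like a blocking exchange with capacity one. The following is a minimal sketch of that idea only, not Flink's actual Handover class, which additionally handles wakeups and error reporting:

// Illustrative only: a capacity-one blocking exchange similar in spirit to
// the connector's Handover.
final class SimpleHandover<T> {
    private T element;

    // Called by the producer thread (the poll loop); blocks while full.
    public synchronized void produce(T next) throws InterruptedException {
        while (element != null) {
            wait();
        }
        element = next;
        notifyAll();
    }

    // Called by the consumer thread (the fetch loop); blocks while empty.
    public synchronized T pollNext() throws InterruptedException {
        while (element == null) {
            wait();
        }
        T result = element;
        element = null;
        notifyAll();
        return result;
    }
}

Because produce blocks until the previous batch has been taken, a slow fetch loop naturally backpressures the poll loop.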

As you can see, it actually starts a KafkaConsumerThread thread. Stepping into KafkaConsumerThread#run (only part of the method is listed below; see KafkaConsumerThread.java for the complete code):

@Override
public void run() {
  // early exit check
  if (!running) {
    return;
  }
  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  } catch (Throwable t) {
    handover.reportError(t);
    return;
  }
  try {

    // main fetch loop
    while (running) {
      try {
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          } catch (WakeupException we) {
            continue;
          }
        }
      }
      // end main fetch loop
    }
  } catch (Throwable t) {
    handover.reportError(t);
  } finally {
    handover.close();
    try {
      consumer.close();
    } catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}

Here, at last, is the code that actually pulls data from Kafka: records = consumer.poll(pollTimeout);. Because KafkaConsumer is not thread-safe, each thread has to create its own KafkaConsumer instance via this.consumer = getConsumer(kafkaProperties);

KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
  return new KafkaConsumer<>(kafkaProperties);
}
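
As a side note, this also explains the WakeupException handling in the fetch loop above: KafkaConsumer#wakeup is the one method that may safely be called from another thread, and it causes a blocked poll to throw WakeupException. Below is a minimal sketch of that standard shutdown pattern using only the plain Kafka client API, independent of Flink; the broker address, group id, and topic are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupShutdownDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        AtomicBoolean running = new AtomicBoolean(true);
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

        // wakeup() is the only KafkaConsumer method that is safe to call
        // from another thread; it makes a blocked poll throw WakeupException.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running.set(false);
            consumer.wakeup();
        }));

        try {
            consumer.subscribe(Collections.singletonList("foo"));
            while (running.get()) {
                try {
                    ConsumerRecords<byte[], byte[]> records =
                        consumer.poll(Duration.ofMillis(100));
                    // hand records to downstream processing here
                } catch (WakeupException we) {
                    // expected during shutdown; the loop condition ends the loop
                }
            }
        } finally {
            consumer.close();
        }
    }
}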

Summary

This article only covered the key flow of how Flink consumes data from Kafka. Follow-up posts will describe in more detail how FlinkKafkaConsumer manages offsets under the AT_LEAST_ONCE and EXACTLY_ONCE semantics.

Note: this article is based on Flink 1.9.0 and Kafka 2.3.

