数客联盟

Flink Kafka Sink Source Code Analysis

2019-11-03  Woople

Initialization

A Kafka sink is typically added with code like the following:

input.addSink(
    new FlinkKafkaProducer<>(
        "bar",
        new KafkaSerializationSchemaImpl(),
        properties,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");

During initialization, calling env.addSink creates a StreamSink object, i.e. StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction)). Here sinkFunction is the FlinkKafkaProducer instance passed in. The StreamSink constructor hands this object to the userFunction field of its parent class AbstractUdfStreamOperator. The source code is as follows:

StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
  super(sinkFunction);
  chainingStrategy = ChainingStrategy.ALWAYS;
}

AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}

Task Execution

StreamSink sends data by calling the following method:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}
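The delegation here is simple; the following is a condensed model of it in plain Java (my own sketch, not Flink's actual classes): the sink operator does nothing but forward each element to the user-supplied sink function.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink's actual classes) of the delegation chain:
// the sink operator forwards every element to the user-supplied
// SinkFunction -- in the real code, that is the FlinkKafkaProducer.
interface SimpleSinkFunction<T> {
    void invoke(T value);
}

class SimpleStreamSink<T> {
    private final SimpleSinkFunction<T> userFunction;

    SimpleStreamSink(SimpleSinkFunction<T> sinkFunction) {
        this.userFunction = sinkFunction;
    }

    void processElement(T element) {
        userFunction.invoke(element); // delegate straight to user code
    }
}
```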

In other words, what actually gets called is FlinkKafkaProducer#invoke. The FlinkKafkaProducer constructor requires a FlinkKafkaProducer.Semantic to be specified:

public enum Semantic {
   EXACTLY_ONCE,
   AT_LEAST_ONCE,
   NONE
}

The following sections walk through the overall flow of sending data to Kafka under each of the three semantics.
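Before looking at each one, the differences can be condensed into a small model (my own sketch, not Flink code), matching the switch statements in preCommit and beginTransaction examined below: NONE adds nothing on top of the plain producer, AT_LEAST_ONCE flushes on checkpoint, and EXACTLY_ONCE additionally uses Kafka transactions.

```java
// Condensed model (a sketch, not Flink code) of what each Semantic adds
// on top of the plain Kafka producer.
enum ModelSemantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

class SemanticModel {
    // preCommit() flushes buffered records for both EXACTLY_ONCE and
    // AT_LEAST_ONCE, but not for NONE.
    static boolean flushesOnCheckpoint(ModelSemantic s) {
        return s != ModelSemantic.NONE;
    }

    // beginTransaction() starts a real Kafka transaction only for
    // EXACTLY_ONCE.
    static boolean usesKafkaTransactions(ModelSemantic s) {
        return s == ModelSemantic.EXACTLY_ONCE;
    }
}
```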

Semantic.NONE

This mode performs no extra work and relies entirely on the Kafka producer's own guarantees: after FlinkKafkaProducer#invoke sends the data, Flink no longer checks whether Kafka actually received it.

transaction.producer.send(record, callback);

Semantic.AT_LEAST_ONCE

Under this semantic, in addition to the send path described above, if checkpointing is enabled, FlinkKafkaProducer#snapshotState first executes the parent class's snapshotState method, which eventually calls FlinkKafkaProducer#preCommit:

@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
      case AT_LEAST_ONCE:
         flush(transaction);
         break;
      case NONE:
         break;
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
   checkErroneous();
}

For AT_LEAST_ONCE, preCommit executes the flush method, which in turn calls:

transaction.producer.flush();

This makes all records previously passed to send immediately go out to the Kafka broker. For the exact meaning, see the KafkaProducer API documentation:

flush()

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
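To see why this flush matters for at-least-once, here is a toy model (not Kafka's real API) of a producer that buffers sends: records are only guaranteed delivered once flush() returns, which is exactly what preCommit relies on at checkpoint time.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Kafka's real API) of a buffering producer: send() only
// enqueues a record (it is asynchronous in real Kafka), while flush()
// blocks until every buffered record has been delivered.
class BufferingProducer {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void send(String record) {
        buffer.add(record); // buffered, not yet on the broker
    }

    void flush() {
        // In real Kafka this blocks on completion of the in-flight requests;
        // here we just move everything to "delivered".
        delivered.addAll(buffer);
        buffer.clear();
    }

    int bufferedCount()  { return buffer.size(); }
    int deliveredCount() { return delivered.size(); }
}
```

Without the flush in preCommit, a checkpoint could complete while records are still sitting in the buffer, and those records would be lost on failure, breaking the at-least-once guarantee.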

Semantic.EXACTLY_ONCE

The EXACTLY_ONCE semantic also executes the send and flush methods, but additionally enables the Kafka producer's transaction mechanism (see the Kafka transactions documentation for details). The beginTransaction source in FlinkKafkaProducer is shown below; note that only EXACTLY_ONCE mode actually starts a transaction.

@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
         FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
         producer.beginTransaction();
         return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
      case AT_LEAST_ONCE:
      case NONE:
         // Do not create new producer on each beginTransaction() if it is not necessary
         final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
         if (currentTransaction != null && currentTransaction.producer != null) {
            return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
         }
         return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
}

Another difference from AT_LEAST_ONCE is that during a checkpoint, transaction-related information is stored in the nextTransactionalIdHintState variable, whose contents are persisted as part of the checkpoint.

if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
   checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
   long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

   // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
   // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
   // scaling up.
   if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
      nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
   }

   nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
      getRuntimeContext().getNumberOfParallelSubtasks(),
      nextFreeTransactionalId));
}
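The scale-up adjustment in the snippet above is plain arithmetic; a standalone restatement follows (class and method names are mine, not Flink's): when parallelism grew since the last checkpoint, the next free transactional id is bumped past a full range of ids that the newly created subtasks may have claimed, so restored and fresh producers cannot collide.

```java
// Standalone restatement (names are mine, not Flink's) of the scale-up
// adjustment made before persisting the NextTransactionalIdHint.
class IdHintModel {
    static long adjustNextFreeId(long nextFreeTransactionalId,
                                 int lastParallelism,
                                 int currentParallelism,
                                 int kafkaProducersPoolSize) {
        // If we scaled up, skip past the whole range of transactional ids
        // that could have been created from scratch by the new subtasks.
        if (currentParallelism > lastParallelism) {
            nextFreeTransactionalId +=
                (long) currentParallelism * kafkaProducersPoolSize;
        }
        return nextFreeTransactionalId;
    }
}
```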

Summary

This article introduced the basic implementation of FlinkKafkaProducer. A follow-up article will explain in detail how Flink achieves end-to-end exactly-once semantics with Kafka.

Note: this article is based on Flink 1.9.0 and Kafka 2.3.

References

Flink Kafka Source Source Code Analysis
