Flink Kafka Sink Source Code Analysis
Initialization
A Kafka sink is typically added with code like this:
input.addSink(
    new FlinkKafkaProducer<>(
        "bar",
        new KafkaSerializationSchemaImpl(),
        properties,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");
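KafkaSerializationSchemaImpl here is a user-defined class, not something shipped with Flink. A minimal sketch of what it and properties might look like, assuming a String input stream and reusing the topic "bar" from the example (the broker address is also an assumption):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical implementation matching the class name used in the example above.
public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<String> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // Write every element to the topic "bar" with no key.
        return new ProducerRecord<>("bar", element.getBytes(StandardCharsets.UTF_8));
    }
}

// The producer config can be as simple as:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address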
When env.addSink is executed during initialization, a StreamSink object is created: StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));. Here sinkFunction is the FlinkKafkaProducer object passed in, and the StreamSink constructor hands it to the userFunction field of its parent class AbstractUdfStreamOperator. The source is as follows:
StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
    super(sinkFunction);
    chainingStrategy = ChainingStrategy.ALWAYS;
}

AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
    this.userFunction = requireNonNull(userFunction);
    checkUdfCheckpointingPreconditions();
}
Task Execution
When the task runs, StreamSink sends data through the following method:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    sinkContext.element = element;
    userFunction.invoke(element.getValue(), sinkContext);
}
In other words, what actually gets called is FlinkKafkaProducer#invoke. The FlinkKafkaProducer constructor requires a FlinkKafkaProducer.Semantic to be specified:
public enum Semantic {
    EXACTLY_ONCE,
    AT_LEAST_ONCE,
    NONE
}
The sections below walk through the overall flow of sending data to Kafka under each of the three semantics.
Semantic.NONE
This mode does no extra work at all and relies entirely on the Kafka producer's own behavior: once FlinkKafkaProducer#invoke has sent the data, Flink never checks whether Kafka actually received it. The send inside invoke is simply:
transaction.producer.send(record, callback);
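To make "relies entirely on the producer's own behavior" concrete, here is a sketch of fire-and-forget sending with the plain Kafka client (broker address and topic are assumptions): send is asynchronous, and even if the callback observes a failure, nothing re-sends the record.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("bar", "hello".getBytes(StandardCharsets.UTF_8)), (metadata, exception) -> {
    if (exception != null) {
        // The failure is observed here, but nothing re-sends the record,
        // so under NONE the data may simply be lost.
        System.err.println("send failed: " + exception.getMessage());
    }
});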
Semantic.AT_LEAST_ONCE
Under this semantic, in addition to the send path described above, if checkpointing is enabled, FlinkKafkaProducer#snapshotState first executes the parent class's snapshotState method, which ultimately invokes FlinkKafkaProducer#preCommit:
@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
        case AT_LEAST_ONCE:
            flush(transaction);
            break;
        case NONE:
            break;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
    checkErroneous();
}
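For context, the parent class here is TwoPhaseCommitSinkFunction. The following is an abridged sketch of how its snapshotState drives preCommit, rendered from the Flink 1.9 flow rather than copied literally (member names follow the Flink code):

public void snapshotState(FunctionSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    // Phase 1: pre-commit the transaction covering everything up to this
    // checkpoint barrier (for Kafka this is where flush() happens).
    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    // Open a fresh transaction for records arriving after the barrier.
    currentTransactionHolder = beginTransactionInternal();
    // Persist the new transaction plus all pending ones in operator state.
    state.clear();
    state.add(new State<>(currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()), userContext));
}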
For AT_LEAST_ONCE, preCommit executes the flush method, which in turn calls
transaction.producer.flush();
This forces records previously handed to send to be transmitted to the Kafka brokers immediately; for the precise semantics, see the KafkaProducer API docs:
flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
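As a quick illustration, reusing the producer from the NONE sketch above: send only hands the record to the producer's in-memory buffer, while flush blocks until everything buffered has actually completed.

// send() merely enqueues the record into the producer's buffer and returns
// a Future; the actual network write may be delayed by linger.ms.
producer.send(new ProducerRecord<>("bar", "payload".getBytes(StandardCharsets.UTF_8)));
// flush() blocks until every buffered record has completed, which is
// exactly what FlinkKafkaProducer relies on in preCommit.
producer.flush();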
Semantic.EXACTLY_ONCE
The EXACTLY_ONCE semantic also executes the send and flush methods, but additionally enables the Kafka producer's transaction mechanism (see the Kafka transactions documentation for details). The source of beginTransaction in FlinkKafkaProducer is shown below; note that a transaction is actually started only in EXACTLY_ONCE mode.
@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
            }
            return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}
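For reference, the transaction machinery that createTransactionalProducer and beginTransaction wrap looks like this with the plain Kafka client; the transactional.id here is an assumed example (Flink derives real ids from the hint state described below):

Properties txnProps = new Properties();
txnProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
txnProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
txnProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
txnProps.setProperty("transactional.id", "example-sink-0"); // assumed id

KafkaProducer<byte[], byte[]> txnProducer = new KafkaProducer<>(txnProps);
txnProducer.initTransactions();  // registers the transactional.id with the broker
txnProducer.beginTransaction();
txnProducer.send(new ProducerRecord<>("bar", "payload".getBytes(StandardCharsets.UTF_8)));
txnProducer.commitTransaction(); // or abortTransaction() on failure; consumers with
                                 // isolation.level=read_committed only see committed data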
Another difference from AT_LEAST_ONCE shows up at checkpoint time: transaction-related information is saved into the nextTransactionalIdHintState variable, and whatever is stored there is persisted as part of the checkpoint.
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
    checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
    long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

    // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
    // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
    // scaling up.
    if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
        nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
    }

    nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
        getRuntimeContext().getNumberOfParallelSubtasks(),
        nextFreeTransactionalId));
}
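A worked example of the scale-up adjustment, with purely illustrative numbers: suppose the job last ran with parallelism 4, restarts with parallelism 8, and kafkaProducersPoolSize is 5.

long nextFreeTransactionalId = 100; // assumed value restored from the hint
int newParallelism = 8;             // getNumberOfParallelSubtasks()
int lastParallelism = 4;            // nextTransactionalIdHint.lastParallelism
int kafkaProducersPoolSize = 5;

if (newParallelism > lastParallelism) {
    // Any of the 8 subtasks may have minted a fresh block of pool-size ids,
    // so the hint skips past all of them: 100 + 8 * 5 = 140.
    nextFreeTransactionalId += newParallelism * kafkaProducersPoolSize;
}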
Summary
This article introduced the basic implementation of FlinkKafkaProducer. A follow-up article will look in detail at how Flink achieves end-to-end exactly-once semantics with Kafka.
Note: this article is based on Flink 1.9.0 and Kafka 2.3.