一文搞懂 Flink OperatorChain 对象重用
2022-06-21 本文已影响0人
shengjk1
OperatorChain 的对象重用,可以提高效率,但什么情况下可以重用,什么情况下不可以重用,我们一起来看你一下代码:
首先,在OperatorChain 类的 createChainedOperator 方法
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
OutputTag<IN> outputTag) {
...
//chainingoutput
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
// deep copy
TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
...
}
如果启用了对象重用,即 isObjectReuseEnabled==true,创建的 outPut 为 ChainingOutput,如果没有启用对象重用,则 outPut 为 CopyingChainingOutput。
需要明确的是 currentOperatorOutput 是为给下游算子输入数据的。而 ChainingOutput 和 CopyingChainingOutput 的区别是 ChainingOutput 是值传递,而 CopyingChainingOutput 是深拷贝。看到这里我们应该就已经明确了什么情况下可以启动对象重用什么情况下不可以启用对象重用。
我们需要明确的一个点对应 java bean 来说,在启动对象重用情况下,如果下游的算子更改了某个属性值,会直接影响上游,以及其下游,这点还是要特别注意的