Flink Source Code: OperatorChain
Preface
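`OperatorChain` is an important optimization in Flink. It executes as many eligible data-processing operations as possible in series within a single task (and therefore a single thread occupying one slot), which minimizes thread context switches, serialization and network communication between those operations and improves the performance of the stream processing system. The logic that decides which operations can be placed in the same chain runs while the `JobGraph` is generated; for details, see the article "Flink Source Code: JobGraph Generation".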
Glossary
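- StreamEdge: an element of the `StreamGraph` topology. A `StreamGraph` is a DAG made up of `StreamNode`s and `StreamEdge`s. For details, see "Flink Source Code: StreamGraph Generation".
- RecordWriterOutput: an operator output (`Output`) type that writes data through a `RecordWriter` into a `ResultPartition`.
- ChainingOutput: like `RecordWriterOutput`, an operator output type, but one used only inside an `OperatorChain`. It acts as a bridge that hands the records processed by an upstream operator to the downstream operator. It is analyzed in detail in a later section.
- TypeSerializer: reads bytes from a `DataInputView` and deserializes them into a value of type `T`, or serializes a value of type `T` into a `DataOutputView`. `DataInputView` and `DataOutputView` both operate directly on byte arrays whose underlying storage is backed by `MemorySegment`s.
- StreamOperatorWrapper: wraps a `StreamOperator` and is used only by `OperatorChain`. It holds two pointers, to the previous and to the next operator, so the wrappers form a doubly linked list; this list is how the chain of operators is represented at runtime.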
Let's start the analysis with the constructor of `OperatorChain`.
Constructor
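`OperatorChain` is built in the `beforeInvoke` method of `StreamTask` (see "Flink Source Code: StreamTask"). It creates the chained operators, whose entry point is the `mainOperator` of the `OperatorChain` (how the chained operators are created is analyzed later). When data arrives, it is handed to the `mainOperator` for processing.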
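To make the idea concrete, here is a minimal, self-contained model of what chaining means at runtime: each operator's output directly invokes the next operator's processing method, so a record travels the whole chain within one call stack on one thread. `Output`, `Operator`, `MapOperator`, `FilterOperator` and `ChainDemo` are hypothetical simplifications for illustration, not Flink's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, heavily simplified stand-ins for Flink's Output and OneInputStreamOperator.
interface Output<T> {
    void collect(T record);
}

interface Operator<IN> {
    void processElement(IN record);
}

// A map operator that forwards each result to the Output it was created with.
class MapOperator<IN, OUT> implements Operator<IN> {
    private final Function<IN, OUT> fn;
    private final Output<OUT> output;

    MapOperator(Function<IN, OUT> fn, Output<OUT> output) {
        this.fn = fn;
        this.output = output;
    }

    @Override
    public void processElement(IN record) {
        output.collect(fn.apply(record));
    }
}

// A filter operator that forwards only the records matching the predicate.
class FilterOperator<T> implements Operator<T> {
    private final Predicate<T> predicate;
    private final Output<T> output;

    FilterOperator(Predicate<T> predicate, Output<T> output) {
        this.predicate = predicate;
        this.output = output;
    }

    @Override
    public void processElement(T record) {
        if (predicate.test(record)) {
            output.collect(record);
        }
    }
}

class ChainDemo {
    // Wires map(x * 2) -> filter(x > 4) -> sink from the tail backwards, then feeds the head.
    static List<Integer> run(List<Integer> inputs) {
        List<Integer> sink = new ArrayList<>();
        Output<Integer> sinkOutput = sink::add;
        FilterOperator<Integer> filter = new FilterOperator<>(x -> x > 4, sinkOutput);
        // The map operator's output calls filter.processElement directly: no queue, no serialization.
        Output<Integer> mapOutput = filter::processElement;
        MapOperator<Integer, Integer> head = new MapOperator<>(x -> x * 2, mapOutput);
        for (Integer record : inputs) {
            head.processElement(record); // the whole chain runs inside this call
        }
        return sink;
    }
}
```

For example, `ChainDemo.run(Arrays.asList(1, 2, 3, 4))` doubles each value and keeps those greater than 4, returning `[6, 8]`.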
The constructor of `OperatorChain`, with annotations, is shown below:
```java
public OperatorChain(
        StreamTask<OUT, OP> containingTask,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

    // Create the dispatcher that sends and receives OperatorEvents
    this.operatorEventDispatcher =
            new OperatorEventDispatcherImpl(
                    containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
                    containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

    // Get the user-code class loader
    final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
    // Get the task's configuration
    final StreamConfig configuration = containingTask.getConfiguration();

    // Get the StreamOperatorFactory of this StreamTask (the factory of the main operator)
    StreamOperatorFactory<OUT> operatorFactory =
            configuration.getStreamOperatorFactory(userCodeClassloader);

    // we read the chained configs, and the order of record writer registrations by output name
    // StreamConfigs of all StreamOperators in this OperatorChain; the map key is the vertex ID
    Map<Integer, StreamConfig> chainedConfigs =
            configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

    // create the final output stream writers
    // we iterate through all the out edges from this job vertex and create a stream output
    // The StreamEdges leaving this job vertex (the network outputs), in a fixed order
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
            new HashMap<>(outEdgesInOrder.size());
    this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

    // from here on, we need to make sure that the output writers are shut down again on failure
    boolean success = false;
    try {
        // Create the network outputs and fill streamOutputMap,
        // which maps each outgoing StreamEdge to its RecordWriterOutput
        createChainOutputs(
                outEdgesInOrder,
                recordWriterDelegate,
                chainedConfigs,
                containingTask,
                streamOutputMap);

        // we create the chain of operators and grab the collector that leads into the chain
        // Collects the StreamOperatorWrappers of all operators in the chain
        List<StreamOperatorWrapper<?, ?>> allOpWrappers =
                new ArrayList<>(chainedConfigs.size());
        // Create the output of the mainOperator, the entry operator of the OperatorChain.
        // Through ChainingOutputs, this output links all operators of the chain in data-flow order
        this.mainOperatorOutput =
                createOutputCollector(
                        containingTask,
                        configuration,
                        chainedConfigs,
                        userCodeClassloader,
                        streamOutputMap,
                        allOpWrappers,
                        containingTask.getMailboxExecutorFactory());

        if (operatorFactory != null) {
            // Create the mainOperator and its processing-time service
            Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
                    StreamOperatorFactoryUtil.createOperator(
                            operatorFactory,
                            containingTask,
                            configuration,
                            mainOperatorOutput,
                            operatorEventDispatcher);

            OP mainOperator = mainOperatorAndTimeService.f0;
            // Register the current-output-watermark gauge
            mainOperator
                    .getMetricGroup()
                    .gauge(
                            MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                            mainOperatorOutput.getWatermarkGauge());
            // Create the mainOperatorWrapper
            this.mainOperatorWrapper =
                    createOperatorWrapper(
                            mainOperator,
                            containingTask,
                            configuration,
                            mainOperatorAndTimeService.f1,
                            true);

            // add main operator to end of chain
            allOpWrappers.add(mainOperatorWrapper);

            // createOutputCollector wraps every chained operator in a StreamOperatorWrapper
            // and adds them to allOpWrappers in reverse data-flow order,
            // so the tail operatorWrapper is the element at index 0
            this.tailOperatorWrapper = allOpWrappers.get(0);
        } else {
            // No operator factory: there must be no chained operators either
            checkState(allOpWrappers.size() == 0);
            this.mainOperatorWrapper = null;
            this.tailOperatorWrapper = null;
        }

        // Create the chained sources
        this.chainedSources =
                createChainedSources(
                        containingTask,
                        configuration.getInputs(userCodeClassloader),
                        chainedConfigs,
                        userCodeClassloader,
                        allOpWrappers);

        this.numOperators = allOpWrappers.size();

        // Link all StreamOperatorWrappers into a doubly linked list, from upstream to downstream
        firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

        success = true;
    } finally {
        // make sure we clean up after ourselves in case of a failure after acquiring
        // the first resources
        if (!success) {
            for (RecordWriterOutput<?> output : this.streamOutputs) {
                if (output != null) {
                    output.close();
                }
            }
        }
    }
}
```
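The `success` flag plus `finally` block at the end of the constructor is a common Java idiom for releasing partially acquired resources when construction fails halfway. Below is a minimal sketch of the same idiom; `FakeWriter` and `GuardedConstruction` are hypothetical classes standing in for `RecordWriterOutput` and the `OperatorChain` constructor.

```java
import java.util.List;

// Hypothetical resource standing in for a RecordWriterOutput.
class FakeWriter implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class GuardedConstruction {
    final FakeWriter[] writers;

    // Creates `count` writers and throws when index `failAt` is reached (pass -1 to never fail).
    // Every writer created is also added to `created` so callers can inspect it afterwards.
    GuardedConstruction(int count, int failAt, List<FakeWriter> created) {
        this.writers = new FakeWriter[count];
        boolean success = false;
        try {
            for (int i = 0; i < count; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure at index " + i);
                }
                FakeWriter writer = new FakeWriter();
                writers[i] = writer;
                created.add(writer);
            }
            success = true;
        } finally {
            // Same idea as the OperatorChain constructor: on failure, close what was acquired.
            if (!success) {
                for (FakeWriter writer : writers) {
                    if (writer != null) {
                        writer.close();
                    }
                }
            }
        }
    }
}
```

If construction fails at index 2, the two writers created before the failure end up closed and the exception still propagates to the caller; on success nothing is closed.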
createChainOutputs
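`createChainOutputs` creates a `RecordWriterOutput` for each outgoing network `StreamEdge` of the job vertex and records the mapping between each `StreamEdge` and its output. The code is as follows: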
```java
private void createChainOutputs(
        List<StreamEdge> outEdgesInOrder,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
        Map<Integer, StreamConfig> chainedConfigs,
        StreamTask<OUT, OP> containingTask,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
    // Iterate over the ordered StreamEdges; edge i uses record writer i
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge outEdge = outEdgesInOrder.get(i);

        // Create the streamOutput for this edge
        RecordWriterOutput<?> streamOutput =
                createStreamOutput(
                        recordWriterDelegate.getRecordWriter(i),
                        outEdge,
                        chainedConfigs.get(outEdge.getSourceId()),
                        containingTask.getEnvironment());

        // Store it in the streamOutputs array
        this.streamOutputs[i] = streamOutput;
        // Remember which streamOutput belongs to which StreamEdge
        streamOutputMap.put(outEdge, streamOutput);
    }
}
```
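The invariant behind this loop is that position `i` in `outEdgesInOrder`, record writer `i` from `recordWriterDelegate`, and `streamOutputs[i]` all refer to the same network edge, while `streamOutputMap` allows lookup by edge (which `createOutputCollector` relies on later). A simplified sketch, in which `Edge`, `NetOutput` and `ChainOutputs` are hypothetical stand-ins for `StreamEdge`, `RecordWriterOutput` and this method:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StreamEdge (identity equality is enough for this sketch).
class Edge {
    final int sourceId;
    final int targetId;

    Edge(int sourceId, int targetId) {
        this.sourceId = sourceId;
        this.targetId = targetId;
    }
}

// Hypothetical stand-in for RecordWriterOutput: remembers which record writer it uses.
class NetOutput {
    final int writerIndex;
    final Edge edge;

    NetOutput(int writerIndex, Edge edge) {
        this.writerIndex = writerIndex;
        this.edge = edge;
    }
}

class ChainOutputs {
    final NetOutput[] outputs;
    final Map<Edge, NetOutput> byEdge;

    ChainOutputs(List<Edge> outEdgesInOrder) {
        outputs = new NetOutput[outEdgesInOrder.size()];
        byEdge = new HashMap<>(outEdgesInOrder.size());
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            Edge edge = outEdgesInOrder.get(i);
            NetOutput out = new NetOutput(i, edge); // record writer i belongs to edge i
            outputs[i] = out;                       // positional access, like streamOutputs
            byEdge.put(edge, out);                  // lookup by edge, like streamOutputMap
        }
    }
}
```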
Next, let's look at `createStreamOutput`:
```java
private RecordWriterOutput<OUT> createStreamOutput(
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
        StreamEdge edge,
        StreamConfig upStreamConfig,
        Environment taskEnvironment) {
    // Get the OutputTag; it is null unless this edge carries a side output
    OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

    TypeSerializer outSerializer = null;

    // Pick the type serializer depending on whether this is a side output
    if (edge.getOutputTag() != null) {
        // side output
        outSerializer =
                upStreamConfig.getTypeSerializerSideOut(
                        edge.getOutputTag(),
                        taskEnvironment.getUserCodeClassLoader().asClassLoader());
    } else {
        // main output
        outSerializer =
                upStreamConfig.getTypeSerializerOut(
                        taskEnvironment.getUserCodeClassLoader().asClassLoader());
    }

    // Create and return the RecordWriterOutput
    return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
```
createOutputCollector
This method contains the main chaining logic, so we analyze it closely. `createOutputCollector` is annotated below:
```java
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        MailboxExecutorFactory mailboxExecutorFactory) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
            new ArrayList<>(4);

    // create collectors for the network outputs
    // Iterate over the non-chained StreamEdges. Their output has to go over the network,
    // so the Output type is RecordWriterOutput
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        // Look up the output that createChainOutputs created for this StreamEdge
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    // Get all chained StreamEdges of this operator; an operator with several
    // chained downstream operators has several such edges
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        // StreamConfig of the downstream (target) operator of this edge
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

        // Create the output for this StreamEdge as a WatermarkGaugeExposingOutput,
        // i.e. an Output plus a gauge that exposes the current watermark.
        // createOperatorChain recursively builds the downstream chainable operators
        // and links them to this upstream operator
        WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                createOperatorChain(
                        containingTask,
                        chainedOpConfig,
                        chainedConfigs,
                        userCodeClassloader,
                        streamOutputs,
                        allOperatorWrappers,
                        outputEdge.getOutputTag(),
                        mailboxExecutorFactory);
        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // If there is exactly one output, return it directly
    if (allOutputs.size() == 1) {
        return allOutputs.get(0).f0;
    } else {
        // send to N outputs. Note that this includes the special case
        // of sending to zero outputs
        // With several outputs, convert allOutputs into an Output array
        @SuppressWarnings({"unchecked"})
        Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
        for (int i = 0; i < allOutputs.size(); i++) {
            asArray[i] = allOutputs.get(i).f0;
        }

        // This is the inverse of creating the normal ChainingOutput.
        // If the chaining output does not copy we need to copy in the broadcast output,
        // otherwise multi-chaining would not work correctly.
        // Choose the collector according to whether object reuse is enabled
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            // With object reuse, ChainingOutput does not copy, so this collector gives each
            // output (except the last) a shallow copy: a new StreamRecord wrapping the same
            // value object. This avoids an expensive deep copy, but the value is still shared
            // between the branches, so downstream operators must not modify it; otherwise
            // the branches see each other's changes
            return new CopyingBroadcastingOutputCollector<>(asArray, this);
        } else {
            // Without object reuse, CopyingChainingOutput already deep-copies every record
            return new BroadcastingOutputCollector<>(asArray, this);
        }
    }
}
```
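The difference between the two collectors can be sketched with a tiny model. It assumes the behaviour of Flink's `CopyingBroadcastingOutputCollector` in the version analyzed here: every output except the last receives a new `StreamRecord` wrapper around the same value object, and the last output receives the original record. `Rec`, `Broadcasting`, `CopyingBroadcasting` and `BroadcastDemo` are hypothetical. The first branch overwrites the wrapper's value in place, much as Flink's `StreamMap` emits `element.replace(...)` and thereby reuses the incoming `StreamRecord`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamRecord: a mutable wrapper around a value.
class Rec<T> {
    T value;

    Rec(T value) {
        this.value = value;
    }

    // Like StreamRecord#copy(T): a new wrapper around the given value (here, the same object).
    Rec<T> copy(T valueCopy) {
        return new Rec<>(valueCopy);
    }
}

// Like BroadcastingOutputCollector: every branch receives the very same wrapper.
class Broadcasting<T> implements Consumer<Rec<T>> {
    private final List<Consumer<Rec<T>>> outputs;

    Broadcasting(List<Consumer<Rec<T>>> outputs) {
        this.outputs = outputs;
    }

    @Override
    public void accept(Rec<T> record) {
        for (Consumer<Rec<T>> output : outputs) {
            output.accept(record);
        }
    }
}

// Like CopyingBroadcastingOutputCollector: all but the last branch get a shallow copy.
class CopyingBroadcasting<T> implements Consumer<Rec<T>> {
    private final List<Consumer<Rec<T>>> outputs;

    CopyingBroadcasting(List<Consumer<Rec<T>>> outputs) {
        this.outputs = outputs;
    }

    @Override
    public void accept(Rec<T> record) {
        int n = outputs.size();
        for (int i = 0; i < n - 1; i++) {
            outputs.get(i).accept(record.copy(record.value)); // new wrapper, same value object
        }
        if (n > 0) {
            outputs.get(n - 1).accept(record); // the last branch reuses the original wrapper
        }
    }
}

class BroadcastDemo {
    // Branch 1 overwrites the wrapper's value in place; branch 2 reports what it receives.
    static String secondBranchSees(boolean copying) {
        List<String> seen = new ArrayList<>();
        List<Consumer<Rec<String>>> outputs = new ArrayList<>();
        outputs.add(r -> r.value = "overwritten");
        outputs.add(r -> seen.add(r.value));
        Consumer<Rec<String>> collector;
        if (copying) {
            collector = new CopyingBroadcasting<>(outputs);
        } else {
            collector = new Broadcasting<>(outputs);
        }
        collector.accept(new Rec<>("original"));
        return seen.get(0);
    }
}
```

Without copying, the second branch sees `"overwritten"`; with the shallow copy it sees `"original"`. The shallow copy only protects the wrapper: if a branch mutated the value object itself, the other branches would still observe the change, which is why downstream operators should not modify records when object reuse is enabled.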
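Next comes `createOperatorChain`. It wraps every operator of the `OperatorChain` in a `StreamOperatorWrapper` and stores the wrappers in `allOperatorWrappers` in reverse data-flow order. Following the order of the operators, it creates a `ChainingOutput` for each one, linking the data flow of all operators together. The method reads: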
```java
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        OutputTag<IN> outputTag,
        MailboxExecutorFactory mailboxExecutorFactory) {
    // create the output that the operator writes to first. this may recursively create more
    // operators
    // operatorConfig is the chainedOpConfig of the current iteration in createOutputCollector.
    // This is a recursive call: createOutputCollector runs again with the StreamConfig of this
    // downstream operator, so every upstream operator's output ends up pointing at its
    // downstream operator, which is what forms the chain.
    // The most downstream output is returned first; operator outputs are wrapped as
    // WatermarkGaugeExposingOutput from downstream to upstream
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
            createOutputCollector(
                    containingTask,
                    operatorConfig,
                    chainedConfigs,
                    userCodeClassloader,
                    streamOutputs,
                    allOperatorWrappers,
                    mailboxExecutorFactory);

    // Create the chained operator, using the output produced in the previous step
    OneInputStreamOperator<IN, OUT> chainedOperator =
            createOperator(
                    containingTask,
                    operatorConfig,
                    userCodeClassloader,
                    chainedOperatorOutput,
                    allOperatorWrappers,
                    false);

    // Wrap the operator into an output and return it (analyzed later)
    return wrapOperatorIntoOutput(
            chainedOperator, containingTask, operatorConfig, userCodeClassloader, outputTag);
}
```
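The mutual recursion between `createOutputCollector` and `createOperatorChain` can be modelled on a linear chain. In this hypothetical sketch, a map from operator name to its chained successor replaces the `StreamConfig`s, and "processing" a record just appends the operator's name. Because the downstream part is always built first, the wrapper list ends up in reverse data-flow order, like `allOperatorWrappers`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the createOutputCollector / createOperatorChain recursion
// for a linear chain. Operators are plain names; "processing" appends the name.
class RecursiveChainBuilder {
    // operator name -> name of its chained successor (absent for the tail)
    private final Map<String, String> chainedNext;
    // Filled like allOperatorWrappers: tail first, head last.
    final List<String> wrappers = new ArrayList<>();
    // Records leaving the chain (stands in for a RecordWriterOutput).
    final List<String> networkOut = new ArrayList<>();

    RecursiveChainBuilder(Map<String, String> chainedNext) {
        this.chainedNext = chainedNext;
    }

    // Counterpart of createOutputCollector: the output that `operator` writes to.
    Consumer<String> createOutputCollector(String operator) {
        String next = chainedNext.get(operator);
        if (next == null) {
            return networkOut::add; // no chained successor: records leave the chain
        }
        return createOperatorChain(next);
    }

    // Counterpart of createOperatorChain: build the downstream output first (recursion),
    // then register this operator, then return a "chaining output" that feeds it.
    Consumer<String> createOperatorChain(String operator) {
        Consumer<String> downstream = createOutputCollector(operator);
        wrappers.add(operator);
        return record -> downstream.accept(record + ">" + operator);
    }

    // Counterpart of the constructor: head output first, head wrapper appended last.
    Consumer<String> build(String head) {
        Consumer<String> headOutput = createOutputCollector(head);
        wrappers.add(head);
        return record -> headOutput.accept(record + ">" + head);
    }
}
```

For the chain `source -> map -> filter`, `wrappers` becomes `[filter, map, source]`, and feeding `"r"` into the returned entry point produces `"r>source>map>filter"` on the network side.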
`createOperator` creates a `StreamOperator` from `operatorConfig` and then wraps it in a `StreamOperatorWrapper`:
```java
private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        boolean isHead) {

    // now create the operator and give it the output collector to write its output to
    // Create a StreamOperator with the StreamOperatorFactory, writing to the given output.
    // This method is fairly involved and is not covered here
    Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
            StreamOperatorFactoryUtil.createOperator(
                    operatorConfig.getStreamOperatorFactory(userCodeClassloader),
                    containingTask,
                    operatorConfig,
                    output,
                    operatorEventDispatcher);

    // The newly created operator
    OP chainedOperator = chainedOperatorAndTimeService.f0;

    // Wrap the new operator in a StreamOperatorWrapper, the wrapper type used for operators
    // in a chain (analyzed later).
    // Because of the recursion, the most downstream operator reaches this point first,
    // so allOperatorWrappers holds the operators in reverse data-flow order
    allOperatorWrappers.add(
            createOperatorWrapper(
                    chainedOperator,
                    containingTask,
                    operatorConfig,
                    chainedOperatorAndTimeService.f1,
                    isHead));

    // Register a gauge for the current output watermark
    chainedOperator
            .getMetricGroup()
            .gauge(
                    MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                    output.getWatermarkGauge()::getValue);
    return chainedOperator;
}
```
Let's focus on `createOperatorWrapper`, which wraps a `StreamOperator` in a `StreamOperatorWrapper`. Why is a wrapper needed at all? Here are some of the fields and methods of `StreamOperatorWrapper`:
```java
private StreamOperatorWrapper<?, ?> previous;

private StreamOperatorWrapper<?, ?> next;

// ... (omitted)

public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
        throws Exception {
    if (!isHead && !isStoppingBySyncSavepoint) {
        // NOTE: This only do for the case where the operator is one-input operator. At present,
        // any non-head operator on the operator chain is one-input operator.
        actionExecutor.runThrowing(() -> endOperatorInput(1));
    }

    quiesceTimeServiceAndCloseOperator(actionExecutor);

    // propagate the close operation to the next wrapper
    if (next != null) {
        next.close(actionExecutor, isStoppingBySyncSavepoint);
    }
}
```
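`StreamOperatorWrapper` has two pointers, `previous` and `next`, which form a doubly linked list. All operators of an `OperatorChain` are kept in this list in chain order, which lets the chain run lifecycle operations such as closing over its operators in a well-defined order. Records, however, are not passed along this list: they are handed from one operator to the next by the `ChainingOutput`s analyzed below.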
`createOperatorWrapper` only wraps the `StreamOperator` in a `StreamOperatorWrapper`; it does not build the doubly linked list. The list is built by `linkOperatorWrappers`, which we analyze later.
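In addition, `StreamOperatorWrapper` has its own close logic. As the `close` method shows, for a non-head operator it first ends the operator's input (unless the task is stopping with a synchronous savepoint), then closes the current operator, and finally recursively closes all the operators that follow it in the list.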
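A stripped-down model of that close logic: wrappers in a doubly linked list, where `close` ends the input of non-head operators, closes the operator itself, and then recurses into `next`. `Wrapper` is a hypothetical class; the `StreamTaskActionExecutor`, the sync-savepoint flag and the time-service quiescing are left out, and log strings replace the real calls.

```java
import java.util.List;

// Hypothetical, stripped-down StreamOperatorWrapper: no executor, no sync-savepoint flag.
class Wrapper {
    final String name;
    final boolean isHead;
    Wrapper previous;
    Wrapper next;

    Wrapper(String name, boolean isHead) {
        this.name = name;
        this.isHead = isHead;
    }

    // Same shape as StreamOperatorWrapper#close: end input (non-head only), close, recurse.
    void close(List<String> log) {
        if (!isHead) {
            log.add("endInput(1) " + name); // non-head operators in a chain are one-input
        }
        log.add("close " + name);
        if (next != null) {
            next.close(log); // propagate the close to the rest of the chain
        }
    }
}
```

Closing the head of `source -> map` logs `close source`, `endInput(1) map`, `close map`: each operator is closed before the ones downstream of it.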
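Finally, let's analyze `wrapOperatorIntoOutput`. It wraps the operator in an `Output` of type `ChainingOutput` (or its subclass `CopyingChainingOutput` when object reuse is disabled).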
```java
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(
        OneInputStreamOperator<IN, OUT> operator,
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        OutputTag<IN> outputTag) {

    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    // With object reuse enabled, create a ChainingOutput
    // (ChainingOutput is analyzed in the next section)
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        currentOperatorOutput = new ChainingOutput<>(operator, this, outputTag);
    } else {
        // Otherwise create a CopyingChainingOutput,
        // which deep-copies each StreamRecord it passes on
        TypeSerializer<IN> inSerializer =
                operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput =
                new CopyingChainingOutput<>(operator, inSerializer, outputTag, this);
    }

    // wrap watermark gauges since registered metrics must be unique
    // Register a gauge for the operator's current input watermark
    operator.getMetricGroup()
            .gauge(
                    MetricNames.IO_CURRENT_INPUT_WATERMARK,
                    currentOperatorOutput.getWatermarkGauge()::getValue);

    return currentOperatorOutput;
}
```
ChainingOutput
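`ChainingOutput` feeds the output of the upstream operator into the next operator as its input. When a `ChainingOutput` is created, the downstream operator is passed in and stored in the `input` field. Its constructors are shown below: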
```java
protected final Input<T> input;

public ChainingOutput(
        OneInputStreamOperator<T, ?> operator,
        StreamStatusProvider streamStatusProvider,
        @Nullable OutputTag<T> outputTag) {
    this(
            operator,
            (OperatorMetricGroup) operator.getMetricGroup(),
            streamStatusProvider,
            outputTag,
            operator::close);
}

public ChainingOutput(
        Input<T> input,
        OperatorMetricGroup operatorMetricGroup,
        StreamStatusProvider streamStatusProvider,
        @Nullable OutputTag<T> outputTag,
        @Nullable AutoCloseable closeable) {
    this.input = input;
    this.closeable = closeable;

    {
        Counter tmpNumRecordsIn;
        try {
            OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
            tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            tmpNumRecordsIn = new SimpleCounter();
        }
        numRecordsIn = tmpNumRecordsIn;
    }

    this.streamStatusProvider = streamStatusProvider;
    this.outputTag = outputTag;
}
```
To confirm that `ChainingOutput` really forwards data to the downstream operator, let's look at its `collect` method:
```java
@Override
public void collect(StreamRecord<T> record) {
    if (this.outputTag != null) {
        // we are not responsible for emitting to the main output.
        return;
    }

    pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;

        numRecordsIn.inc();
        input.setKeyContextElement(castRecord);
        input.processElement(castRecord);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
```
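`collect` calls `pushToOperator`, which executes `input.processElement(castRecord)` and thereby hands the record to the next operator. If this `ChainingOutput` carries an `outputTag` (that is, it bridges a side output), `collect` ignores main-output records.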
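The tag check can be illustrated with a small model. The two-argument `collect(tag, record)` overload below is an assumption based on how Flink's `ChainingOutput` routes side outputs (a side-output record is forwarded only when its tag matches the output's own tag); `TaggedChainingOutput` and its `String` tags are hypothetical simplifications of `ChainingOutput` and `OutputTag`.

```java
import java.util.function.Consumer;

// Hypothetical simplification of ChainingOutput; String tags stand in for OutputTag.
class TaggedChainingOutput<T> {
    private final String outputTag;       // null means this output bridges the main output
    private final Consumer<T> downstream; // stands in for input.processElement(...)
    long numRecordsIn = 0;                // stands in for the numRecordsIn counter

    TaggedChainingOutput(String outputTag, Consumer<T> downstream) {
        this.outputTag = outputTag;
        this.downstream = downstream;
    }

    // Main-output records: a side-output bridge ignores them, as in ChainingOutput#collect.
    void collect(T record) {
        if (outputTag != null) {
            return;
        }
        pushToOperator(record);
    }

    // Side-output records (assumed behaviour): forwarded only if the tag matches.
    void collect(String tag, T record) {
        if (outputTag != null && outputTag.equals(tag)) {
            pushToOperator(record);
        }
    }

    // Counts the record, then hands it to the downstream operator.
    private void pushToOperator(T record) {
        numRecordsIn++;
        downstream.accept(record);
    }
}
```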
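`ChainingOutput` has a subclass, `CopyingChainingOutput`, which overrides `pushToOperator` to create a deep copy of the record before sending it to the downstream operator. If object reuse is enabled (`containingTask.getExecutionConfig().isObjectReuseEnabled()` returns `true`), `ChainingOutput` is used; otherwise `CopyingChainingOutput` is used.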
```java
@Override
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator (and Serializer) expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;

        numRecordsIn.inc();
        // Make a deep copy of the value first, then send the copy downstream
        StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
        input.setKeyContextElement(copy);
        input.processElement(copy);
    } catch (ClassCastException e) {
        // ...
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
```
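Why the deep copy matters can be shown with a mutable value. In this hypothetical sketch, `copyFn` plays the role of `serializer.copy(...)`, a `null` `copyFn` models `ChainingOutput` (object reuse), and the downstream operator increments the object it receives. Only the copying variant leaves the object held by the upstream side untouched. `MutableCounter`, `ChainedForward` and `CopyDemo` are illustrative names, not Flink classes.

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical mutable record value.
class MutableCounter {
    int value;

    MutableCounter(int value) {
        this.value = value;
    }
}

// Hypothetical model of (Copying)ChainingOutput#pushToOperator.
class ChainedForward<T> {
    private final Consumer<T> downstream;  // stands in for input.processElement(...)
    private final UnaryOperator<T> copyFn; // null = object reuse (no copy), like ChainingOutput

    ChainedForward(Consumer<T> downstream, UnaryOperator<T> copyFn) {
        this.downstream = downstream;
        this.copyFn = copyFn;
    }

    void pushToOperator(T record) {
        // CopyingChainingOutput copies the value before handing it on; ChainingOutput does not.
        T toSend = (copyFn == null) ? record : copyFn.apply(record);
        downstream.accept(toSend);
    }
}

class CopyDemo {
    // Returns the upstream-held value after a downstream operator mutated what it received.
    static int upstreamValueAfterForward(boolean copying) {
        MutableCounter upstreamState = new MutableCounter(1);
        Consumer<MutableCounter> mutatingOperator = c -> c.value += 100;
        ChainedForward<MutableCounter> out;
        if (copying) {
            out = new ChainedForward<MutableCounter>(
                    mutatingOperator, c -> new MutableCounter(c.value));
        } else {
            out = new ChainedForward<MutableCounter>(mutatingOperator, null);
        }
        out.pushToOperator(upstreamState);
        return upstreamState.value;
    }
}
```

`CopyDemo.upstreamValueAfterForward(false)` returns `101`, while `upstreamValueAfterForward(true)` returns `1`. The price of this safety is one copy per record per chained hop, which is why enabling object reuse can improve throughput when user functions neither hold on to nor mutate their inputs.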
createChainedSources
This method creates the chained sources, i.e. source operators chained directly into a `MultipleInputStreamOperator`.
```java
@SuppressWarnings("rawtypes")
private Map<SourceInputConfig, ChainedSource> createChainedSources(
        StreamTask<OUT, OP> containingTask,
        InputConfig[] configuredInputs,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
    // If none of the configuredInputs is a SourceInputConfig, return an empty map
    if (Arrays.stream(configuredInputs)
            .noneMatch(input -> input instanceof SourceInputConfig)) {
        return Collections.emptyMap();
    }
    // Chained sources are only supported for a MultipleInputStreamOperator
    checkState(
            mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,
            "Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
    Map<SourceInputConfig, ChainedSource> chainedSourceInputs = new HashMap<>();
    MultipleInputStreamOperator<?> multipleInputOperator =
            (MultipleInputStreamOperator<?>) mainOperatorWrapper.getStreamOperator();
    // Get all of its Inputs
    List<Input> operatorInputs = multipleInputOperator.getInputs();

    // Gate index for the chained sources: the largest existing input gate index plus 1
    int sourceInputGateIndex =
            Arrays.stream(containingTask.getEnvironment().getAllInputGates())
                            .mapToInt(IndexedInputGate::getInputGateIndex)
                            .max()
                            .orElse(-1)
                    + 1;

    // Iterate over every input
    for (int inputId = 0; inputId < configuredInputs.length; inputId++) {
        // Skip every input that is not a SourceInputConfig
        if (!(configuredInputs[inputId] instanceof SourceInputConfig)) {
            continue;
        }
        SourceInputConfig sourceInput = (SourceInputConfig) configuredInputs[inputId];
        int sourceEdgeId = sourceInput.getInputEdge().getSourceId();
        // Look up the source operator's StreamConfig via the source ID of the input edge
        StreamConfig sourceInputConfig = chainedConfigs.get(sourceEdgeId);
        OutputTag outputTag = sourceInput.getInputEdge().getOutputTag();

        // Create the output of the chained source.
        // Currently only supported with object reuse enabled;
        // the returned type is actually ChainingOutput
        WatermarkGaugeExposingOutput chainedSourceOutput =
                createChainedSourceOutput(
                        containingTask,
                        operatorInputs.get(inputId),
                        (OperatorMetricGroup) multipleInputOperator.getMetricGroup(),
                        outputTag);

        // Create the source operator (createOperator was analyzed above)
        SourceOperator<?, ?> sourceOperator =
                (SourceOperator<?, ?>)
                        createOperator(
                                containingTask,
                                sourceInputConfig,
                                userCodeClassloader,
                                (WatermarkGaugeExposingOutput<StreamRecord<OUT>>)
                                        chainedSourceOutput,
                                allOpWrappers,
                                true);
        // Put it into chainedSourceInputs
        chainedSourceInputs.put(
                sourceInput,
                new ChainedSource(
                        chainedSourceOutput,
                        new StreamTaskSourceInput<>(
                                sourceOperator, sourceInputGateIndex++, inputId)));
    }
    return chainedSourceInputs;
}
```
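The gate-index arithmetic can be checked in isolation: chained source inputs are numbered after the largest existing network input gate index (or from 0 when there are none), in the order of their input IDs. A sketch with hypothetical parameters: an `int[]` of existing gate indexes instead of `IndexedInputGate`s, and a `boolean[]` marking source inputs instead of `InputConfig[]`.

```java
import java.util.Arrays;

class SourceGateIndexes {
    // Returns, for each input, the gate index assigned to a chained source input,
    // or -1 for inputs that are not sources (they keep using their network gates).
    static int[] assign(int[] networkGateIndexes, boolean[] isSourceInput) {
        // Same arithmetic as createChainedSources: max existing index + 1, or 0 if none.
        int next = Arrays.stream(networkGateIndexes).max().orElse(-1) + 1;
        int[] result = new int[isSourceInput.length];
        for (int inputId = 0; inputId < isSourceInput.length; inputId++) {
            result[inputId] = isSourceInput[inputId] ? next++ : -1;
        }
        return result;
    }
}
```

With network gates `0` and `1` and inputs `[network, source, network, source]`, the two sources get gate indexes `2` and `3`.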
linkOperatorWrappers
`linkOperatorWrappers` links the operators of the chain one by one. Because the `allOperatorWrappers` list built by `createOutputCollector` stores the operators from downstream to upstream, `linkOperatorWrappers` has to reverse that order when linking them.
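```java
private StreamOperatorWrapper<?, ?> linkOperatorWrappers(
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers) {
    // The wrapper visited in the previous iteration (downstream of the current one)
    StreamOperatorWrapper<?, ?> previous = null;
    // Iterate over all wrappers, from the tail towards the head
    for (StreamOperatorWrapper<?, ?> current : allOperatorWrappers) {
        if (previous != null) {
            // The previously visited (downstream) wrapper gets the current one as predecessor
            previous.setPrevious(current);
        }
        // The current wrapper's successor is the previously visited (downstream) wrapper
        current.setNext(previous);
        // Move on: the current wrapper becomes previous
        previous = current;
    }
    // After the loop, previous is the head (most upstream) wrapper
    return previous;
}
```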
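Running the same loop on a tiny example confirms that a tail-first list turns into a head-to-tail doubly linked list and that the returned node is the head. `Node` and `Linker` are hypothetical stand-ins for `StreamOperatorWrapper` and `OperatorChain`; the loop body mirrors `linkOperatorWrappers` line by line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamOperatorWrapper.
class Node {
    final String name;
    Node previous;
    Node next;

    Node(String name) {
        this.name = name;
    }

    void setPrevious(Node p) {
        previous = p;
    }

    void setNext(Node n) {
        next = n;
    }
}

class Linker {
    // Same algorithm as linkOperatorWrappers: the input is ordered tail -> head.
    static Node link(List<Node> tailFirst) {
        Node previous = null;
        for (Node current : tailFirst) {
            if (previous != null) {
                previous.setPrevious(current);
            }
            current.setNext(previous);
            previous = current;
        }
        return previous; // the head, i.e. the most upstream operator
    }

    // Walks from the head along the `next` pointers.
    static List<String> forward(Node head) {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }
}
```

Given `[sink, filter, map]`, `link` returns `map`, and walking forward yields `map, filter, sink`.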
initializeStateAndOpenOperators
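Before a `StreamTask` starts receiving data, the state of every operator has to be initialized and every operator has to be opened (its `open` method called). `initializeStateAndOpenOperators` does exactly this. Note that `getAllOperators(true)` traverses the chain in reverse, from the tail to the head, so downstream operators are initialized and opened before the operators that feed them.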
```java
protected void initializeStateAndOpenOperators(
        StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    // getAllOperators(true) iterates in reverse order: from the tail to the head
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
```
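The traversal direction can be modelled as follows: start at the tail and follow the `previous` pointers, so the most downstream operator is opened first. A plausible reason for this order, assumed here rather than taken from the code above, is that an operator may already emit records during `open()`, which requires its downstream operators to be ready. `Op` and `OpenOrder` are hypothetical, and the check inside `open` exists only to make the ordering visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator node in a doubly linked chain.
class Op {
    final String name;
    Op previous;
    Op next;
    boolean opened = false;

    Op(String name) {
        this.name = name;
    }

    void open(List<String> log) {
        // Illustrative check: refuse to open before the downstream operator is open.
        if (next != null && !next.opened) {
            throw new IllegalStateException(name + " opened before its downstream " + next.name);
        }
        opened = true;
        log.add("open " + name);
    }
}

class OpenOrder {
    // Mirrors iterating getAllOperators(true): start at the tail, follow previous pointers.
    static List<String> openFromTail(Op tail) {
        List<String> log = new ArrayList<>();
        for (Op op = tail; op != null; op = op.previous) {
            op.open(log);
        }
        return log;
    }
}
```

For `src -> map -> sink`, `openFromTail(sink)` opens `sink`, then `map`, then `src`; opening `src` first would trip the check.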
This post is the author's original work. Discussion, criticism and corrections are welcome. Please credit the source when reposting.