Flink Task && OperatorChain

2020-05-14  邵红晓

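A Task is the smallest unit of scheduling in Flink. Through the layered wrapping StreamTask -> StreamOperator -> user-defined function, the user's data-processing logic is eventually scheduled and executed.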

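  1. Task uses reflection to instantiate an AbstractInvokable, which here is a StreamTask (OneInputStreamTask, SourceStreamTask). StreamInputProcessor#processInput calls emitNext, which pulls records from the channel in a while (true) loop and hands each one to emitRecord; emitRecord calls ((OneInputStreamOperator) operator).processElement(record), which finally invokes the user's UDF.
  2. OperatorChain holds StreamOperator<?>[] allOperators. The operator types wrap each other layer by layer, from top to bottom: AbstractStreamOperator, AbstractUdfStreamOperator, OneInputStreamOperator, StreamMap (the user UDF).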
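The call path in the list above can be sketched in plain Java without any Flink dependency. This is only a rough model: `Output`, `OneInputOperator`, `MapOperator`, `TaskSketch`, and `TaskLoopDemo` are hypothetical stand-ins whose names loosely mirror Flink's classes, and the in-memory queue stands in for the input channel. It shows the task loop pulling records, the head operator calling the UDF, and the result being handed directly to the next operator in the chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical stand-ins; these are NOT Flink's real interfaces.
interface Output<T> {
    void collect(T record);
}

interface OneInputOperator<IN> {
    void processElement(IN record);
}

// Plays the role of StreamMap: wraps the user function (the UDF)
// and forwards each result to whatever comes next.
class MapOperator<IN, OUT> implements OneInputOperator<IN> {
    private final Function<IN, OUT> userFunction;
    private final Output<OUT> output;

    MapOperator(Function<IN, OUT> userFunction, Output<OUT> output) {
        this.userFunction = userFunction;
        this.output = output;
    }

    @Override
    public void processElement(IN record) {
        output.collect(userFunction.apply(record));
    }
}

// Plays the role of the StreamTask plus its input processor.
class TaskSketch<IN> {
    private final Queue<IN> channel;
    private final OneInputOperator<IN> headOperator;

    TaskSketch(Queue<IN> channel, OneInputOperator<IN> headOperator) {
        this.channel = channel;
        this.headOperator = headOperator;
    }

    // The real task loops indefinitely; this sketch stops once the queue is drained.
    void invoke() {
        IN record;
        while ((record = channel.poll()) != null) {
            headOperator.processElement(record);
        }
    }
}

class TaskLoopDemo {
    static List<String> run(List<String> input) {
        List<String> sink = new ArrayList<>();
        // Build the chain back to front: the tail operator writes to the sink,
        // the head operator's output is a direct method call into the tail.
        MapOperator<String, String> tail = new MapOperator<>(s -> s + "!", sink::add);
        MapOperator<String, String> head =
                new MapOperator<>(String::toUpperCase, tail::processElement);
        Queue<String> channel = new ArrayDeque<>(input);
        new TaskSketch<>(channel, head).invoke();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b"))); // prints [A!, B!]
    }
}
```

Within the chain, passing a record downstream is an ordinary method call rather than a trip through a channel, which is the point of chaining operators into one Task.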

Task and OperatorChain

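As we have already seen, Flink chains together as many operators as it can to form an OperatorChain, which corresponds to one JobVertex. Whether two operators connected by an edge can be chained is decided by isChainable: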

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    // the downstream node must have exactly one input edge
    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            // both nodes must be in the same slot sharing group
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            // the downstream operator must be ALWAYS; the upstream one may be HEAD or ALWAYS
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            // records must be forwarded locally, with no repartitioning
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            // the edge must be in stream mode, not BATCH shuffle mode
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

For a custom operator, do not forget to call operator.setChainingStrategy(ChainingStrategy.ALWAYS); otherwise the Flink UI may show that the operator was not chained with its upstream operator.
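To see why the strategy matters, here is a minimal sketch that models only part of the isChainable check: the two strategy conditions, the forward partitioner, and equal parallelism. `Strategy`, `NodeSketch`, and `ChainableSketch` are hypothetical names, not Flink classes. The second call assumes the custom operator was left at HEAD, which as far as I know is the default in AbstractStreamOperator for the Flink versions discussed here.

```java
// Hypothetical stand-in for Flink's ChainingStrategy enum.
enum Strategy { ALWAYS, HEAD, NEVER }

// Hypothetical stand-in for a StreamNode; only the fields this sketch needs.
class NodeSketch {
    final Strategy strategy;
    final int parallelism;

    NodeSketch(Strategy strategy, int parallelism) {
        this.strategy = strategy;
        this.parallelism = parallelism;
    }
}

class ChainableSketch {
    // Mirrors a subset of the conditions in isChainable; the slot sharing group,
    // shuffle mode, in-edge count, and global chaining switch are left out.
    static boolean isChainable(NodeSketch up, NodeSketch down, boolean forwardPartitioner) {
        return down.strategy == Strategy.ALWAYS
                && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS)
                && forwardPartitioner
                && up.parallelism == down.parallelism;
    }

    public static void main(String[] args) {
        NodeSketch source = new NodeSketch(Strategy.HEAD, 4);
        // Downstream operator explicitly set to ALWAYS: chainable.
        System.out.println(isChainable(source, new NodeSketch(Strategy.ALWAYS, 4), true));
        // Downstream custom operator left at HEAD: starts a new chain instead.
        System.out.println(isChainable(source, new NodeSketch(Strategy.HEAD, 4), true));
        // Different parallelism: not chainable even with ALWAYS.
        System.out.println(isChainable(source, new NodeSketch(Strategy.ALWAYS, 2), true));
    }
}
```

In this model a HEAD operator can begin a chain but can never be appended to one, so a custom operator that keeps HEAD shows up as a separate vertex.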

References
https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#%E5%90%AF%E5%8A%A8-task-%E7%BA%BF%E7%A8%8B
