Java学习笔记大数据程序员

Flink 源码分析1:如何生成 StreamGraph

2018-04-21  本文已影响27人  maskwang520
1. 什么时候生成StreamGraph

给出如下的flink的总体架构图,有个总体的认识,我们可以清楚的看到,在用户给出StreamApi之后,就会转化成StreamGraph,而在它的下面,它会转化成JobGraph。在后续的文章中,会逐层进行分析。


image.png
2. 由简单的demo分析这个过程
public class FlinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream("localhost", 9900);
        // 对一个键型流(keyed stream) 使用过程函数
        text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print();

        env.execute("FlinkTest");

    }

    public static class LineSplitter implements FlatMapFunction<String, String> {
        public void flatMap(String value, Collector<String> out) throws Exception {
            String[] values = value.split(" ");
            for (String s : values) {
                out.collect(s);
            }
        }
    }

    public static class HelloFilter implements FilterFunction<String> {

        public boolean filter(String value) throws Exception {
            if (value.equals("hello")) {
                return false;
            }
            return true;
        }
    }


}

上面是个很简单的入门例子,主要是实现把输入的字符串按空格分隔成一个一个单个的字符串。并且过滤掉“hello"这个字符串。在后面的分析中,会以这个作为例子

3. 源码分析

env.execute("FlinkTest");是生成StreamGraph的入口。

public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }
image.png
从上图我们看出有10种StreamTransformation。DataStream 上常见的 transformation 有 map、flatmap、filter等。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。以生成SingleOutputStreamOperator为例。

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);
      //包装成StreamFlatMap
        return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

    }

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());
              //包装成SingleOutputStreamOperator
        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

上面的这个过程,可以抽象成下面一个图

image.png
首先把MapFunction包装成StreamFlatMap,然后包装成OneInputTransformation
另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition等。如下图所示的转换树,在运行时会优化成下方的操作图。
image.png
通过源码也可以发现,UnionTransformation,SplitTransformation,SelectTransformation,PartitionTransformation由于不包含具体的操作所以都没有StreamOperator成员变量,而其他StreamTransformation的子类基本上都有。
StreamOperator
DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。下图所示为 StreamOperator 的类图(点击查看大图):
image.png
可以发现,所有实现类都继承了AbstractStreamOperator。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator,该类是封装了UDF的StreamOperator。UDF就是实现了Function接口的类,如MapFunction,FilterFunction
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }
private Collection<Integer> transform(StreamTransformation<?> transform) {
               //判断是否已经转化,每次转化完,都会把StreamTransformation加入进来,同时做递归的出口,然后递归的时候,很多时候从这里作为出口
        if (alreadyTransformed.containsKey(transform)) {
            //以前没有转化,则转化
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);
            设置StreamTransformation的MaxParallelism
        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
              //判断是哪种StreamTransformation,进行响应类型的转化
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
              //存储这个OneInputTransformation的上游Transformation的id,方便构造边,在这里递归,确保所有的上游Transformation都已经转化
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
                //加入StreamNode
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
              //构造边
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

结合我们这个例子,对以上内容分析如下。
把UDF转化成Transformation,可以生成3个Transformatoin如下.

OneInputTransformation{id=2, name='Flat Map', outputType=String, parallelism=4}
OneInputTransformation{id=4, name='Filter', outputType=String, parallelism=4}
SinkTransformation{id=5, name='Unnamed', outputType=GenericType<java.lang.Object>, parallelism=4}

由于是树之间父子关系,我们从底层的SinkTransformation看,如下图。

image.png

最后得到的StreamGraph如下

image.png image.png image.png image.png

在这过程中生成一个虚拟结点。


image.png image.png

上面这个图对应步骤如下:

  1. 首先处理的Source,生成了Source的StreamNode。

  2. 然后处理的FlatMap,生成了FlatMap的StreamNode,并生成StreamEdge连接上游Source和FlatMap。由于上下游的并发度不一样(1:4),所以此处是Rebalance分区。

  3. 然后处理的Shuffle,由于是逻辑转换,并不会生成实际的节点。将partitioner信息暂存在virtuaPartitionNodes中。

  4. 在处理Filter时,生成了Filter的StreamNode。发现上游是shuffle,找到shuffle的上游FlatMap,创建StreamEdge与Filter相连。并把ShufflePartitioner的信息写到StreamEdge中。

  5. 最后处理Sink,创建Sink的StreamNode,并生成StreamEdge与上游Filter相连。由于上下游并发度一样(4:4),所以此处选择 Forward 分区。

参考文章:
Flink 原理与实现:如何生成 StreamGraph (本文很多图片参考这篇文章,在此声明)

上一篇 下一篇

猜你喜欢

热点阅读