Flink源码解析

一文搞懂 Flink Stream Join原理

2020-11-30  本文已影响0人  shengjk1

总括

在这里插入图片描述

详解

一般情况下,我们会写如下的代码

DataStreamSource<Tuple2<Long, Long>> addSource = env.addSource(new WordSource());
        
        addSource.join(addSource).where(new KeySelector<Tuple2<Long, Long>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, Long> value) throws Exception {
//              System.out.println("where "+value.f0);
                return value.f0;
            }
        }).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, Long> value) throws Exception {
                System.out.println("equalTo "+value.f0);
                return value.f0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .apply(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
//                      System.out.println("vvvvv "+first+second);
                    return new Tuple2<>(first.f0,first.f1+second.f1);
                }
            })
            .print("join====");

点进去可以得到 join 的入口方法

//join 的入口方法  otherStream 为 stream2,生成 joinedStream
    public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
        return new JoinedStreams<>(this, otherStream);
    }

然后

//对 stream1 应用 keySelector
    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
        requireNonNull(keySelector);
        final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
        return where(keySelector, keyType);
    }

然后调用 Where 类的 equalTo 方法,保证了 stream1 stream2 相同的 key 进入到同一个窗口

//对 stream2 应用 keySelector 保证 stream1 和 stream2 相同的 key 或者说要关联的 key 在同一个窗口内
        public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
            requireNonNull(keySelector);
            final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
            return equalTo(keySelector, otherKey);
        }

再往下调用 EqualTo 类的 window 方法

@PublicEvolving
            public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
            }

然后会调用 WithWindow 的 apply 方法

//应用 apply 方法
        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
            TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                function,
                JoinFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                input1.getType(),
                input2.getType(),
                "Join",
                false);

            return apply(function, resultType);
        }

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);

            return coGroupedWindowedStream
                    .apply(new JoinCoGroupFunction<>(function), resultType);
        }

至此为止,关键性的方法 apply 出现了,通过 apply 的实现,我们可以知道,join 底层是通过 coGroup 实现的,得到 coGroupedWindowedStream,其中的 function 即为我们自定义的 function.

coGroupedWindowedStream 的 apply 方法最终调用了 WindowStream 的 apply 方法

// 转化为 operator
    private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
        KeySelector<T, K> keySel = input.getKeySelector();

        WindowOperator<K, T, Iterable<T>, R, W> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
            
            // 窗口中 state ttl long_max_value
            ListStateDescriptor<StreamRecord<T>> stateDesc =
                    new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);

        } else {
            // 窗口中 state ttl long_max_value
            ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        // StreamOperator 转化为 dataStream
        return input.transform(opName, resultType, operator);
    }

转化为了 windowOperator。当 window 执行的时候,调用了 CoGroupWindowFunction 的 apply 方法

@Override
        // window 在执行的时候,即 userFunction.process
        public void apply(KEY key,
                W window,
                Iterable<TaggedUnion<T1, T2>> values,
                Collector<T> out) throws Exception {
            //会将两个 stream 的数据,添加到 list 当中
            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val: values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }

而 wrappedFunction.coGroup 调用了 JoinCoGroupFunction.coGroup,从而实现双流 join

@Override
        // join 最终执行的地方,其中 first、second 都是窗口中的数据
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    //这里执行用户定义的 join 方法
                    out.collect(wrappedFunction.join(val1, val2));
                }
            }
        }
上一篇下一篇

猜你喜欢

热点阅读