Flink WindowJoin

2022-03-30  本文已影响0人  kaiker

JoinedStreams

源码里runWindowJoin的example

    public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
            DataStream<Tuple2<String, Integer>> grades,
            DataStream<Tuple2<String, Integer>> salaries,
            long windowSize) {

        return grades.join(salaries)
                .where(new NameKeySelector())
                .equalTo(new NameKeySelector())
                .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
                .apply(
                        new JoinFunction<
                                Tuple2<String, Integer>,
                                Tuple2<String, Integer>,
                                Tuple3<String, Integer, Integer>>() {

                            @Override
                            public Tuple3<String, Integer, Integer> join(
                                    Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
                                return new Tuple3<String, Integer, Integer>(
                                        first.f0, first.f1, second.f1);
                            }
                        });
    }
        public <T> DataStream<T> apply(
                JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream =
                    input1.coGroup(input2)
                            .where(keySelector1)
                            .equalTo(keySelector2)
                            .window(windowAssigner)
                            .trigger(trigger)
                            .evictor(evictor)
                            .allowedLateness(allowedLateness);

            return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
        }
// coGroupedWindowedStream#apply
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            DataStream<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>())
                            .setParallelism(input1.getParallelism())
                            .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>())
                            .setParallelism(input2.getParallelism())
                            .returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);

            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }
// WindowStream#apply
    public <R> SingleOutputStreamOperator<R> apply(
            WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        function = input.getExecutionEnvironment().clean(function);

        final String opName = builder.generateOperatorName();
        final String opDescription = builder.generateOperatorDescription(function, null);
        OneInputStreamOperator<T, R> operator = builder.apply(function);

        return input.transform(opName, resultType, operator).setDescription(opDescription);
    }
    private static class JoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<JoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)
                throws Exception {
            for (T1 val1 : first) {
                for (T2 val2 : second) {
                    out.collect(wrappedFunction.join(val1, val2));
                }
            }
        }
    }

回撤是如何实现的

https://blog.csdn.net/a1240466196/article/details/109975823

IntervalJoin

https://www.jianshu.com/p/b807c5ffee48

上一篇下一篇

猜你喜欢

热点阅读