join

2021-05-07  ZYvette

Join operations

Window join

Code form

stream1.join(stream2)
    .where(<stream1KeySelector>)
    .equalTo(<stream2KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

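This is roughly equivalent to the following SQL, with the join additionally restricted to elements that fall into the same window:
stream1 JOIN stream2 ON stream1.key = stream2.key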

Window join types:

The window assigner determines which of the following three kinds is used:

Tumbling Window Join
Sliding Window Join
Session Window Join
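
To make the window condition concrete, here is a minimal plain-Java sketch of tumbling window join semantics. It does not use Flink: the `Event` class, the `windowStart` helper, and the fixed window size are assumptions made for illustration, and timestamps are assumed to be non-negative. Two elements are paired only when they share a key and land in the same window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of tumbling window join semantics.
class TumblingJoinSketch {

    // Hypothetical stand-in for a keyed, timestamped stream record.
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Start of the tumbling window of the given size that contains ts (ts >= 0 assumed).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Pairs every left/right element that shares both a key and a window.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event("a", 0, "L0"), new Event("a", 3, "L3"));
        List<Event> right = Arrays.asList(
                new Event("a", 1, "R1"), new Event("a", 2, "R2"), new Event("b", 3, "R3"));
        // Windows of size 2: [0,2) holds L0 and R1, [2,4) holds L3 and R2; R3 has another key.
        System.out.println(join(left, right, 2)); // prints [L0,R1, L3,R2]
    }
}
```

A sliding or session window join would differ only in how elements are assigned to windows; the same-key, same-window pairing rule stays the same.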

Source code analysis

Window join

        public <T> DataStream<T> apply(
                JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream =
                    input1.coGroup(input2)
                            .where(keySelector1)
                            .equalTo(keySelector2)
                            .window(windowAssigner)
                            .trigger(trigger)
                            .evictor(evictor)
                            .allowedLateness(allowedLateness);

            return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
        }
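
The listing above delegates to coGroup and wraps the user's JoinFunction in a JoinCoGroupFunction, whose body is not shown. A minimal sketch of the idea, assuming the wrapper simply applies the join function to every pair drawn from the two co-grouped sides of one key and window (the class and method names below are illustrative, not Flink's):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java sketch: an inner join expressed as a co-group followed by a nested loop.
class JoinAsCoGroupSketch {

    // Applies joinFn to the cross product of the two groups of one key and window.
    static <A, B, R> List<R> coGroup(List<A> first, List<B> second, BiFunction<A, B, R> joinFn) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(joinFn.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> joined =
                coGroup(Arrays.asList(1, 2), Arrays.asList("x", "y"), (a, b) -> a + b);
        System.out.println(joined); // prints [1x, 1y, 2x, 2y]
    }
}
```

Under this assumption, a key that appears on only one side produces no output, because the nested loop never runs; that is what gives the window join inner-join behavior.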

CoGroup

CoGroup (co-grouping) combines two different DataStreams and, within the same window, processes together the elements that share the same key.

        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            // Union key selector: applies the selector that matches the stream each element came from.
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            DataStream<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>())
                            .setParallelism(input1.getParallelism())
                            .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>())
                            .setParallelism(input2.getParallelism())
                            .returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);

            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

The UnionKeySelector class:

        public KEY getKey(TaggedUnion<T1, T2> value) throws Exception {
            if (value.isOne()) {
                return keySelector1.getKey(value.getOne());
            } else {
                return keySelector2.getKey(value.getTwo());
            }
        }
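
The tag-then-union approach can be sketched without Flink as follows. `Tagged` and `Group` are simplified, hypothetical stand-ins for TaggedUnion and for the per-key window contents; the sketch only shows how one merged stream can be keyed with two different selectors and then split back into its two sides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch of tagging two inputs, merging them, and grouping by a union key selector.
class TaggedUnionSketch {

    // Holds a value from exactly one of the two inputs.
    static final class Tagged<A, B> {
        final A one;
        final B two;
        final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A value) {
            return new Tagged<>(value, null, true);
        }

        static <A, B> Tagged<A, B> two(B value) {
            return new Tagged<>(null, value, false);
        }
    }

    // The two sides collected for a single key.
    static final class Group<A, B> {
        final List<A> first = new ArrayList<>();
        final List<B> second = new ArrayList<>();
    }

    // Groups the merged stream by key, choosing the key selector from the tag.
    static <A, B, K> Map<K, Group<A, B>> groupByKey(
            List<Tagged<A, B>> union, Function<A, K> key1, Function<B, K> key2) {
        Map<K, Group<A, B>> groups = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K key = t.isOne ? key1.apply(t.one) : key2.apply(t.two);
            Group<A, B> group = groups.computeIfAbsent(key, k -> new Group<>());
            if (t.isOne) {
                group.first.add(t.one);
            } else {
                group.second.add(t.two);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Left elements are keyed by string length, right elements by their own value.
        Function<String, Integer> key1 = String::length;
        Function<Integer, Integer> key2 = n -> n;
        List<Tagged<String, Integer>> union = Arrays.asList(
                Tagged.<String, Integer>one("a"),
                Tagged.<String, Integer>two(1),
                Tagged.<String, Integer>one("bb"),
                Tagged.<String, Integer>two(2));
        Map<Integer, Group<String, Integer>> groups = groupByKey(union, key1, key2);
        for (Map.Entry<Integer, Group<String, Integer>> e : groups.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().first + " / " + e.getValue().second);
        }
    }
}
```

Merging both inputs into one tagged stream lets a single keyed window operator hold the elements of both sides, which is why the co-group can reuse the ordinary window machinery.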

Interval join

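Elements of the two streams (called orange and green here) are joined when they share a key and their timestamps satisfy:

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound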
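
The condition above can be written as a small predicate. This is a plain-Java sketch with both bounds inclusive; the method name `joins` is made up for illustration, and the bounds -2 and 1 are the millisecond values used in this article's own code form.

```java
// Plain-Java sketch of the interval join condition with inclusive bounds.
class IntervalPredicateSketch {

    // True when greenTs lies in [orangeTs + lowerBound, orangeTs + upperBound].
    static boolean joins(long orangeTs, long greenTs, long lowerBound, long upperBound) {
        return orangeTs + lowerBound <= greenTs && greenTs <= orangeTs + upperBound;
    }

    public static void main(String[] args) {
        long lowerBound = -2;
        long upperBound = 1;
        // An orange element at ts = 2 can join green elements with ts in [0, 3].
        for (long greenTs = -1; greenTs <= 4; greenTs++) {
            System.out.println("green ts " + greenTs + " joins: " + joins(2, greenTs, lowerBound, upperBound));
        }
    }
}
```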

Code form

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(<ProcessJoinFunction>)

Source implementation

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(
                ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
                TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);

            final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
                    left.getExecutionEnvironment().clean(processJoinFunction);

            final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                    new IntervalJoinOperator<>(
                            lowerBound,
                            upperBound,
                            lowerBoundInclusive,
                            upperBoundInclusive,
                            left.getType().createSerializer(left.getExecutionConfig()),
                            right.getType().createSerializer(right.getExecutionConfig()),
                            cleanedUdf);

            return left.connect(right)
                    .keyBy(keySelector1, keySelector2)
                    .transform("Interval Join", outputType, operator);
        }

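As the code shows, the two streams are connected and keyed, then handed to IntervalJoinOperator. The operator keeps two pieces of state, one buffer per input, and emits a joined result whenever an arriving element and a buffered element from the other side fall within the interval bounds.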
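
The two-buffer idea can be simulated in plain Java. The sketch below is a simplification under stated assumptions: it handles a single key, uses in-memory maps in place of Flink state, and omits late-element handling and state cleanup. The names `onLeft`, `onRight`, and `output` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of a two-buffer interval join for a single key.
class IntervalJoinSimSketch {
    private final long lowerBound;
    private final long upperBound;
    // timestamp -> buffered values, one buffer per side (stand-ins for keyed state)
    private final Map<Long, List<String>> leftBuffer = new TreeMap<>();
    private final Map<Long, List<String>> rightBuffer = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    IntervalJoinSimSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void onLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void onRight(String value, long ts) {
        // Seen from the right side, the bounds are mirrored.
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts,
                         Map<Long, List<String>> ours, Map<Long, List<String>> other,
                         long relLower, long relUpper, boolean isLeft) {
        // 1. Buffer the new element under its timestamp.
        ours.computeIfAbsent(ts, t -> new ArrayList<>()).add(value);
        // 2. Join it with every buffered element of the other side inside the bounds.
        for (Map.Entry<Long, List<String>> bucket : other.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relLower || otherTs > ts + relUpper) {
                continue;
            }
            for (String otherValue : bucket.getValue()) {
                output.add(isLeft ? value + "+" + otherValue : otherValue + "+" + value);
            }
        }
    }

    public static void main(String[] args) {
        IntervalJoinSimSketch sim = new IntervalJoinSimSketch(-2, 1);
        sim.onLeft("L2", 2);  // nothing buffered on the right yet
        sim.onRight("R0", 0); // L2 accepts right timestamps in [0, 3], so L2+R0 is emitted
        sim.onRight("R4", 4); // 4 is outside [0, 3], no output
        sim.onLeft("L5", 5);  // L5 accepts [3, 6], so L5+R4 is emitted
        System.out.println(sim.output); // prints [L2+R0, L5+R4]
    }
}
```

Because each side buffers its own elements and probes the other side's buffer on arrival, a pair is emitted exactly once, by whichever element arrives second.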

IntervalJoinOperator

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        // Left-side elements are stored in leftBuffer.
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        // Right-side elements are stored in rightBuffer; the bounds are mirrored.
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft)
            throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException(
                    "Long.MIN_VALUE timestamp: Elements used in "
                            + "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }
        // Store the element in state.
        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
            final long timestamp = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound
                    || timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }
            // For every arriving element, join it with each buffered element of the other side that falls within the bounds.
            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime =
                (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        // Register a timer that clears expired data from state; only event time is supported.
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    // Buffer the element under its timestamp.
    private static <T> void addToBuffer(
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
            final T value,
            final long timestamp)
            throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }
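
Two details of the listing are easy to miss: processElement2 passes -upperBound and -lowerBound, and the cleanup time depends on the sign of the relative upper bound. The plain-Java sketch below checks that the mirrored bounds describe the same pairs from the right side's point of view, and evaluates the cleanupTime expression for both sides. The method names are illustrative; only the cleanupTime expression is copied from the listing.

```java
// Plain-Java sketch of the mirrored bounds and the cleanup-time rule.
class IntervalBoundsSketch {

    // Left-side view: rightTs must lie in [leftTs + lower, leftTs + upper].
    static boolean fromLeft(long leftTs, long rightTs, long lower, long upper) {
        return rightTs >= leftTs + lower && rightTs <= leftTs + upper;
    }

    // Right-side view with mirrored bounds: leftTs must lie in [rightTs - upper, rightTs - lower].
    static boolean fromRight(long rightTs, long leftTs, long lower, long upper) {
        return leftTs >= rightTs + (-upper) && leftTs <= rightTs + (-lower);
    }

    // Same expression as cleanupTime in processElement.
    static long cleanupTime(long ts, long relativeUpperBound) {
        return (relativeUpperBound > 0L) ? ts + relativeUpperBound : ts;
    }

    public static void main(String[] args) {
        long lower = -2;
        long upper = 1;
        boolean consistent = true;
        for (long l = -5; l <= 5; l++) {
            for (long r = -5; r <= 5; r++) {
                consistent &= fromLeft(l, r, lower, upper) == fromRight(r, l, lower, upper);
            }
        }
        System.out.println("both views agree: " + consistent);
        // A left element at ts 10 may still match right elements up to ts 11.
        System.out.println("left cleanup time: " + cleanupTime(10, upper));
        // A right element at ts 10 may still match left elements up to ts 12.
        System.out.println("right cleanup time: " + cleanupTime(10, -lower));
    }
}
```

The cleanup timer is registered for the latest timestamp on the other side that could still match the buffered element, so the element is no longer needed once event time passes that point.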

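Notes:
a. Only event time is supported.
b. Every arriving element triggers one computation over the buffered data that falls within its interval.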
