聊聊 Flink 的 window 操作

2019-01-01  本文已影响41人  go4it

本文主要研究一下 Flink 的 window 操作。

window

DataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Groups the whole (non-keyed) stream into tumbling time windows of the given size.
     *
     * <p>Under the {@code ProcessingTime} characteristic a processing-time tumbling assigner is
     * used. Any other characteristic gets an event-time tumbling assigner.
     *
     * @param size the length of each window
     * @return an all-windowed stream over {@link TimeWindow}s
     */
    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
        boolean processingTime =
                environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
        return processingTime
                ? windowAll(TumblingProcessingTimeWindows.of(size))
                : windowAll(TumblingEventTimeWindows.of(size));
    }

    /**
     * Groups the whole (non-keyed) stream into sliding time windows.
     *
     * <p>Under the {@code ProcessingTime} characteristic a processing-time sliding assigner is
     * used. Any other characteristic gets an event-time sliding assigner.
     *
     * @param size the length of each window
     * @param slide the distance between the starts of consecutive windows
     * @return an all-windowed stream over {@link TimeWindow}s
     */
    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
        boolean processingTime =
                environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
        return processingTime
                ? windowAll(SlidingProcessingTimeWindows.of(size, slide))
                : windowAll(SlidingEventTimeWindows.of(size, slide));
    }

    /**
     * Groups the whole (non-keyed) stream into count-based windows of {@code size} elements.
     *
     * <p>The stream is placed in a single global window. The trigger is a {@code CountTrigger}
     * of {@code size} wrapped in a {@code PurgingTrigger}.
     *
     * @param size the number of elements that fires the window
     * @return an all-windowed stream over the {@link GlobalWindow}
     */
    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
        AllWindowedStream<T, GlobalWindow> globalStream = windowAll(GlobalWindows.create());
        return globalStream.trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    /**
     * Groups the whole (non-keyed) stream into sliding count windows.
     *
     * <p>Elements go into a single global window. A {@code CountEvictor} keeps {@code size}
     * elements, and a {@code CountTrigger} fires every {@code slide} elements.
     *
     * @param size the number of elements kept by the evictor
     * @param slide the element count used by the trigger
     * @return an all-windowed stream over the {@link GlobalWindow}
     */
    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
        AllWindowedStream<T, GlobalWindow> globalStream = windowAll(GlobalWindows.create());
        globalStream = globalStream.evictor(CountEvictor.of(size));
        return globalStream.trigger(CountTrigger.of(slide));
    }

    /**
     * Wraps this stream together with the given assigner into an {@link AllWindowedStream}.
     *
     * @param assigner assigns each element to zero or more windows of type {@code W}
     * @param <W> the window type produced by the assigner
     * @return a new all-windowed stream backed by this stream
     */
    @PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream<T, W>(this, assigner);
    }

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    /**
     * Groups this keyed stream into tumbling time windows of the given size.
     *
     * @param size the length of each window
     * @return a windowed stream over {@link TimeWindow}s
     */
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() != TimeCharacteristic.ProcessingTime) {
            // Every characteristic other than ProcessingTime falls through to event-time windows.
            return window(TumblingEventTimeWindows.of(size));
        }
        return window(TumblingProcessingTimeWindows.of(size));
    }

    /**
     * Groups this keyed stream into sliding time windows.
     *
     * @param size the length of each window
     * @param slide the distance between the starts of consecutive windows
     * @return a windowed stream over {@link TimeWindow}s
     */
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() != TimeCharacteristic.ProcessingTime) {
            // Every characteristic other than ProcessingTime falls through to event-time windows.
            return window(SlidingEventTimeWindows.of(size, slide));
        }
        return window(SlidingProcessingTimeWindows.of(size, slide));
    }

    /**
     * Groups this keyed stream into count windows of {@code size} elements.
     *
     * <p>The trigger is a {@code CountTrigger} of {@code size} wrapped in a
     * {@code PurgingTrigger}, over a global window assigner.
     *
     * @param size the number of elements that fires the window
     * @return a windowed stream over the {@link GlobalWindow}
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        WindowedStream<T, KEY, GlobalWindow> windowed = window(GlobalWindows.create());
        return windowed.trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    /**
     * Groups this keyed stream into sliding count windows.
     *
     * <p>A {@code CountEvictor} keeps {@code size} elements, and a {@code CountTrigger} fires
     * every {@code slide} elements.
     *
     * @param size the number of elements kept by the evictor
     * @param slide the element count used by the trigger
     * @return a windowed stream over the {@link GlobalWindow}
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        WindowedStream<T, KEY, GlobalWindow> windowed = window(GlobalWindows.create());
        windowed = windowed.evictor(CountEvictor.of(size));
        return windowed.trigger(CountTrigger.of(slide));
    }

    /**
     * Wraps this keyed stream together with the given assigner into a {@link WindowedStream}.
     *
     * @param assigner assigns each element to zero or more windows of type {@code W}
     * @param <W> the window type produced by the assigner
     * @return a new windowed stream backed by this keyed stream
     */
    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<T, KEY, W>(this, assigner);
    }

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

/**
 * A keyed stream whose elements are assigned to windows of type {@code W}.
 *
 * <p>It carries the window configuration set through its fluent methods: the trigger, the
 * evictor, the allowed lateness and the late-data side-output tag.
 *
 * @param <T> the element type
 * @param <K> the key type
 * @param <W> the window type
 */
@Public
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;

    /**
     * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
     * dropped.
     */
    private OutputTag<T> lateDataOutputTag;

    /**
     * Creates a windowed view over {@code input}.
     *
     * <p>The trigger starts out as the assigner's default trigger for the input's execution
     * environment.
     *
     * @param input the keyed stream to window
     * @param windowAssigner the assigner that maps elements to windows
     */
    @PublicEvolving
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
        this.input = input;
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

    /**
     * Replaces the trigger used to decide when a window is evaluated.
     *
     * @param trigger the trigger to use
     * @return this stream, for chaining
     * @throws UnsupportedOperationException if the assigner is a {@code MergingWindowAssigner}
     *     and {@code trigger} cannot merge, or if the assigner is a
     *     {@code BaseAlignedWindowAssigner}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
        boolean mergingAssigner = windowAssigner instanceof MergingWindowAssigner;
        if (mergingAssigner && !trigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }

        boolean alignedAssigner = windowAssigner instanceof BaseAlignedWindowAssigner;
        if (alignedAssigner) {
            String assignerName = windowAssigner.getClass().getSimpleName();
            throw new UnsupportedOperationException("Cannot use a " + assignerName + " with a custom trigger.");
        }

        this.trigger = trigger;
        return this;
    }

    /**
     * Sets how late (in milliseconds of the given {@code Time}) elements may arrive.
     *
     * <p>Negative values are rejected via {@code checkArgument}.
     *
     * @param lateness the allowed lateness
     * @return this stream, for chaining
     */
    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time lateness) {
        final long latenessMillis = lateness.toMilliseconds();
        checkArgument(latenessMillis >= 0, "The allowed lateness cannot be negative.");

        this.allowedLateness = latenessMillis;
        return this;
    }

    /**
     * Routes late elements to the given side output instead of dropping them.
     *
     * <p>The tag is passed through the execution environment's {@code clean} before it is stored.
     *
     * @param outputTag the side-output tag for late data; must not be null
     * @return this stream, for chaining
     */
    @PublicEvolving
    public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        OutputTag<T> cleanedTag = input.getExecutionEnvironment().clean(outputTag);
        this.lateDataOutputTag = cleanedTag;
        return this;
    }

    /**
     * Sets the evictor applied to window contents before evaluation.
     *
     * @param evictor the evictor to use
     * @return this stream, for chaining
     * @throws UnsupportedOperationException if the assigner is a {@code BaseAlignedWindowAssigner}
     */
    @PublicEvolving
    public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
        boolean alignedAssigner = windowAssigner instanceof BaseAlignedWindowAssigner;
        if (alignedAssigner) {
            String assignerName = windowAssigner.getClass().getSimpleName();
            throw new UnsupportedOperationException("Cannot use a " + assignerName + " with an Evictor.");
        }
        this.evictor = evictor;
        return this;
    }

    // ------------------------------------------------------------------------
    //  Operations on the keyed windows
    // ------------------------------------------------------------------------

    // ...... (the remaining window operations are omitted from this excerpt)
}

AllWindowedStream 的属性和操作基本与 WindowedStream 类似，这里就不详细展开了。

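小结

- DataStream 的 timeWindowAll、countWindowAll 最终都调用 windowAll 方法，创建 AllWindowedStream。KeyedStream 的 timeWindow、countWindow 最终都调用 window 方法，创建 WindowedStream。
- timeWindow/timeWindowAll 会根据 environment 的 TimeCharacteristic 选择 WindowAssigner。
  - 为 ProcessingTime 时，使用 TumblingProcessingTimeWindows 或 SlidingProcessingTimeWindows。
  - 否则使用 TumblingEventTimeWindows 或 SlidingEventTimeWindows。
  - 只传 size 时为滚动窗口，同时传 size 和 slide 时为滑动窗口。
- countWindow/countWindowAll 基于 GlobalWindows 实现。
  - 只传 size 时，trigger 为 PurgingTrigger.of(CountTrigger.of(size))。
  - 同时传 size 和 slide 时，以 CountEvictor.of(size) 作为 evictor，以 CountTrigger.of(slide) 作为 trigger。
- WindowedStream 持有 input、windowAssigner、trigger、evictor、allowedLateness、lateDataOutputTag 等属性。
  - trigger 默认取自 windowAssigner.getDefaultTrigger。
  - 可以通过 trigger、evictor、allowedLateness、sideOutputLateData 方法修改这些配置。
  - BaseAlignedWindowAssigner 不支持自定义 trigger 和 evictor。
  - MergingWindowAssigner 要求 trigger 支持 merge（canMerge）。
  - allowedLateness 不能为负数。
  - 未设置 lateDataOutputTag 时，迟到的数据会被直接丢弃。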

doc

上一篇下一篇

猜你喜欢

热点阅读