聊聊flink的consecutive windowed operations

2019-01-09  本文已影响11人  go4it

本文主要研究一下flink的consecutive windowed operations


// Usage sketch (pseudo-code: "..." and "<key selector>" are placeholders, not valid Java).
// NOTE(review): no window(...) / windowAll(...) call is visible in either pipeline below,
// although the article is about consecutive *windowed* operations — presumably those
// lines were lost from this excerpt; confirm against the Flink documentation.
DataStream<Integer> input = ...;

// First stage: key the input stream and reduce it with a Summer.
DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .reduce(new Summer());

// Second stage: feed the per-key results into a TopKWindowFunction.
DataStream<Integer> globalResults = resultsPerKey
    .process(new TopKWindowFunction());



/**
 * Stream operator that re-stamps each element with a timestamp extracted by an
 * {@link AssignerWithPeriodicWatermarks} and, driven by a processing-time timer,
 * repeatedly asks that assigner for its current watermark.
 *
 * <p>NOTE(review): this excerpt appears to be missing lines (closing braces, comment
 * delimiters, annotations and some statements). Verify against the original Flink
 * source before relying on it.
 *
 * @param <T> type of the elements flowing through the operator
 */
public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    // Delay added to the current processing time when (re-)registering the timer;
    // read from the execution config in open().
    private transient long watermarkInterval;

    // Timestamp of the most recent watermark accepted by this operator; starts at
    // Long.MIN_VALUE and only ever moves forward.
    private transient long currentWatermark;

    // Creates the operator and sets its chaining strategy to ALWAYS.
    // NOTE(review): no use of 'assigner' is visible here — presumably a super(assigner)
    // call is missing from the excerpt; confirm.
    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;

    // Resets the watermark state, reads the auto-watermark interval and, if that
    // interval is positive, registers the first processing-time timer with this
    // operator as the callback.
    public void open() throws Exception {

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        // a non-positive interval means no timer is ever registered from here
        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);

    // Asks the user function for the element's new timestamp (passing the element's
    // existing timestamp, or Long.MIN_VALUE if it has none) and emits the element
    // with that new timestamp.
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));

    // Timer callback: fetches the assigner's current watermark, advances
    // currentWatermark if the new one is non-null and strictly larger, then
    // registers the next timer at now + watermarkInterval.
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            // NOTE(review): the statement that actually emits the watermark is not
            // visible in this excerpt.

        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);

    // NOTE(review): the three lines below look like the body of a Javadoc comment whose
    // opening and closing delimiters were lost from this excerpt.
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        // (the guard on currentWatermark ensures this branch is taken at most once)
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            // NOTE(review): the forwarding statement itself is not visible here.

    // On close, performs one last watermark check with the same "non-null and strictly
    // larger" rule as onProcessingTime(), but without registering another timer.
    public void close() throws Exception {

        // emit a final watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark



/**
 * {@link ProcessingTimeService} that reports {@code System.currentTimeMillis()} as the
 * current processing time and schedules timer callbacks on a
 * {@link ScheduledThreadPoolExecutor} with a core pool size of one.
 *
 * <p>NOTE(review): this excerpt appears to be missing lines (closing braces and some
 * statements). Verify against the original Flink source before relying on it.
 */
public class SystemProcessingTimeService extends ProcessingTimeService {

    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    // Values held by the 'status' field below.
    private static final int STATUS_ALIVE = 0;
    private static final int STATUS_QUIESCED = 1;
    private static final int STATUS_SHUTDOWN = 2;

    // ------------------------------------------------------------------------

    /** The containing task that owns this time service provider. */
    private final AsyncExceptionHandler task;

    /** The lock that timers acquire upon triggering. */
    private final Object checkpointLock;

    /** The executor service that schedules and calls the triggers of this task. */
    private final ScheduledThreadPoolExecutor timerService;

    // One of the STATUS_* constants; initialised to STATUS_ALIVE in the constructor.
    private final AtomicInteger status;

    // Convenience constructor: delegates with a null thread factory, which makes the
    // main constructor create the executor without a custom factory.
    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);

    // Main constructor. 'task' and 'checkpointLock' are null-checked; 'threadFactory'
    // may be null.
    public SystemProcessingTimeService(
            AsyncExceptionHandler task,
            Object checkpointLock,
            ThreadFactory threadFactory) {

        this.task = checkNotNull(task);
        this.checkpointLock = checkNotNull(checkpointLock);

        this.status = new AtomicInteger(STATUS_ALIVE);

        // single-threaded scheduler, with or without the caller-supplied factory
        if (threadFactory == null) {
            this.timerService = new ScheduledThreadPoolExecutor(1);
        } else {
            this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);

        // NOTE(review): the two comments below describe executor configuration, but the
        // corresponding statements are not visible in this excerpt.
        // tasks should be removed if the future is canceled

        // make sure shutdown removes all pending tasks

    // Returns the wall-clock time from System.currentTimeMillis().
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();

    // Schedules 'target' to be triggered for 'timestamp'. The delay is the time left
    // until 'timestamp' (never negative) plus one millisecond. If the executor rejects
    // the task, the outcome depends on the current status: a NeverCompleteFuture when
    // quiesced, an IllegalStateException when shut down, otherwise the rejection is
    // rethrown.
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        catch (RejectedExecutionException e) {
            // note: this local deliberately shadows the 'status' field with its int value
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            else {
                // something else happened, so propagate the exception
                throw e;




/**
 * Operator that assigns each incoming element to windows, consults a trigger per
 * window, and hands window contents to an {@link InternalWindowFunction} when the
 * trigger fires.
 *
 * <p>NOTE(review): this excerpt appears to be heavily abridged (fields, closing braces
 * and several statements are missing). Verify against the original Flink source
 * before relying on it.
 */
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    // Assigns the element to its windows and, for each window on the non-merging path,
    // evaluates the trigger and emits the window contents if it fires. An element that
    // no window handled and that is late gets the late-data treatment at the end.
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        // NOTE(review): the merging-assigner branch has no visible body in this excerpt.
        if (windowAssigner instanceof MergingWindowAssigner) {


        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                // NOTE(review): the body of this check is not visible — presumably it
                // skips to the next window before the flag below is cleared; confirm.
                if (isWindowLate(window)) {
                isSkippedElement = false;


                // point the trigger context at this key/window before asking the trigger
                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    // NOTE(review): the handling of null contents is not visible here.
                    if (contents == null) {
                    emitWindowContents(window, contents);

                // NOTE(review): the purge handling is not visible in this excerpt.
                if (triggerResult.isPurge()) {

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            // NOTE(review): both branches of this check have no visible body.
            if (lateDataOutputTag != null){
            } else {

    // NOTE(review): the line below looks like the body of a Javadoc comment whose
    // delimiters were lost from this excerpt.
     * Emits the contents of the given window using the {@link InternalWindowFunction}.
    private void emitWindowContents(W window, ACC contents) throws Exception {
        // record the window on the process context, then invoke the user function with
        // the key currently held by the trigger context
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);




/**
 * {@link Trigger} for {@link TimeWindow}s that fires based on the relation between the
 * window's maximum timestamp and the current watermark / event-time timer.
 *
 * <p>Instances are obtained through {@link #create()}; the constructor is private.
 *
 * <p>NOTE(review): this excerpt appears to be missing lines (closing braces,
 * annotations and some statements). Verify against the original Flink source before
 * relying on it.
 */
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    // Private: use create() instead.
    private EventTimeTrigger() {}

    // Fires immediately when the watermark has already reached the window's max
    // timestamp; otherwise continues.
    // NOTE(review): no timer registration is visible in the else branch — presumably a
    // line is missing from this excerpt; confirm.
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;

    // Fires when the event-time timer's time equals the window's max timestamp.
    // NOTE(review): the alternative arm of this conditional expression is not visible.
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :

    // Processing-time timers never cause this trigger to fire.
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;

    // NOTE(review): the body of clear() is not visible in this excerpt.
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

    // This trigger reports that it supports merging.
    public boolean canMerge() {
        return true;

    // Called when windows are merged into 'window'.
    // NOTE(review): the statement guarded by the check below is not visible; per the
    // existing comment it presumably registers a timer.
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {

    public String toString() {
        return "EventTimeTrigger()";

    // Factory method: returns a new EventTimeTrigger on every call.
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();




