Flink源码阅读系列

Flink源码阅读之Window执行过程

2020-05-20  本文已影响0人  〇白衣卿相〇

前面文章介绍了Flink的任务执行流程,每一个operator都会有对应的Task去执行,如果程序中使用了window的话,当程序执行到window的task时就会调用WindowOperator中的实现。

    public void processElement(StreamRecord<IN> element) throws Exception {
        //根据元素划分窗口
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;
        //拿到划分的Key
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
        //如果是session window 则会有合并的过程
        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();
            //...session window这里不讨论
        } else {
            //for循环是因为如果是slide window的话一条数据可能属于多个窗口
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
                
                windowState.setCurrentNamespace(window);
                //将数据存入window state
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;
                //调用trigger的onElement方法,Trigger可以是eventTime、processTime或自定义的trigger
                TriggerResult triggerResult = triggerContext.onElement(element);
                //如果返回结果是Fire则触发计算
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                //如果同时要清除窗口状态的话则清除
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                //注册定时器,窗口结束后清理状态
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        //迟到的元素通过side output输出
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

Trigger

不同的trigger实现有所不同,分别看下eventTime和processTime的
EventTimeTrigger

public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

判断当前水印是否大于等于窗口结束时间,是的话则返回FIRE,否则将窗口结束时间注册为定时器,返回CONTINUE。

public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

直接添加到的优先级队列中。

再看ProcessingTimeTrigger

public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

直接注册定时器,返回CONTINUE,因为process time是基于机器系统时间来判断的,跟元素eventtime没有关系。

@Override
public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
            }
        }
    }

注册定时器差异比较大,先从优先级队列中取出时间最近的定时器,尝试吧当前时间注册进去,如果成功了,判断当前时间是不是比最近的定时器还要早,是的话就取消最近的定时器。
这里涉及java8的方法引用传递的概念,把onProcessingTime的引用传递给registerTimer
ProcessingTimeServiceImpl

public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        return timerService.registerTimer(timestamp, processingTimeCallbackWrapper.apply(target));
    }

SystemProcessingTimeService

/** The executor service that schedules and calls the triggers of this task. */
    private final ScheduledThreadPoolExecutor timerService;
@Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        //延迟调度时间=窗口的结束时间-当前时间+1ms
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            //将上面的回调函数包装成task放到线程池中调度
            return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
        //起一个调度线程,将回调函数传递进去
        return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
    }

当延迟时间到达时会调用ScheduledTask的run方法

public void run() {
            if (serviceStatus.get() != STATUS_ALIVE) {
                return;
            }
            try {
                callback.onProcessingTime(nextTimestamp);
            } catch (Exception ex) {
                exceptionHandler.handleException(ex);
            }
            nextTimestamp += period;
        }

上面的callback就是上面onProcessingTime的引用,当触发 InternalTimerServiceImpl.onProcessingTime() 回调后,会从优先级队列中取出所有符合条件的触发器,并调用 triggerTarget.onProcessingTime(timer)。

private void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);//调用对应trigger的onProcessingTime方法
        }

        if (timer != null && nextTimer == null) {
        //注册下一个定时器
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
        }
    }

这样就会不断循环下去。
上面是Trigger的过程,当窗口触发计算后,如果还需要清除状态,那么就把状态清除,然后取消当前窗口的定时器。

Evicotr

如果使用了Evictor的话,则会调用EvictingWindowOperator.processElement和上面不同的是emitWindowContents方法

private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

        // Work around type system restrictions...
        FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
            .from(contents)
            .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                @Override
                public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                    return TimestampedValue.from(input);
                }
            });
        evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        FluentIterable<IN> projectedContents = recordsWithTimestamp
            .transform(new Function<TimestampedValue<IN>, IN>() {
                @Override
                public IN apply(TimestampedValue<IN> input) {
                    return input.getValue();
                }
            });

        processContext.window = triggerContext.window;
        userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
        evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        //work around to fix FLINK-4369, remove the evicted elements from the windowState.
        //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
        windowState.clear();
        for (TimestampedValue<IN> record : recordsWithTimestamp) {
            windowState.add(record.getStreamRecord());
        }
    }

在真正窗口处理函数之前和之后分别调用evictBefore和evictAfter方法来做一些过滤。

详细的窗口demo、trigger、evictor、windowFunction的使用样例,参见我的项目
https://github.com/zhuxiaoshang/flink-be-god

上一篇下一篇

猜你喜欢

热点阅读