Flink ProcessingTime 在事件未到达,但窗口时

2020-04-27  本文已影响0人  zh_harry
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

//每个事件进入算子时触发
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
//并注册定时器
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
@Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
//如果是窗口中的第一个元素由注册定时器
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

根据当前时钟时间与窗口最大时间判断最大delay 时间,所以即使事件未达到时,内部的定时器也会定时触发!

public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

count 窗口为global窗口

/**
     * Windows this {@code KeyedStream} into tumbling count windows.
     *
     * @param size The size of the windows in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

count 窗口的触发条件如下

@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
上一篇下一篇

猜你喜欢

热点阅读