Flink

初识Flink WaterMarker

2018-09-23  本文已影响215人  远o_O

前言

为什么需要WaterMark

WaterMark如何触发窗口计算

[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

如果window大小是10秒,则window会被分为如下的形式:当然还有一个offset值可以控制window的起始值不是整点。

[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

window.maxTimestamp = 窗口结束时间 - 1,flink时间窗口的单位为ms,也就是时间戳,也就是说就差一毫秒,也不会触发窗口。

    public long maxTimestamp() {
        return end - 1;
    }

然后到调用Evictor的地方看看:没有内容是不会触发计算的

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                    if (contents == null) {
                        // if we have no state, there is nothing to do
                        continue;
                    }
                    emitWindowContents(actualWindow, contents, evictingWindowState);
                }

Flink中WaterMarker的类型

周期水位线(Periodic Watermark)

标点水位线(Punctuated Watermark)

通过数据流中某些特殊标记事件来触发新水位线的生成。

迟到事件

上一篇 下一篇

猜你喜欢

热点阅读