数客联盟

EventTimeTrigger中onElement方法分析

2020-03-03  本文已影响0人  Woople

疑问

onElement方法的具体实现如下

public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    // if the watermark is already past the window fire immediately
    return TriggerResult.FIRE;
  } else {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
  }
}

下面是官方文档对于这个方法的解释

The onElement() method is called for each element that is added to a window.

也就是说每条数据加入这个窗口中都会调用一次这个方法,什么情况下这个方法会返回TriggerResult.FIRE

分析

关键问题在于WindowOperator#isWindowLate方法

protected boolean isWindowLate(W window) {
  return (windowAssigner.isEventTime() 
      && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

如果allowedLateness没有设置默认为0,那么第二个判断条件相当于window.maxTimestamp()<=internalTimerService.currentWatermark()如果这个条件为true,那么上层WindowOperator#processElement方法中

// drop if the window is already late
if (isWindowLate(window)) {
  continue;
}

会走到这个判断里面,那么就不会调用到EventTimeTrigger#onElement,反过来说,能调用到EventTimeTrigger#onElement方法的情况,window.maxTimestamp() <= ctx.getCurrentWatermark()就不会成立。

但是当设置allowedLateness大于0的情况,数据迟到的条件变成了window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark(),那么假设滚动窗口的size是30秒,设置allowedLateness为10秒这时候来了一条数据的时间戳为1573441910000,那么此时window.maxTimestamp()=1573441919999,allowedLateness=10000,internalTimerService.currentWatermark()=1573441924000,不满足上面迟到的条件,进入EventTimeTrigger#onElement,这时就满足了window.maxTimestamp() <= ctx.getCurrentWatermark(),即返回值就是TriggerResult.FIRE

小结

上面的过程比较绕,简单的说,如果allowedLateness=0那么进入EventTimeTrigger#onElement后不可能返回TriggerResult.FIRE,因为满足这个判断条件的数据在前面isWindowLate(window)判断中已经过滤掉了。如果allowedLateness>0那么满足迟到的数据进入EventTimeTrigger#onElement后就会返回TriggerResult.FIRE。有兴趣的读者可以运行demo进行测试。

总结

结合之前的文章EventTimeTrigger中onEventTime方法分析可以得到下面的结论

上一篇下一篇

猜你喜欢

热点阅读