JStorm:Window机制

2018-06-10  本文已影响26人  远o_O

0x01、前言

0x02、Window

0x03、Event Time

Storm的window也支持event time了。TimestampExtractor、WatermarkGenerator、乱序消息的处理、Retractor当然不会少了。

1、WatermarkGenerator

使用event time的时候,需要定义watermark,否则jstorm会使用默认的watermark实现:PeriodicWatermarkGenerator。 即定期往下游发送watermark。

watermark标识了一个流是一直前进,不可回退的。即,假设当前收到了2017-01-13 18:00:00的watermark,那么意味着上游后续要发送的所有数据都不 会早于18:00:00。如果早于这个时间,则认为是late element。处理策略参见下面late element。

当收到一条消息时,jstorm同时会调用WatermarkGenerator#onElement方法,以更新它内部的timestamp。同时,通过watermarkInterval来定时 发送watermark和检查event windows是否需要purge。

jstorm支持几种通过watermark触发window purge的策略: * GLOBAL_MAX_TIMESTAMP 全局最大时间戳。这种策略会始终使用接收到的所有task中的最大的时间戳。如果这个时间戳>窗口边界,就会purge window * MAX_TIMESTAMP_WITH_RATIO 这个策略在上面的基础上,还指定了收到上游watermark的task的比例,默认为0.9。即,只有当时间戳>窗口边界, 且收到了90%以上的task的watermark,才会purge window * TASK_MAX_GLOBAL_MIN_TIMESTAMP 这个策略,会记录所有上游task发送的watermark的值,然后取所有task的watermark的最小值作为当前时间戳, 以防止各别task的timestamp很大导致窗口被过早purge。这种策略是jstorm的默认策略。

2、乱序消息的处理

event time的场景中,消息的乱序几乎是必然会出现的。 在上面的场景中即为,当前watermark已经到达18:00:00,但是下一条消息到达时,发现它的时间戳是17:30:00。这就是一条乱序的消息。

默认情况下,jstorm会丢弃这条消息。也可以通过实现Retractor接口来重新计算一个已经purge的窗口值。见下面。

3、Retractor

这个接口只有一个方法:

void retract(Tuple element, Collection<TimeWindow> windows);

即,这个element所属的窗口为windows这个集合,由用户指定如何处理这条乱序消息。

以word count为例,我们可能每隔1分钟计算word count,然后最终把每个window下的word count输出到hbase或者tair中。

在watermark=18:00:00时,假设之前计算出来的17:30分这个窗口的word: aa的count=100。接下来我们又收到一条消息,timestamp=17:30:00,word=aa。

此时我们需要更新HBase/Tair中17:30分这个窗口中对应的aa的count值。那么我们可以在retract方法中,直接去update HBase或者tair。

当然,如果经常会发生乱序,一条条处理效率显然是太慢了。建议用户可以保存一个Map<TimeWindow, List<Tuple>>(或者使用guava中的Multimap)对象, 当达到一定数量再触发一次计算。

不过说了这么多,使用retractor的一个最大前提还是:计算结果必须是可被更新的。否则就只能丢弃然后打个日志了。

上一篇 下一篇

猜你喜欢

热点阅读