Flink-Streaming-EventTime-Overvi

2019-04-29  本文已影响0人  耳边的火

Event Time / Processing Time / Ingestion Time

Flink 在流应用中支持不同的时间概念:

设置时间语义

Flink DataStream 程序的第一部分通常会设置 时间语义(time characteristic)。这个设置定义了生成数据流的数据源的行为(如,是否需要指定timestamp),以及window操作符应该使用哪种时间概念对数据进行处理。
下面的例子展示了Flink程序按小时聚合数据。window的行为和时间语义相对应。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

需要注意的是,想要在事件时间语义下运行该程序,需要使用能够对每个数据定义事件时间以及生成watermark的source,如果source不满足这个条件,程序需要在source后,指定timestamp assigner以及watermark generator。这些函数描述了如何从数据中提取事件时间以及该程序所能够处理的数据最大乱序程度。
接下来的部分描述了timestamp与watermark背后的一些机制。对于如何使用timestamp assigner以及watermark generator,可以参阅 Generating Timestamps / Watermarks

Event Time 与 Watermarks

注意:Flink实现了Dataflow Model中的许多技术。为了更好的介绍 event time 与 watermarks,建议先阅读下面的文章:

支持event time 的流处理器需要一个方法来衡量当前event time时间标尺下的处理进度。例如,一个按小时聚合数据的window操作,流处理器需要在event time超过window的end time后,提醒window你可以执行操作并关闭window了。
event time与processing time(注:这一段话中,将processing time替换为 系统时间 会更好理解写2)是独立处理的。例如:在一个应用中(注:处理实时数据时),当前event time可能稍稍落后于processing time,但是两者都会以相同的速度流逝。但是另一方面,另一个流程序可能会在几秒内就能够处理在event time语义下的几周的数据,比如读取kafka topic中的历史数据。
(注:为了简单的理解event time与processing time 的不同,可以理解为event time是以数据自带的时间戳的时间作为时间坐标系或者时间标尺,而processing time是以真实世界中,程序正在运行时的时间作为时间坐标系或时间标尺)


Flink中衡量event time的概念称作 watermark。Watermark作为数据流的一部分,并且会携带一个时间戳 t。Watermark(t) 表示当前event time坐标系下的数据流已经达到了时间t,这意味着应该不会再有时间戳小于等于t的数据出现(注:也就是不会再有迟到数据)
(注:为什么需要watermark?processing time是真实世界的时间,时间是平滑流逝的,而event time是从数据中抽取出来的时间,是离散的时间,因此引入watermark的概念,就是为了分隔不同的数据到不同的window中去,并且告知window数据全部到达,可以执行计算了。一个很简单的例子:一个window要聚合event time 在 10:00:00 到 10:10:00的数据,而真实流入window操作符的event time可能不会这么凑巧,最后一个数据的时间戳恰好就是 10:10:00,因此watermark的作用就是告知window,event time的坐标系下,已经把所有合适的数据聚合起来了,你这个window可以执行计算了)
下面的图展示了流中数据的时间戳以及流中的watermark。示例中的数据是按照时间戳的顺序进行的排列,意味着watermark在流中是周期性产生的。



对于处理乱序的数据流,watermark起到很关键的作用,如下图,数据并没有按照时间戳的顺序排列好。一般来说,watermark意味着在watermark产生的时刻起,所有时间戳小于watermark的数据都已经到达了。一旦watermark到达一个操作符,操作符就会将它内部的event time时间表调整到watermark所代表的时间。


注意event time的值要么来自刚刚流入source的数据的内置时间戳,要么是来自该数据所触发的watermark的时间戳。

并发流中的Watermark


watermark在source function中生成或是在其后通过方法来生成。source的每个并发的subtask一般都会生成各自的watermark。这些watermark定义了在某个source中的event time的值(即,在event time语义下,当前时间是几点)。
因为watermark会随着数据流在程序中流动,因此当它到达某个operator后,就会修改这个operator的event time的值。一旦operator修改了event time的值,就会生成一个新的watermark向下游传播。
一些操作符会消费多个数据流;如:union操作,或者是紧跟着keyBy/partition函数的操作符。这些操作符当前的event time是所有数据流中最小的event time值。当输入流更新时间后,操作符也会更新时间。
下图展示了并发流中watermark的流动以及操作符如何追踪event time的。


注意:kafka source支持 per-partition watermark,你可以参阅这里

迟到数据


事实上,即便watermark的概念可以理解为,所有时间戳小于watermark的数据都已经到达。但是真实情况下,确实会有数据违反了这样的约定,这意味着,即便生成了watermark(t),但是仍然会有时间戳t' <= t的数据在随后进入数据流。事实上,在许多真实案例中,某些数据确实会延迟到达,这使得定义一个所有数据都已经进入数据流的时间变得不可能。此外,即便迟到数据的迟到时间是有最大时间界限的,但是推迟太长时间再生成watermark(这样才能保证迟到的数据被正确处理)也不是我们期望的,因为这会造成处理的延迟。
针对这个情况,流程序需要明确迟到数据。迟到数据是指那些,当数据到达时,系统event time表的时间已经流过了这个时间戳的数据。可以查阅 Allowed Lateness 文档学习如何处理event time语义下window操作符的迟到数据。

空转的source Idling Source


目前,仅使用event time watermark generator时,如果没有数据流入,就不会有watermark产生。这意味着,如果数据流突然没有数据流入了,event time不会再向前流动(系统中的所有操作符的event time不会更新)。如:window操作符不会被新的watermark触发计算操作,也就不会有这window的结果输出。
为了规避这个问题,可以使用 periodic watermark assigner,它并不仅仅基于数据的时间戳生成watermark。一个示例的解决方法可以是:当检测到有一段时间没有数据流入时,可以使用processing time生成watermark。
Source可以使用 SourceFunction.SourceContext#markAsTemporarilyIdle来定义怎样情况下该source处于空转状态。更多信息可以参阅Javadoc的文档以及 StreamStatus。

调试watermark


参阅 Debugging Window & Event Time 部分学习如何在运行时调试。

操作符如何处理watermark


操作符需要处理完给定的watermark后才可以将其转发给下游操作符。如:WindowOperator 首先会计算那个window应该被触发,并且直到得到所有应该触发计算的window的结构后,才会吧watermark发送到下游。换句话说,触发watermark生成的数据,应该比watermark更早的加入流(注:这样才会保证计算结果的完整性和正确性)。
相同的规则适用于 TwoInpuStreamOperatro 。不同的是,这时,operator中的watermark是其所有输入流中watermark最小的那个。
详细的过程在这几个接口的实现类的方法中被定义:OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 与TwoInputStreamOperator#processWatermark2

上一篇 下一篇

猜你喜欢

热点阅读