Flink专题

事件时间(Event Time)

2019-02-01  本文已影响50人  尼小摩

Event Time / Processing Time / Ingestion Time

Flink在streaming程序中支持不同的时间概念。

设置时间特性(Setting a Time Characteristic)

Flink DataStream 程序的第一部分常常是设置时间特性,这个设置定义了数据流源的行为(例如: 它们是否分配时间戳) 以及window操作需要用哪个时间概念,如:KeyedStream.timeWindow(Time.seconds(30))

下面的例子中展示了一个在小时级时间窗口中聚合事件的Flink程序,window的行为与时间特性相适应。

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))

stream
    .keyBy( _.getUser )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) => a.add(b) )
    .addSink(...)

注意,为了在事件时间中运行此示例,程序需要使用定义事件时间和自己触发watermarks的源数据,或者程序必须在源文件之后注入一个Timestamp AssignerWatermark Generator。这些函数描述了如何获取事件的时间戳以及哪些程度的无序事件流需要展示。

下一节描述Timestamp AssignerWatermark Generator背后的机制。有关如何在Flink DataStream API中使用Timestamp AssignerWatermark Generator的指南,请参考Generating Timestamps / Watermarks

事件时间和水印(Event Time and Watermarks)

注意:Flink实现了很多Dataflow Model中的技术。想要获得更多关于event time和watermark的介绍,请查看下面的文章:
Streaming 101 by Tyler Akidau
The Dataflow Model paper
一个支持事件时间的流处理器需要一种方式来衡量事件时间的进度,例如:一个小时级的窗口操作在事件时间达到小时的末尾之后需要被通知,以便操作能够在进程中关闭窗口。

Event Time可以独立于处理时间来进行,例如:程序中一个操作的当前事件时间可能会稍微晚于处理中的时间,而它们是以相同的速度进行的;另一方面,一个流程序可能处理一周的事件时间,通过快速转发缓存在Kafka topic中的数据,仅用几秒的时间来处理。

Flink衡量event time 进度的机制是watermark,watermark流作为数据流的一部分,携带了一个时间戳t。一个Watermark(t)声明了数据流中的事件时间已经到t了,也就是说数据流中不应该再有时间戳t'<=t的元素了。

下图中展示了一个带时间戳的事件流,并且内联一个水印。在这个例子中,事件是有序的,也就是说水印仅是在流中做简单标记而已。


Watermark对于无序流至关重要,如下图所示,事件不是按时间戳排序的。一般来说,Watermark是一种声明在流中的那个点,在某个时间戳之前的所有事件都应该已经到达。一旦watermark到达一个操作,这个操作就可以将事件时间的时钟提前到watermark值的前面。

请注意,事件时间由新创建的流元素(或多个元素)继承,这些流元素或来自生成它们的事件,或来自触发这些元素创建的水印。

平行流中的水印(Watermarks in Parallel Streams)

Watermarks是在源函数处或之后直接生成的。源函数的每个并行子任务通常独立生成Watermarks。这些水印定义了特定并行源的事件时间。

因为watermark流贯穿整个流程序,他们在到达的操作处提前了事件时间。当一个操作提前它的事件时间,它都会为下游的后继操作生成一个新的watermark

一些操作使用多个输入流;例如,union,keyBy(…), partition(…)函数后面的操作。这样一个操作符的当前事件时间是其输入流的事件时间的最小值。当输入流更新它们的事件时间时,操作也会更新。

下图展示了一个事件和水印流经并行流的示例,以及跟踪事件时间的操作。


延迟元素(Late Elements)

某些元素有可能会违反水印条件,这意味着在Watermark(t)发生之后,还会出现更多带有时间戳t ' <= t的元素。实际上,在许多设置中,某些元素可以任意延迟,因此不可能指定某个事件时间戳的所有元素发生的时间。此外,延迟是有界的,水印延迟太多通常也是不可取的,因为它会在事件时间窗的评估中造成太多的延迟。

为此,流程序可能会显式地预期一些延迟元素。延迟元素是在系统时间(如水印所示)已经超过延迟元素的时间戳时间之后到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参见“允许延迟”。

闲置资源(Idling sources)

目前,使用纯事件时间水印生成器(watermarks generators),如果没有要处理的元素,水印无法进行处理。这意味着在传入数据中出现间隙时,事件时间不会进展,例如窗口操作不会被触发,因此现有的窗口将无法生成任何输出数据。为了避免这种情况,可以使用周期性的水印分配器,而不是基于元素时间戳进行分配。一个示例解决方案可以是切换的分配程序在一段时间没有观察到新的事件之后,使用当前的处理时间作为时间基础。

可以使用SourceFunction.SourceContext#markAsTemporarilyIdle将源标记为空闲。有关详细信息,请参考此方法的Javadoc以及StreamStatus

如何操作处理水印(How operators are processing watermarks)

一般来说,在将指定的水印转发给下游之前,操作人员需要对其进行完整的处理。例如,WindowOperator首先评估应该触发哪些窗口,并且只有在产生所有由水印触发的输出后,水印本身才会被发送到下游。换句话说,由于出现水印而产生的所有元素都将在水印之前发出。

同样的规则也适用于TwoInputStreamOperator。但是,在这种情况下,运算符的当前水印被定义为两个输入的最小值。

实现定义的详细信息OneInputStreamOperator.processWatermarkTwoInputStreamOperator.processWatermark1TwoInputStreamOperator.processWatermark2方法。

上一篇下一篇

猜你喜欢

热点阅读