Event Time and Watermarks
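Documentation walkthrough
Documentation path
/Application Development/Streaming (DataStream API)/Event Time
Event time and watermarks cover a lot of ground, so this walkthrough does not necessarily follow the order of the original documentation.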
Event time
Official definition
Event time is the time that each individual event occurred on its producing device.
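Event time is the time at which an event in the stream actually occurred. It is also the time that real business logic needs to work with. Because data sources differ in their characteristics, events may arrive late or out of order with respect to event time during stream computation. Watermarks were introduced to solve this problem.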
Watermark
The official explanation of the watermark concept can be found in two places. The first is the official documentation:
The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark).
The second is the API documentation of the Watermark class:
A Watermark tells operators that no elements with a timestamp older or equal to the watermark timestamp should arrive at the operator. Watermarks are emitted at the sources and propagate through the operators of the topology. Operators must themselves emit watermarks to downstream operators using Output.emitWatermark(Watermark). Operators that do not internally buffer elements can always forward the watermark that they receive. Operators that buffer elements, such as window operators, must forward a watermark after emission of elements that is triggered by the arriving watermark.
In some cases a watermark is only a heuristic and operators should be able to deal with late elements. They can either discard those or update the result and emit updates/retractions to downstream operations.
When a source closes it will emit a final watermark with timestamp Long.MAX_VALUE. When an operator receives this it will know that no more input will be arriving in the future.
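This article elaborates on the second description. Two points can be drawn from it:
- A watermark carries a timestamp. It tells an operator that all data up to that timestamp has arrived, and that no element with a timestamp less than or equal to it should reach the operator afterwards.
- A watermark can be thought of as having a "life cycle": birth, propagation, and death.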
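The first point can be made concrete with a small plain-Java sketch. It is not Flink code: the class LatenessCheck and its methods are hypothetical names, and the model only captures the rule that an element with a timestamp less than or equal to the last watermark counts as late.

```java
/** Toy model (not a Flink class): is an element late relative to the last watermark seen? */
final class LatenessCheck {
    // Long.MIN_VALUE stands for "no watermark received yet".
    private long currentWatermark = Long.MIN_VALUE;

    /** Watermarks only move forward, so an older watermark is ignored. */
    void onWatermark(long timestamp) {
        currentWatermark = Math.max(currentWatermark, timestamp);
    }

    /** Watermark(t) promises no more elements with timestamp <= t, so such an element is late. */
    boolean isLate(long elementTimestamp) {
        return elementTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        LatenessCheck operator = new LatenessCheck();
        operator.onWatermark(1_000L);
        System.out.println(operator.isLate(1_000L)); // true: equal to the watermark
        System.out.println(operator.isLate(1_001L)); // false: newer than the watermark
    }
}
```

What an operator does with a late element (discard it, or update and re-emit a result) is a separate decision that this sketch leaves out.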
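Watermark life cycle
The "birth" of a watermark
Watermarks are generated in one of two ways:
- Directly in a source function; see the official documentation section Source Functions with Timestamps and Watermarks.
- By calling DataStream#assignTimestampsAndWatermarks to specify how watermarks are generated. This approach has two modes: time-driven periodic watermarks (Periodic Watermarks) and data-driven punctuated watermarks (Punctuated Watermarks). For periodic watermarks, Flink ships two reference implementations: BoundedOutOfOrdernessTimestampExtractor.java and AscendingTimestampExtractor.java.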
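The idea behind the two periodic implementations can be sketched in plain Java. This is a simplified model, not the source of BoundedOutOfOrdernessTimestampExtractor or AscendingTimestampExtractor: the interface PeriodicStrategy and the class names are hypothetical, and the bound is assumed to be non-negative.

```java
/** Hypothetical interface: observe element timestamps, report a watermark when asked. */
interface PeriodicStrategy {
    void onElement(long eventTimestamp);
    long currentWatermark();
}

/** The watermark trails the largest timestamp seen so far by a fixed bound. */
final class BoundedOutOfOrderness implements PeriodicStrategy {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedOutOfOrderness(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Offset the start value so the subtraction below cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    public void onElement(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }
}

/** Timestamps are assumed to ascend, so the watermark sits just below the latest one. */
final class AscendingTimestamps implements PeriodicStrategy {
    private long current = Long.MIN_VALUE;

    public void onElement(long eventTimestamp) {
        current = Math.max(current, eventTimestamp);
    }

    public long currentWatermark() {
        // Minus one because another element with the same timestamp may still arrive.
        return current == Long.MIN_VALUE ? Long.MIN_VALUE : current - 1;
    }
}

final class PeriodicStrategyDemo {
    public static void main(String[] args) {
        PeriodicStrategy bounded = new BoundedOutOfOrderness(300L);
        for (long ts : new long[] {1_000L, 1_500L, 1_200L}) {
            bounded.onElement(ts);
        }
        PeriodicStrategy ascending = new AscendingTimestamps();
        for (long ts : new long[] {1_000L, 1_500L}) {
            ascending.onElement(ts);
        }
        System.out.println(bounded.currentWatermark());   // 1200
        System.out.println(ascending.currentWatermark()); // 1499
    }
}
```

In both cases the strategy only tracks state per element; the watermark itself is read out on a timer, which is what makes the mode "periodic".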
Watermark propagation
For this part, see the author's earlier article Watermarks in Parallel Streams.
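As a quick reminder of the rule covered there: an operator with several input channels advances its own event time to the minimum of the watermarks received on its inputs. The following plain-Java sketch models only that rule; MinWatermarkTracker is a hypothetical name and not a Flink class.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Toy model of an operator that combines watermarks from several input channels. */
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkTracker(int channels) {
        if (channels < 1) {
            throw new IllegalArgumentException("at least one input channel is required");
        }
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark for one channel. Returns the operator's new watermark
     * if the minimum over all channels advanced, otherwise an empty result.
     */
    OptionalLong onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 10L)); // empty: channel 1 has sent nothing yet
        System.out.println(tracker.onWatermark(1, 7L));  // 7 is now the minimum
        System.out.println(tracker.onWatermark(1, 20L)); // minimum advances to 10
    }
}
```

The slowest input therefore holds back the whole operator, which is why a single idle or lagging channel can stall event-time progress downstream.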
The "death" of a watermark
When a watermark's timestamp becomes Long.MAX_VALUE, it tells the operator that no more data will ever arrive.
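One practical consequence is that a buffering operator can flush everything it still holds when the final watermark arrives. The sketch below is a toy tumbling-window counter in plain Java, written under the assumption that a window fires once the watermark reaches its last timestamp (start + size - 1). TumblingCounter is a hypothetical name, not Flink's window operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy tumbling-window counter that emits results only when a watermark arrives. */
final class TumblingCounter {
    private final long size;
    // Window start -> number of buffered elements, ordered by start time.
    private final TreeMap<Long, Integer> open = new TreeMap<>();

    TumblingCounter(long size) {
        this.size = size;
    }

    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        open.merge(start, 1, Integer::sum);
    }

    /** Fires every buffered window whose last timestamp is covered by the watermark. */
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Integer> e = open.pollFirstEntry();
            fired.add("[" + e.getKey() + "," + (e.getKey() + size) + ")=" + e.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        TumblingCounter counter = new TumblingCounter(10L);
        for (long ts : new long[] {3L, 7L, 12L, 25L}) {
            counter.onElement(ts);
        }
        System.out.println(counter.onWatermark(9L));             // [[0,10)=2]
        System.out.println(counter.onWatermark(Long.MAX_VALUE)); // [[10,20)=1, [20,30)=1]
    }
}
```

Because Long.MAX_VALUE is greater than or equal to every possible window end, the final watermark needs no special case: the ordinary firing rule already drains the buffer.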
Further reading
For watermarks generated in periodic mode, the official documentation says:
The interval (every n milliseconds) in which the watermark will be generated is defined via ExecutionConfig.setAutoWatermarkInterval(...).
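Readers should note two things about this sentence:
- If no interval is specified, the default is 200 milliseconds.
- ExecutionConfig.setAutoWatermarkInterval only takes effect when it is called after env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), because setStreamTimeCharacteristic forcibly sets the interval to 200 milliseconds. If setStreamTimeCharacteristic runs later, it overwrites the interval that was set before it: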
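public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        // Processing time: an interval of 0 turns periodic watermarks off.
        getConfig().setAutoWatermarkInterval(0);
    } else {
        // Any other characteristic resets the interval to 200 ms,
        // overwriting whatever value was configured earlier.
        getConfig().setAutoWatermarkInterval(200);
    }
}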
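The effect of the call order can be reproduced with a toy stand-in. ToyEnv below is a hypothetical class that copies only the branch logic of the method quoted above; it is not the Flink environment, and the 500 ms value is just an example.

```java
/** Toy stand-in for the environment; mirrors only the setter logic quoted above. */
final class ToyEnv {
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    private long autoWatermarkInterval;

    void setAutoWatermarkInterval(long millis) {
        autoWatermarkInterval = millis;
    }

    long getAutoWatermarkInterval() {
        return autoWatermarkInterval;
    }

    void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        // Same branches as the quoted method: 0 for processing time, 200 otherwise.
        setAutoWatermarkInterval(characteristic == TimeCharacteristic.ProcessingTime ? 0 : 200);
    }

    public static void main(String[] args) {
        // Wrong order: the custom interval is overwritten by the later call.
        ToyEnv wrong = new ToyEnv();
        wrong.setAutoWatermarkInterval(500);
        wrong.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        System.out.println(wrong.getAutoWatermarkInterval()); // 200

        // Right order: the custom interval is set last and survives.
        ToyEnv right = new ToyEnv();
        right.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        right.setAutoWatermarkInterval(500);
        System.out.println(right.getAutoWatermarkInterval()); // 500
    }
}
```

In a real job the two calls would be env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) followed by env.getConfig().setAutoWatermarkInterval(...), in that order.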
The configured value is read in TimestampsAndPeriodicWatermarksOperator#open, which obtains watermarkInterval and uses it to register a timer with the timerService:
public void open() throws Exception {
    super.open();
    currentWatermark = Long.MIN_VALUE;
    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    // An interval of 0 means periodic watermark emission is switched off.
    if (watermarkInterval > 0) {
        // Schedule the first callback one interval from the current processing time.
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
}
Here, the object returned by getProcessingTimeService() is the timerService in StreamTask.
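To see how a single registerTimer call can turn into a repeating emission, here is a plain-Java sketch driven by a manually advanced clock. It assumes that the timer callback asks the assigner for its current watermark, emits it only if it advanced, and then registers the next timer; the excerpt above shows only open(), so that callback behaviour is an assumption. PeriodicEmitter and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Toy periodic emitter with a fake clock; no real threads or timers are involved. */
final class PeriodicEmitter {
    private final long interval;
    private final LongSupplier watermarkSource; // stand-in for the user's watermark assigner
    private final List<Long> emitted = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;
    private long now = 0L;
    private long nextTimer = Long.MAX_VALUE; // Long.MAX_VALUE means "no timer registered"

    PeriodicEmitter(long interval, LongSupplier watermarkSource) {
        this.interval = interval;
        this.watermarkSource = watermarkSource;
    }

    /** Mirrors open(): register the first timer only when the interval is positive. */
    void open() {
        currentWatermark = Long.MIN_VALUE;
        if (interval > 0) {
            nextTimer = now + interval;
        }
    }

    /** Advances the fake clock and fires every timer that has become due. */
    void advanceTo(long time) {
        while (nextTimer <= time) {
            now = nextTimer;
            onTimer();
        }
        now = time;
    }

    private void onTimer() {
        long candidate = watermarkSource.getAsLong();
        if (candidate > currentWatermark) { // never emit a watermark that does not advance
            currentWatermark = candidate;
            emitted.add(candidate);
        }
        nextTimer = now + interval; // assumption: the callback re-registers itself
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        long[] latest = {100L};
        PeriodicEmitter emitter = new PeriodicEmitter(200L, () -> latest[0]);
        emitter.open();
        emitter.advanceTo(400L);  // timers at 200 and 400; only the first one emits
        latest[0] = 250L;
        emitter.advanceTo(1_000L); // timers at 600, 800, 1000; only the first one emits
        System.out.println(emitter.emitted()); // [100, 250]
    }
}
```

With an interval of 0 the sketch never registers a timer, which matches the watermarkInterval > 0 guard in open().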