Flink 之 Window
1. Overview
本文首先将介绍 Flink 的时间语义,然后介绍 Windows 的基本使用和概念,最后通过一个具体 demo 进行实战演练。
2. 时间语义
Spark Streaming 微批处理数据时,对时间维度划分没有像 Flink 这种真正的实时流处理框架细致。在实时的流处理框架中,时间是一个很重要的维度。
时间语义2.1 Processing time
在执行相应算子时,使用机器本地系统的时间。Spark Streaming 唯一支持的时间语义。也是 Flink 中默认的时间语义。这种语义的优点是不用考虑事件在传输中的延迟,所以是最高效的语义。这里的高效是指窗口可以无需考虑延迟而直接进行触发计算。但是缺点也很明显,即给出的结果是不确定的。比如统计最近 5 s 内下单次数超过两次的用户,由于用户之间的网络状况不一致,可能导致满足条件的用户因为网络延迟而没有被统计到。
2.2 Event time
在执行相应算子时,使用接收的事件发生时的时间。使用 Event Time 可以解决 Processing time 导致的结果不确定性问题。但是这将面临一个新的问题,什么时候触发窗口计算?可以以最新收到事件的 event time 作为标准吗?理想状况下,数据按序接收,那么这种策略是可行的。但是实际情况中,往往会因为各种因素导致接收到的数据的 event time 是乱序的,如果使用之前到策略触发窗口计算时,可能导致本应该属于该窗口的数据因为迟到而没有参与到计算。那一个很朴素的思想就是进行等待,那应该等待多久呢?为了解决这个问题,flink 引入了 watermarks 机制。
2.2.1 Watermarks
为了衡量当前流在某个算子的进度(时间)。Flink 引入了 watermark 机制,watermark 作为流中一种特殊的事件( 只有一个时间戳属性t ),某个算子接收到这个事件时,代表所有 t' <= t 的事件已经全部到达。
-
watermark 设置
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]
通过上面两个方法可以看到,watermark 的设置是通过指定一个 Assigner 完成的。其中前者是按照时间间隔定期生成 watermark( 默认 200ms ),适合数据量大,稠密的情况,也是我们最常使用的方式。后者是在每个事件后面都生成一个 watermark,适合稀疏、数据量小的情况。这里以前者为例进行简单说明:
sensorStream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
AssignerWithPeriodicWatermarks
是一个接口,这里传入一个它的实现类。需要对两个地方进行设置。首先是通过实现 extractTimestamp
方法指定如何获取 event_time。然后指定最大乱序程度,这里指定了 3s。通过这两个设置就可以生成 watermark 了。通过查看BoundedOutOfOrdernessTimestampExtractor这个实现类,查看watermark是如何生成的:
@Override
public final Watermark getCurrentWatermark() {
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
其中 maxOutOfOrderness 就是我们指定的 3s。可以得出:
T_watermark = T_maxEventTime - T_maxOutOfOrderness
可以看到,通过指定最大乱序程度来生成 watermark,是一种在结果准确性和效率之间的权衡。最大乱序程度指定的越大,结果越准确,但相应的结果的时效性将降低,并且集群的性能也会受到影响,因为 flink 需要对窗口中的数据进行缓存。
当我们当数据中时间戳不存在乱序情况时,可以通过
def assignAscendingTimestamps(extractor: T => Long): DataStream[T]
让系统自动生成 watermark,而不需要指定最大乱序值。
-
watermark 传递
前面讨论了 watermark 的定义和设置方式,但是在 Flink 中,一个算子可能具有多个并行度,那么 watermark 作为一种特殊的事件,在上游到下游的分区中是如何传递的呢?
watermark 传递
Flink 上游向下游传递 watermark 时,通过广播的方式对下游每个分区进行传递。下游对每个分区都维护一个 watermark,并以所有分区中最小的 watermark 作为自己的 event time 进度,同时作为向下游传递的 watermark 值。以上图为例,步骤1中当前算子的 event time 是 2。共有 4 个分区,此时 1 号分区传递过来一个值为 4 的 watermark,更新后当前算子最小的 watermark 变为 3。随后更新当前算子的 event time,并向下游广播。后面两张图原理相似。
3. Windows 操作
首先看一下官网给出的 windows 的操作流程(Keyed Windows 为例),这个流程非常全面,里面的每一个字都很重要。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "window functions"
[.getSideOutput(...)] <- optional: "output tag"
Flink 作为主要处理无界数据集的处理引擎,Windows 操作是一种处理的重要手段。核心思想是无界流通过划分到不同的桶(窗口)中,然后对每个桶中的元素进行聚合操作。根据是否对流使用 keyBy 操作,将WindowedStream
划分为 Keyed Windows 和 Non-keyed Windows。前者通过 key 划分数据到不同分区从而可以并行化执行。而后者则会将数据在一个分区上执行,亦即并行度是1。一般情况下我们都会使用前者。传统的分组操作是基于某个或某几个字段进行的,Windows 操作相当于引入了时间、数据量作为维度。
Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size by WindowAssigner, over which we can apply computations by window functions.
通过官网给出的流程我们可以看到:
window process = WindowAssigner(split and trigger) +window function(computations) + [optional setting]
接下来将对 Window 操作中必需和可选的流程分别介绍。
3.1 Required
3.1.1 WindowAssigner
WindowAssigner 的工作有两大块,split and trigger。即对数据流进行窗口划分以及触发窗口的计算。官网对 WindowAssigner 的划分比较粗略。这里给出我的理解:
WindowAssigner = [business adj] + WindowType + [timeType] + DefaultTrigger
即一个 WindowAssigner 通常由 4 部分组成,其中 WindowType 和 DefaultTrigger 是必选项。
- business adj:基于业务的形容词,如每隔多久/多少个数据进行计算,那么该形容词就是典型的 tumbling 即滚动窗口,类似的还有 sliding、session。
- WindowType:flink 中内置的有两种窗口,基于时间的 TimeWindow,不基于时间的 GlobalWindow。注意 GlobalWindow 不要和 AllWindowedStream 混淆了。AllWindowedStream 是在流的层面,强调的是所有数据都会传递到一个算子实例上进行计算,会丧失并行度。与之对应的是 WindowStream,这种流可以按照 key 进行并行执行,也是我们最常用的 window 流。而 GlobalWindow 对应的是 TimeWindow,是在窗口划分层面,划分窗口基于的流可以是 AllWindowedStream 也可以是 WindowStream。GlobalWindow 在整个窗口流中只会有一个窗口。而 TimeWindow 则会根据窗口的生命周期,在窗口流中存在多个窗口。
- timeType: 即时间语义。有 Processing Time、Event Time
- DefaultTrigger: 默认触发器。常见的系统内置的有 Processing/EventTimeTrigger,NeverTrigger 等
Built-in WindowAssigner:
根据前面总结的 WindowAssigner 的四大组成部分,我们来看一下系统内置的几种 WindowAssigner 是怎样构成的。
- SlidingProcessingTimeWindows: sliding + ProcessingTime + TimeWindow + ProcessingTimeTrigger
- GlobalWindows: no adj + no TimeType + GloablWindow + NeverTrigger (注意使用该 assigner 时需要自己指定 trigger,否则将永远无法触发计算)
Built-in interface:
除了上面例子中系统自带的 assigner 外,flink 还提供了一些封装了 assigner 的接口供我们使用
- timeWindow(Time size, Time slide): slinding + TimeWindow + Processing/Event TimeTrigger
- countWindow(long size) : tumbling + GlobalWindow + PurgingTrigger.of(CountTrigger)
可以看到,接口根据传入一个或两个参数决定窗口是 tumbling / sliding。值得注意的是 countWindow 接口中的 CountTrigger 外包裹了一层 PurgingTrigger,这里按下不表,后面进行分析。
3.1.2 Window Functions
前面划分窗口并使用 trigger 进行触发后,就来到了计算环节。flink 中的 window functions 分为两大类,增量聚合以及全量聚合。
增量聚合:
增量聚合即每来一个元素都会调用增量聚合函数进行聚合,并保存聚合状态。trigger 触发后,将聚合结果返回。
- ReduceFunction,AggregateFunction,FoldFunction(deprecated,use aggregate instead)
全量聚合:
每来一个元素都会缓存起来,trigger 触发后将所有缓存的元素返回调用全量聚合函数。
- WindowFunction(will be deprecated,use ProcessWindowFunction instead ),ProcessWindowFunction
API:
- 增量聚合
- reduce(ReduceFunction)
- aggreate(AggregateFunction)
- 全量聚合
- process(ProcessWindowFunction)
- 先预聚合,再将预聚合的结果传给全量聚合(如果单独使用全量聚合,ProcessWindowFunction.apply 函数的 input 是窗口的全量数据;而结合预聚合使用后,input 是预聚合的结果,虽然 input 是 Iterable 类型,但此时通常只有一个元素。这种使用场景通常是需要在保证时效的前提(使用预聚合)下另外得到窗口信息)
- reduce(ReduceFunction,ProcessWindowFunction)
- aggreate(AggregateFunction,ProcessWindowFunction)
3.2 Optional
3.1 中介绍的 WindowAssigner 和 window functions 都是 window 操作中的必选项,接下来介绍 window 操作中的可选项
3.2.1 Triggers
Trigger 即触发器是用来决定一个窗口何时进行计算的。系统内置的 Trigger 有 ProcessingTimeTrigger、EventTimeTrigger 等,我们也可以自定义一个 Trigger 来实现自己的逻辑。一般我们需要关注三个方法即可:
- onElement()
- onEventTime()
- onProcessingTime()
方法名已经很好的解释了回掉的时机,如果触发器和时间相关即使用的是 TimeWindow,需要根据时间语义关注后两个方法。如果使用的是 GlobalWindow 即触发器和时间无关,则需要重点关注第一个方法。三个方法的返回值都是一个枚举类型 TriggerResult。分别有 CONTINUE、FIRE_AND_PURGE、FIRE、PURGE。分别代表什么都不做、触发计算并清空数据、只触发计算、只清空数据。我们以 ProcessingTimeTrigger 为例:
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
可以看到 onProcessingTime 的回调是在 onElement 方法中通过 ctx.registerProcessingTimeTimer 注册的。并且在processing time 时间到达注册的时间点(窗口的最大值)时,进行回调并返回 FIRE 触发计算。
3.2.2 Evictors
Evictors 可以实现在 trigger 触发计算后,对窗口中的元素进行清除。可以在调用窗口函数前,也可以在调用窗口函数后。一般 Evictors 都是结合 GlobalWindow 使用。这里以 KeyedStream 的 countWindow(long size, long slide) 为例对 evictos 进行介绍。首先看一下该函数的使用场景,即创建根据数量的滑动窗口。如每 2 个元素对前 4 个元素进行计算。则 调用的是 countWindow(4,2)。再来看一下函数的实现:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
可以看到这里使用了 evictor 和 trigger。当使用 GlobalWindow 以数量为维度进行窗口划分时,trigger 一般使用 CountTrigger,表示没隔多少个元素就要触发计算。evictor 一般使用 CountEvictor,表示这个触发计算的窗口中需要计算的元素数量。下面这张图很好的描述了整个工作流程:
目前我们已经接触了两种清理窗口内数据的方法。第一种是 Trigger 中的 PURGE。第二种是通过 Evictor。
他们的相同点是一般都是基于 GlobalWindow 使用,且通过用户实现接口(Trigger、Evictor接口)来主动控制,并且只清理窗口内的元素但是不清理窗口本身。也就是说后续如果有元素仍然是可以进入到该窗口的,毕竟 GlobalWindow 只有一个窗口。
他们的不同点是 Evictor 是发生在 Trigger 之后的。
其实还有另外一种清理机制,即基于 TimeWindow 的清理机制,这种机制和前两种的不同是由 flink 控制进行清理,清理的时间点是时间到达 window 的最大时间戳 + 用户定义的 Allowed Lateness,并且除了清理窗口的数据外,还清理窗口本身(窗口元数据)。
现在我们来回顾下之前按下不表的 countWindow,首先看下代码:
countWindow(long size) : tumbling + GlobalWindow + PurgingTrigger.of(CountTrigger)
主要的疑问在于为什么在 CountTrigger 外包裹了一层 PurgingTrigger。先来了解一下 PurgingTrigger,他的实现很简单,就是将包裹在内层的 Trigger 在接口回调时返回的 TriggerResult 做一个判断,如果是 FIRE 则将其改为 FIRE_AND_PURGE。那么现在的问题就是为什么这里要对窗口中的元素进行清理呢?这里很容易理解跑偏,认为之所以清理数据是因为这个窗口是 Tumbling 窗口,所以可以对上一个窗口的数据进行清理。其实这么理解是错的,这里的核心在于 GlobalWindow,全局只有一个窗口(参考 Evictor 章节的流程图),如果不清理上一次 trigger 触发的数据,那么这些无用的数据将会在下一次 trigger 触发时出现,影响计算结果。而基于 TimeWindow 的滚动窗口,不需要进行数据清理。这是因为下一次 Trigger 时会新产生一个窗口之前窗口的数据不会带进来影响计算;并且无需担心之前窗口的数据一直被保留,因为基于 TimeWindow 的数据和窗口是由 flink 来管理的。
3.2.3 Allowed Lateness
Allowed Latness 是一种延长窗口生命周期的机制。使用最大乱序程度的方式指定 watermark 是在结果准确性和实效性间的权衡。而 Allowed Latness 则是对这种权衡的一种补充。旨在保证时效性的前提下(尽可能贴近理想的时间对窗口进行触发),对计算结果的准确性进一步增强。这种机制仅作用于 TimeWindow + event-time 的使用场景。
默认的 allowed lateness 值为 0,也就是说在 watermark 以后到达的数据将会被丢弃。 如果指定了 lateness 后,在 watermark 未超过 windowEnd + latness 时间前到达的数据仍然会被添加到窗口中。依赖于 trigger 的实现,这些没有被丢弃的数据仍然可能会触发窗口计算。我们以 EventTimeTrigger 为例:
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
可以看到触发 FIRE 的操作有两个地方,一个就是常规的在 onEventTime() 中触发。另外在 onElement() 中如果满足 window.maxTimestamp() <= ctx.getCurrentWatermark()
条件时,也会触发 FIRE。这里就是设置了 Allowed Latness 的情形。之前 watermark 到达了 window.maxTimestamp() 从而在 onEventTime() 中第一次触发 FIRE。但是因为 watermark 还没有到达 window.maxTimestamp() + lateness,所以此时窗口还没有被 remove 掉,此时满足window.maxTimestamp() <= ctx.getCurrentWatermark()
,所以将再次触发 FIRE。
3.2.4 sideOutputLateData
通过 flink 的侧输出流机制,可以将迟到数据发送到侧输出流。所谓迟到数据即在 watermark 超过 windowEnd + latness 以后到达的数据。使用起来很简单:
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
4. Windows demo
在 watermark 设置小节中提到了需要设置最大乱序值,那么这个值如何设定的?这里给出一种较为常见的生产实践:
小延迟处理大部分乱序 + window.allowedLateness + sideOutput
首先看一下输入数据的格式
sensor_1,1547718199,35.8
第一个字段是传感器 id,第二个字段是时间戳,第三个字段是温度。
object WindowTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val textInputStream: DataStream[String] = env.socketTextStream("localhost", 7777)
val sensorStream: DataStream[SensorReading] = textInputStream.map(line => {
val attrs: Array[String] = line.split(",")
SensorReading(attrs(0), attrs(1).toLong, attrs(2).toDouble)
})
val lateTag = new OutputTag[SensorReading]("late_tag")
val resultStream: DataStream[SensorReading] = sensorStream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
.keyBy(_.id)
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateTag)
.reduce((currState, newData) => {
SensorReading(currState.id, currState.timestamp.max(newData.timestamp), currState.temperature.min(newData.temperature))
})
resultStream.print("result")
resultStream.getSideOutput(lateTag).print("late")
env.execute("Window Test")
}
}
最大乱序时间:3s,窗口允许延迟:1min,窗口大小:15s。程序的对窗口数据的聚合逻辑是输出每个 sensor_id 对应的最小的温度值,以及最新的时间戳。
首先看下窗口的起始点是如何确定的:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
一般情况下 offset 都是 0.下面我们输入第一条数据:
sensor_1,1547718199,35.8
通过计算得到窗口的起始点是 1547718195,这里简写为 195,窗口大小 15,所以第一个窗口的范围是 [195,210)。输入第一行数据后控制台没有任何输出,这是因为还没有触发窗口计算。算子的 event time 必须达到 210 才可以结算第一个[195,210) 的窗口。我们接下来输入第二条数据:
sensor_1,1547718210,33.8
仍然没有数据输出。这是因为算子的 event time 是由 watermark 决定的,我们 watermark 设置了最大乱序延迟 3s。所以当 event time 达到 213 的时候,才会关闭 [195,210] 的窗口,我们输入第三条数据:
sensor_1,1547718213,30.8
控制台得到输出:result> SensorReading(sensor_1,1547718199,35.8)
这里除了验证了 watermark 的计算方法和触发规则,还验证了窗口是一个左闭右开的区间。210 的数据并没有收录进第一个窗口,所以第一个窗口中现在只有一行数据。第二条数据则被分配到了第二个窗口[210,225)。那现在第一个窗口是否就已经完全关闭了呢?我们输入第四条数据:
sensor_1,1547718201,34.8
控制台得到输出:result> SensorReading(sensor_1,1547718201,34.8)
这是因为窗口虽然触发了计算,但是并没有关闭,因为我们还设置了窗口的延迟时间 1min。为了测试关闭该窗口,我们输入第五条数据:
sensor_1,15477273,24.8
控制台得到输出:result> SensorReading(sensor_1,1547718213,30.8)
首先说明 273 是如何计算的。为了关闭[195,210)的窗口,我们需要将算子的 event time 设置为 210 + 60(1 min) = 270。又因为有乱序延迟3s,所以最终的 event time 是 270 + 3 = 273。然后说明一下这个输出结果,因为当前算子的 event time 已经更新成 270 了,所以包含第二和第三条数据的窗口[210,225)触发了计算,得到了该结果。接下来我们测试第一个窗口是否已经关闭。我们输入第六条数据:
sensor_1,1547718202,14.8
控制台得到输出:late> SensorReading(sensor_1,1547718202,14.8)
可以看到窗口已经关闭,该条数据被侧输出流捕获。