window
window 种类
- keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
- Non-Keyed Windows:也就是global window
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
问题总结
- 窗口计算是如何触发的
- watermark的作用
- 延迟数据如何计算
源码分析
windowAssinger: 窗口分配类型
- window(<windowAssigner>)
window 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink提供了几种通用的 WindowAssigner:
tumbling window(窗口间的元素无重复),
sliding window(窗口间的元素可能重复),
session window 以及
global window.
如果需要自己定制数据分发策略,则可以实现一个 class,继承自WindowAssigner。
evictor:自定义用户代码前后操作
[.evictor(...)] <- optional: "evictor" (else no evictor)
evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,默认是在用户代码之前执行。
更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter
两个方法。Flink 提供了如下三种通用的 evictor:
类别 | 种类 |
---|---|
CountEvictor | 保留指定数量的元素 |
DeltaEvictor | 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删除一个元素。 |
TimeEvictor | 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。 |
evictor 是可选的方法,如果用户不选择,则默认没有。
image.png
trigger:触发条件
[.trigger(...)] <- optional: "trigger" (else default trigger)
image.png
trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,默认使用的是对应EventTimeTrigger、ProcessingTimeTrigger等等。
image.png
如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:
- onElement() 每次往 window 增加一个元素的时候都会触发
- onEventTime() 当 event-time timer 被触发的时候会调用
- onProcessingTime() 当 processing-time timer 被触发的时候会调用
- onMerge() 对两个 trigger 的 state 进行 merge 操作
- clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:
- CONTINUE 不做任何事情
- FIRE 触发 window
- PURGE 清空整个 window 的元素并销毁窗口
- FIRE_AND_PURGE 触发窗口,然后销毁窗口
参考:https://blog.csdn.net/weixin_44904816/article/details/106346039
allowedLateness:允许延迟时间
[.allowedLateness(...)] <- optional: "lateness" (else zero)
Time & watermark
Event Time、Ingestion Time、Processing TimeEvent-Time 表示事件发生的时间,Processing-Time 则表示处理消息的时间(墙上时间),Ingestion-Time 表示进入到系统的时间。
在 Flink 中我们可以通过下面的方式进行 Time 类型的设置
// 设置使用ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
watermark
image.png
- watermark是指定时生成一个时间戳,用于标杆当前数据是否延迟,任务watermark的时间点任务之后的数据都是晚到的。
- 因为数据会有延迟,watermark不能完全解决延迟问题,所以实际中可以设置允许延迟,并触发延迟数据处理。
- watermark是用于处理EventTime的数据。
代码详解
apply方法:
private <R> WindowOperator<K, T, ?, R, W> apply(
InternalWindowFunction<Iterable<T>, R, K, W> function) {
if (evictor != null) {
return buildEvictingWindowOperator(function);
} else {
ListStateDescriptor<T> stateDesc =
new ListStateDescriptor<>(
WINDOW_STATE_NAME, inputType.createSerializer(config));
return buildWindowOperator(stateDesc, function);
}
}
session window原理
http://wuchong.me/blog/2016/06/06/flink-internals-session-window/
问题答案:
- 窗口什么时候出发?
在符合trigger条件时触发。
具体的看trigger部分
例如:eventtime
a.正常是窗口<=watermark是出发
b.当配置允许数据延迟时,会在窗口<=watermark但是窗口+延迟时间<=watermark之前,没到一条数据会触发一次,之后将销毁。
2.延迟数据如何处理?
正常在窗口触发之后,窗口会被销毁,当允许数据延迟之后,窗口不会马上销毁,所以延迟的数据会被分配到该窗口,重新触发。
- 窗口如何分配?
sessionwindow:http://wuchong.me/blog/2016/06/06/flink-internals-session-window/
每条数据【start,start+gap】,求多数据的全集即是实际窗口大小。
slidingwindow:每条数据对应多个窗口
tumblingwindow:数据对应固定窗口