flink的window和watermark的简单说明
2021-12-08 本文已影响0人
傻疯子
window
window窗口:通过将流人为的切割成一块一块进行统计
常用类型有Tumbling Windows和Sliding Windows,还有Session Windows以及可以自定义窗口。
Tumbling Windows是滚动窗口。根据时间或数量条件划分窗口大小,分成若干不重叠的块。
Sliding Windows是滑动窗口。根据时间或数量条件划分窗口大小以及每次移动的时间或数量。
windowAPI
滚动窗口
stream.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//窗口大小
//滚动时间窗口
.timeWindow(Time.seconds(10))
//滚动计数窗口
//.countWindow(5)
.sum(1)
滑动窗口
stream.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
//窗口大小和滑动间隔
//滑动时间窗口
.timeWindow(Time.seconds(10),Time.seconds(5))
//滑动计数窗口
//.countWindow(5,1)
.sum(1)
window是keyby后并行计算的窗口
windowAll是不对数据进行分组
// Keyed Window
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
Trigger在获取之前就进行筛选
Evictor在获取之后进行筛选
window和windowAll中可以传以下继承WindowAssigner的自定义窗口
TumblingEventTimeWindows
TumblingProcessingTimeWindows
SlidingEventTimeWindows
SlidingProcessingTimeWindows
EventTimeSessionWindows
ProcessingTimeSessionWindows
watermark
watermark是给予窗口计算一个等待机制,因为数据有时候不是按顺序来,可能会迟到,所以希望能等到窗口内数据到齐后再进行计算
数据有一个产生EventTime,结合watermark进行使用
使用数据事件的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
产生水位线
env.getConfig.setAutoWatermarkInterval(200)
//规定允许乱序时间和提取时间戳
val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
element._2
}
})
然后再将waterMarkStream进行窗口函数的操作,此外可以使用sideOutputLateData保存乱序未按时到达的数据
//保存丢弃的数据
val Stream = waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.sideOutputLateData(outputTag)
.(窗口处理函数)
//获得被丢弃数据
val sideOutput = resStream.getSideOutput(outputTag)
此外还可以通过allowedLateness允许获取迟到的数据,当到达后重新计算
val Stream = waterMarkStream.keyBy(0)
.allowedLateness(Time.seconds(2))
.sideOutputLateData(outputTag)
.(窗口处理函数)
另外需要注意的是如果是在并行执行的情况下,窗口需要等所有线程都触发watermark才会执行