Flink

flink的window和watermark的简单说明

2021-12-08  本文已影响0人  傻疯子

window

window窗口:通过将流人为的切割成一块一块进行统计

常用类型有Tumbling Windows和Sliding Windows,还有Session Windows以及可以自定义窗口。

Tumbling Windows是滚动窗口。根据时间或数量条件划分窗口大小,分成若干不重叠的块。
Sliding Windows是滑动窗口。根据时间或数量条件划分窗口大小以及每次移动的时间或数量。

windowAPI

滚动窗口

stream.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(0)
      //窗口大小
      //滚动时间窗口
      .timeWindow(Time.seconds(10))
       //滚动计数窗口
       //.countWindow(5)
      .sum(1)

滑动窗口

   stream.flatMap(_.split(" "))
        .map((_,1))
        .keyBy(_._1)
        //窗口大小和滑动间隔
        //滑动时间窗口
        .timeWindow(Time.seconds(10),Time.seconds(5))
        //滑动计数窗口
        //.countWindow(5,1)
        .sum(1)

window是keyby后并行计算的窗口
windowAll是不对数据进行分组

// Keyed Window
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function

// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function

Trigger在获取之前就进行筛选
Evictor在获取之后进行筛选

window和windowAll中可以传以下继承WindowAssigner的自定义窗口
TumblingEventTimeWindows
TumblingProcessingTimeWindows
SlidingEventTimeWindows
SlidingProcessingTimeWindows
EventTimeSessionWindows
ProcessingTimeSessionWindows

watermark

watermark是给予窗口计算一个等待机制,因为数据有时候不是按顺序来,可能会迟到,所以希望能等到窗口内数据到齐后再进行计算
数据有一个产生EventTime,结合watermark进行使用

使用数据事件的时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

产生水位线

env.getConfig.setAutoWatermarkInterval(200)
//规定允许乱序时间和提取时间戳
val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
      .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
  override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
           element._2
}
      })

然后再将waterMarkStream进行窗口函数的操作,此外可以使用sideOutputLateData保存乱序未按时到达的数据

//保存丢弃的数据
val Stream = waterMarkStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .sideOutputLateData(outputTag)
      .(窗口处理函数)
//获得被丢弃数据
val sideOutput = resStream.getSideOutput(outputTag)

此外还可以通过allowedLateness允许获取迟到的数据,当到达后重新计算

val Stream = waterMarkStream.keyBy(0)
      .allowedLateness(Time.seconds(2))
      .sideOutputLateData(outputTag)
      .(窗口处理函数)

另外需要注意的是如果是在并行执行的情况下,窗口需要等所有线程都触发watermark才会执行

上一篇 下一篇

猜你喜欢

热点阅读