彻底搞清Flink中的Window

2021-04-11  本文已影响0人  大数据技术派

关注公众号:大数据技术派,回复 "资料",领取大数据资料,学习大数据技术。

flink-window

窗口

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

窗口的组成

窗口分配器

State

窗口函数

选择合适的计算函数,减少开发代码量提高系统性能

增量聚合函数(窗口只维护状态)

全量聚合函数(窗口维护窗口内的数据)

触发器

image-20210202200655485

触发器分类

CountTrigger

一旦窗口中的数据元数量超过给定限制,就会触发。所以其触发机制实现在onElement中

ProcessingTimeTrigger

基于处理时间的触发。

EventTimeTrigger

根据 watermarks 度量的事件时间进度进行触发。

PurgingTrigger
image-20210202200744793
DeltaTrigger
DeltaTrigger 的应用
image-20210202200802480

触发器原型

说明

窗口的分类

flink window图解

被Keys化Windows

可以理解为按照原始数据流中的某个key进行分类,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

非被Keys化Windows

区别

Time-Based window(基于时间的窗口)

每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。

Tumbling windows(滚动窗口)

滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。

tumb-window

下面示例以滚动时间窗口(TumblingEventTimeWindows)为例,默认模式是TimeCharacteristic.ProcessingTime处理时间

/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

所以如果使用Event Time即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic指定

// 指定使用数据的实际时间
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 这里减去8小时,表示用UTC世界时间
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

Sliding windows(滑动窗口)

滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

slide-window

同理,如果是滑动时间窗口,也是类似的:

// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次
.timeWindow(Time.seconds(10), Time.seconds(5))

这里使用的是timeWindow,通常使用window,那么两者的区别是什么呢?

timeWindow其实判断时间的处理模式是ProcessingTime还是SlidingEventTimeWindows,帮我们判断好了,调用方法直接传入(Time size, Time slide)这两个参数就好了,如果是使用.window方法,则需要自己来判断,就是前者写法更简单一些。

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}

Count-Based window (基于计数的窗口)

Count Window 是根据元素个数对数据流进行分组的,也分滚动(tumb)和滑动(slide)。

Tumbling Count Window
当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。通过使用 DataStream API,我们可以这样实现:

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)

Sliding Count Window
当然Count Window 也支持 Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)

会话(session)窗口

session-window
DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

Global Windows(全局窗口)

global-window

总结

SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows

延迟

默认情况下,当水印超过窗口末尾时,会删除延迟数据元。
但是,Flink允许为窗口 算子指定最大允许延迟。允许延迟指定数据元在被删除之前可以延迟多少时间,并且其默认值为0.
在水印通过窗口结束之后但在通过窗口结束加上允许的延迟之前到达的数据元,仍然添加到窗口中。
根据使用的触发器,延迟但未丢弃的数据元可能会导致窗口再次触发。就是这种情况EventTimeTrigger。

当指定允许的延迟大于0时,在水印通过窗口结束后保持窗口及其内容。在这些情况下,当迟到但未掉落的数据元到达时,它可能触发窗口的另一次触发。
这些射击被称为late firings,因为它们是由迟到事件触发的,与之相反的main firing 是窗口的第一次射击。在会话窗口的情况下,后期点火可以进一步导致窗口的合并,因为它们可以“桥接”两个预先存在的未合并窗口之间的间隙。
后期触发发出的数据元应该被视为先前计算的更新结果,即,您的数据流将包含同一计算的多个结果。根据您的应用程序,您需要考虑这些重复的结果或对其进行重复数据删除。

窗口的使用

Evictor

内置的Evitor

watermark

产生方式

在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

背景

解决的问题

多流waterMark

理解

watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即数据属于这个窗口)

窗口聚合

常用的一些方法

AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
简而言之,前一个接口将会周期性发送Watermark,而第二个接口根据一些到达数据的属性,例如一旦在流中碰到一个特殊的element便发送Watermark。

自定义窗口

window 出现数据倾斜

关注公众号:Java大数据与数据仓库,回复 "资料",领取大数据资料,学习大数据技术。

上一篇下一篇

猜你喜欢

热点阅读