window

2021-05-07  本文已影响0人  ZYvette

window 种类

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

问题总结

源码分析

windowAssinger: 窗口分配类型

image.png

window 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink提供了几种通用的 WindowAssigner:
tumbling window(窗口间的元素无重复),
sliding window(窗口间的元素可能重复),
session window 以及
global window.
如果需要自己定制数据分发策略,则可以实现一个 class,继承自WindowAssigner。

evictor:自定义用户代码前后操作

  [.evictor(...)]            <-  optional: "evictor" (else no evictor)

evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,默认是在用户代码之前执行。
更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter
两个方法。Flink 提供了如下三种通用的 evictor:

类别 种类
CountEvictor 保留指定数量的元素
DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删除一个元素。
TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。

evictor 是可选的方法,如果用户不选择,则默认没有。


image.png

trigger:触发条件

  [.trigger(...)]            <-  optional: "trigger" (else default trigger)
image.png

trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,默认使用的是对应EventTimeTrigger、ProcessingTimeTrigger等等。


image.png

如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

allowedLateness:允许延迟时间

  [.allowedLateness(...)]    <-  optional: "lateness" (else zero)

Time & watermark

Event Time、Ingestion Time、Processing Time

Event-Time 表示事件发生的时间,Processing-Time 则表示处理消息的时间(墙上时间),Ingestion-Time 表示进入到系统的时间。

在 Flink 中我们可以通过下面的方式进行 Time 类型的设置

// 设置使用ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 

watermark


image.png

代码详解

apply方法:

private <R> WindowOperator<K, T, ?, R, W> apply(
            InternalWindowFunction<Iterable<T>, R, K, W> function) {
        if (evictor != null) {
            return buildEvictingWindowOperator(function);
        } else {
            ListStateDescriptor<T> stateDesc =
                    new ListStateDescriptor<>(
                            WINDOW_STATE_NAME, inputType.createSerializer(config));
            return buildWindowOperator(stateDesc, function);
        }
    }

session window原理

http://wuchong.me/blog/2016/06/06/flink-internals-session-window/

问题答案:

  1. 窗口什么时候出发?
    在符合trigger条件时触发。
    具体的看trigger部分
    例如:eventtime
    a.正常是窗口<=watermark是出发
    b.当配置允许数据延迟时,会在窗口<=watermark但是窗口+延迟时间<=watermark之前,没到一条数据会触发一次,之后将销毁。

2.延迟数据如何处理?
正常在窗口触发之后,窗口会被销毁,当允许数据延迟之后,窗口不会马上销毁,所以延迟的数据会被分配到该窗口,重新触发。

  1. 窗口如何分配?

sessionwindow:http://wuchong.me/blog/2016/06/06/flink-internals-session-window/
每条数据【start,start+gap】,求多数据的全集即是实际窗口大小。
slidingwindow:每条数据对应多个窗口
tumblingwindow:数据对应固定窗口

上一篇下一篇

猜你喜欢

热点阅读