第六章.Flink Time 与 Window

2019-05-23  本文已影响0人  __元昊__

6.1 Time

在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示:

QQ截图20190523202337.png
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的
日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事
件时间戳。
Ingestion Time:是数据进入 Flink 的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器
相关,默认的时间属性就是 Processing Time。
例如,一条日志进入 Flink 的时间为 2017-11-12 10:00:00.123,到达 Window 的
系统时间为 2017-11-12 10:00:01.234,日志的内容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计 1min 内的故障日志个数,哪个时间是最有意义的?——
eventTime,因为我们要根据日志的生成时间进行统计。

6.2 Window

6.2.1 Window 概述

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限
数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据
为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大
小的”buckets”桶,我们可以在这些桶上做计算操作。

6.2.2 Window 类型

Window 可以分成两类:
 CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
 TimeWindow:按照时间生成 Window。
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling
Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

  1. 滚动窗口(Tumbling Windows)
    将数据依据固定的窗口长度对数据进行切片
    特点:时间对齐,窗口长度固定,没有重叠
    滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一
    个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗
    口,窗口的创建如下图所示:
    QQ截图20190523202616.png
    适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。
  2. 滑动窗口(Sliding Windows)
    滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动 间隔组成
    特点:时间对齐,窗口长度固定,有重叠
    滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大
    小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,
    滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素
    会被分配到多个窗口中。
    例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包
    含着上个 10 分钟产生的数据,如下图所示:
    QQ截图20190523202844.png
    适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定
    是否要报警)。
  3. 会话窗口(Session Windows)
    由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口
    特点:时间无对齐
    session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗
    口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它 在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关 闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃
    周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将
    被分配到新的 session 窗口中去。
    QQ截图20190523203035.png

6.3 Window API

6.3.1 CountWindow

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素 数量达到窗口大小的 key 对应的结果。 注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输 入的所有元素的总数
1 滚动窗口
默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量
达到窗口大小时,就会触发窗口的执行。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
// 引入滚动窗口
// 这里的 5 指的是 5 个相同 key 的元素计算一次
val streamWindow = streamKeyBy.countWindow(5)
// 执行聚合操作
val streamReduce = streamWindow.reduce(
 (item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")

2 滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据
就计算一次,每一次计算的 window 范围是 5 个元素。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
// 引入滚动窗口
// 当相同 key 的元素个数达到 2 个时,触发窗口计算,计算的窗口范围为 5
val streamWindow = streamKeyBy.countWindow(5,2)
// 执行聚合操作
val streamReduce = streamWindow.reduce(
 (item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")

6.3.2 TimeWindow

TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个
window 里面的所有数据进行计算。

  1. 滚动窗口
    Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的
    数据根据进入 Flink 的时间划分到不同的窗口中。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入时间窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamReduce = streamWindow.reduce(
 (item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其
中的一个来指定。

  1. 滑动窗口(SlidingEventTimeWindows)
    滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
    数,一个是 window_size,一个是 sliding_size。
    下面代码中的 sliding_size 设置为了 2s,也就是说,窗口每 2s 就计算一次,每
    一次计算的 window 范围是 5s 内的所有元素。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5), Time.seconds(2))
// 执行聚合操作
val streamReduce = streamWindow.reduce(
 (item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其
中的一个来指定。

6.3.3 Window Reduce

WindowedStream → DataStream:给 window 赋一个 reduce 功能的函数,并
返回一个聚合的结果。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入时间窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamReduce = streamWindow.reduce(
 (item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")

6.3.4 Window Fold

WindowedStream → DataStream:给窗口赋一个 fold 功能的函数,并返回一
个 fold 后的结果。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行 fold 操作
val streamFold = streamWindow.fold(100){
 (begin, item) =>
begin + item._2
}
// 将聚合数据写入文件
streamFold.print()
// 执行程序
env.execute("TumblingWindow")

6.3.5 Aggregation on Window

WindowedStream → DataStream:对一个 window 内的所有元素做聚合操作。
min 和 minBy 的区别是 min 返回的是最小值,而 minBy 返回的是包含最小值字段
的元素(同样的原理适用于 max 和 maxBy)。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamMax = streamWindow.max(1)
// 将聚合数据写入文件
streamMax.print()
// 执行程序
env.execute("TumblingWindow")
上一篇下一篇

猜你喜欢

热点阅读