Structured Streaming

2019-01-06  本文已影响86人  丹之

编程模型

Structured Streaming 的关键思想是将持续不断的数据当做一个不断追加的表。

基本概念

将输入的流数据当做一张 “输入表”。把每一条到达的数据作为输入表的新的一行来追加。



在输入表上执行的查询将会生成 “结果表”。每个触发间隔(trigger interval)(例如 1s),新的行追加到输入表,最终更新结果表。无论何时更新结果表,我们都希望将更改的结果行 output 到外部存储/接收器(external sink)。



output 有以下三种模式:

为了说明这个模型的使用,让我们来进一步理解上面的快速示例:

流式 DataFrames/Datasets 上的操作

可以在流式 DataFrames/Datasets 上应用各种操作:从无类型,类似 SQL 的操作(比如 select、where、groupBy),到类似有类型的 RDD 操作(比如 map、filter、flatMap)

基本操作 - Selection, Projection, Aggregation

大部分常见的 DataFrame/Dataset 操作也支持流式的 DataFrame/Dataset。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

event-time(事件时间)上的 window 操作

使用 Structured Streaming 进行滑动的 event-time 窗口聚合是很简单的,与分组聚合非常类似。在分组聚合中,为用户指定的分组列中的每个唯一值维护一个聚合值(例如计数)。在基于 window 的聚合的情况下,为每个 window 维护聚合(aggregate values),流式追加的行根据 event-time 落入相应的聚合。让我们通过下图来理解。

想象下,我们的快速示例现在改成了包含数据生成的时间。现在我们想在 10 分钟的 window 内计算 word count,每 5 分钟更新一次。比如 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 等。12:00 - 12:10 是指数据在 12:00 之后 12:10 之前到达。现在,考虑一个 word 在 12:07 的时候接收到。该 word 应当增加 12:00 - 12:10 和 12:05 - 12:15 相应的 counts。所以 counts 会被分组的 key 和 window 分组。

结果表将如下所示:



由于这里的 window 与 group 非常类似,在代码上,你可以使用 groupBy 和 window 来表达 window 聚合。例子如下:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Watermark 和延迟数据处理

现在考虑一个数据延迟到达会怎么样。例如,一个在 12:04 生成的 word 在 12:11 被接收到。application 会使用 12:04 而不是 12:11 去更新 12:00 - 12:10的 counts。这在基于 window 的分组中很常见。Structured Streaming 会长时间维持部分聚合的中间状态,以便于后期数据可以正确更新旧 window 的聚合,如下所示:



然后,当 query 运行了好几天,系统必须限制其累积的内存中中间状态的数量。这意味着系统需要知道什么时候可以从内存状态中删除旧的聚合,因为 application 不会再为该聚合更晚的数据进行聚合操作。为启动此功能,在Spark 2.1中,引入了 watermark(水印),使引擎自动跟踪数据中的当前事件时间,并相应地清理旧状态。你可以通过指定事件时间列来定义一个 query 的 watermark 和 late threshold(延迟时间阈值)。对于一个开始于 T 的 window,引擎会保持中间状态并允许后期的数据对该状态进行更新直到 max event time seen by the engine - late threshold > T。换句话说,在延迟时间阈值范围内的延迟数据会被聚合,但超过该阈值的数据会被丢弃。让我们以一个例子来理解这一点。我们可以使用 withWatermark() 定义一个 watermark,如下所示:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

在这个例子中,我们定义了基于 timestamp 列定义了 watermark,并且将 10 分钟定义为允许数据延迟的阈值。如果该数据以 update 输出模式运行:

某些 sink(例如文件)可能不支持 update mode 所需的细粒度更新。所以,我们还支持 append 模式,只有最后确定的计数被写入。这如下图所示。

注意,在非流式 Dataset 上使用 withWatermark 是无效的空操作。



与之前的 update mode 类似,引擎维护每个 window 的中间计数。只有当 window < watermark 时才会删除 window 的中间状态数据,并将该 window 最终的 counts 追加到结果表或 sink 中。例如,window 12:00 - 12:10 的最终结果将在 watermark 更新到 12:11 后再追加到结果表中。
watermark 清除聚合状态的条件十分重要,为了清理聚合状态,必须满足以下条件(自 Spark 2.1.1 起,将来可能会有变化):

Output Modes

有几种类型的输出模式:

Append mode(默认的):这是默认模式,其中只有从上次触发后添加到结果表的新行将被输出到 sink。适用于那些添加到结果表中的行从不会更改的查询。只有 select、where、map、flatMap、filter、join 等查询会支持 Append mode
Complete mode:每次 trigger 后,整个结果表将被输出到 sink。聚合查询(aggregation queries)支持该模式
Update mode:(自 Spark 2.1.1 可用)。只有结果表中自上次 trigger 后更新的行将被输出到 sink
不同类型的流式 query 支持不同的 output mode。以下是兼容性:


https://github.com/xy2953396112/spark-sourcecodes-analysis/blob/master/structured-streaming/Structured-Streaming-%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97.md

上一篇 下一篇

猜你喜欢

热点阅读