Spark

StructuredStreaming编程指南

2019-01-23  本文已影响22人  撸码小丑

1、概述

结构化流是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。您可以用在静态数据上表示批处理计算的方式来表示流计算。Spark SQL引擎将负责递增和连续地运行它,并在流数据继续到达时更新最终结果。可以使用Scala、Java、Python或R中的DataSet/DataFrAPI API来表示流聚合、事件时间窗口、流到批连接等。在相同的优化Sql SQL引擎上执行计算。最后,系统通过检查点和提前写入日志来确保端到端的容错性。简而言之,结构化流提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑流。
在内部,默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端延迟,最短可达100毫秒,并且可以保证容错性。然而,自Spark 2.3以来,我们引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至1毫秒的端到端延迟。在查询中不更改数据集/数据帧操作的情况下,您可以根据应用程序要求选择模式。
在本文中,将介绍编程模型和API。将主要使用默认的微批量处理模型来解释这些概念,然后讨论连续处理模型。首先,从一个简单的结构化流式查询示例开始——流式WordCount。

2、快速入门

假设你希望维护从侦听TCP Socket的数据服务器接收的文本数据的word count。让我们看看如何使用structured streaming来表达这一点。首先,我们必须导入必要的类并创建本地SparkSession,这是与Spark相关的所有功能的起点。

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

接下来,让我们创建一个流式DataFrame,它表示从侦听localhost:9999的服务器接收到的文本数据,并转换该DataFrame以计算单词字数。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

此行数据帧表示包含流式文本数据的无边界表。此表包含一列名为“value”的字符串,流式文本数据中的每一行将成为表中的一行。注意,由于我们只是在设置转换,所以目前还没有接收到任何数据,而且还没有启动转换。接下来,我们使用.as(encoders.string())将数据帧转换为字符串数据集,这样我们就可以应用flatmap操作将每一行拆分为多个单词。结果单词数据集包含所有单词。最后,我们通过对数据集中的唯一值进行groupby并对其进行计数来定义WordCounts数据帧。注意,这是一个流数据帧,它表示流的运行单词计数。

我们现在已经设置了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置为每次更新时向控制台打印完整的计数集(由outputmode("complete")指定)。然后使用start()启动流计算。

// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();

执行此代码后,流计算将在后台启动。查询对象是该活动流查询的句柄,我们决定使用waitTermination()等待查询的终止,以防止查询活动时进程退出。

要实际执行这个示例代码,首先需要使用

$ nc -lk 9999

然后,在另一个终端中,可以使用

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

然后,在运行netcat服务器的终端中键入的任何行都将被计数并每秒在屏幕上打印。像下面这样:

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop


# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

3、编程模型

结构化流中的关键思想是将实时数据流视为一个不断追加的表。这导致了一个新的流处理模型,与批处理模型非常相似。您将把流计算表示为与静态表类似的标准批处理查询,spark将在无边界输入表上以增量查询的形式运行它。让我们更详细地了解这个模型。

3.1、基本概念

想象把流数据当成一个'Input Table',每个data item到来后都会追加到这个table里面。


image.png

对输入的查询将生成“结果表”。每一个触发间隔(比如说,每1秒),新的行都会附加到输入表中,这最终会更新结果表。每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

image.png

“输出”定义为写入外部存储器的内容。可以在不同的模式下定义输出:

请注意,结构化流并没有具体化整个表。它从流数据源中读取最新的可用数据,然后递增处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如前面示例中的中间计数)。

此模型与许多其他流处理引擎显著不同。许多流系统要求用户自己维护正在运行的聚合,因此必须考虑容错性和数据一致性(至少一次、最多一次或完全一次)。在这个模型中,spark负责在有新数据时更新结果表,从而减少用户对结果表的推理。作为一个例子,让我们看看这个模型如何处理基于事件时间的处理和延迟到达的数据。

3.2、处理事件时间和延迟数据

事件时间是嵌入到数据本身中的时间。对于许多应用程序,您可能希望在此事件时间上进行操作。例如,如果您希望获得每分钟由物联网设备生成的事件数,那么您可能希望使用生成数据的时间(即数据中的事件时间),而不是Spark接收数据的时间。这个事件时间很自然地用这个模型表示——设备中的每个事件都是表中的一行,而事件时间是行中的一列值。这允许基于窗口的聚合(例如,每分钟事件数)只是事件时间列上特殊类型的分组和聚合-每个时间窗口都是一个组,并且每一行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询既可以在静态数据集(例如,从收集的设备事件日志中)上定义,也可以在数据流上定义,从而使用户的生活更加方便。

此外,该模型根据事件时间自然地处理比预期晚到达的数据。由于Spark正在更新结果表,因此它可以完全控制在有延迟数据时更新旧聚合,以及清除旧聚合以限制中间状态数据的大小。自Spark2.1以来,我们支持做标记,允许用户指定最新数据的阈值,并允许引擎相应地清除旧状态。稍后将在窗口操作部分中更详细地解释这些内容。

3.3、容错语义

只交付一次端到端语义是结构化流设计背后的关键目标之一。为了实现这一点,spark设计了结构化的流媒体源、接收器和执行引擎,以便可靠地跟踪处理的确切进度,以便通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于Kafka偏移量或Kinesis序列号),以跟踪流中的读取位置。引擎使用检查点和提前写入日志来记录每个触发器中正在处理的数据的偏移范围。流水槽设计成等量处理后处理。同时,使用可重放源和等量汇点,结构化流可以确保在任何故障下端到端的语义都是一次性的。

4、使用DataSet和DataFrames的API

由于Spark 2.0,DataSet和DataFrame可以表示静态的有界数据,也可以表示流式的无界数据。与静态DataSet和DataFrame类似,你可以使用公共入口点SparkSession从流源创建流DataSets和DataFrames,并将它们作为静态DataSet和DataFrame应用于相同的操作。

4.1、创建流式DataSets和DataFrames

Streaming DataFrames可以通过DataStreamReader接口创建。与创建静态DataFrame的读取接口类似,您可以指定源的详细信息—数据格式、模式、选项等。

有些源不具有容错性,因为它们不能保证在失败后可以使用检查点偏移量重播数据。请参见前面关于容错语义的部分。以下是Spark中所有来源的详细信息。


image.png

下面是一些例子。

SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")

这些示例生成未类型化的流式DataFrame,这意味着在编译时不检查DataFrame的架构,只在提交查询时在运行时检查。一些操作(如map、flatmap等)需要在编译时知道类型。为此,可以使用与静态数据帧相同的方法将这些非类型化流数据帧转换为类型化流数据集。有关详细信息,请参阅《SQL编程指南》。此外,有关支持的流媒体源的更多详细信息将在文档的后面讨论。

4.2、流式DataFrames/Datasets的模式推断和划分

您可以对流式DataFrames/Datasets应用各种操作—从非类型化、类似SQL的操作(例如select、where、groupby)到类似RDD的类型化操作(例如map、filter、flatmap)。有关详细信息,请参阅《SQL编程指南》。让我们来看几个您可以使用的示例操作。

import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));

还可以将流式数据帧/数据集注册为临时视图,然后对其应用SQL命令。

df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF

注意,可以使用df.isStreaming来标识数据帧/数据集是否具有流数据。

df.isStreaming()
4.3、Window Operations on Event Time

滑动事件时间窗口上的聚合对于结构化流非常简单,并且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。对于基于窗口的聚合,将为行的事件时间所在的每个窗口维护聚合值。让我们用一个例子来理解这一点。

假设我们的快速示例被修改了,流现在包含了行以及生成行的时间。我们不需要运行单词计数,而是希望在10分钟的窗口内对单词进行计数,每5分钟更新一次。也就是说,单词在10分钟窗口12:00-12:10、12:05-12:15、12:10-12:20等时间段内接收的单词中计数。请注意,12:00-12:10表示12:00之后但12:10之前到达的数据。现在,考虑一下12:07收到的一个词。这个词应该增加对应于两个窗口12:00-12:10和12:05-12:15的计数。因此,计数将由分组键(即字)和窗口(可以从事件时间计算)这两个参数索引。

结果表如下所示。


image.png

由于此窗口化与分组类似,因此在代码中,可以使用groupby()和window()操作来表示窗口化聚合。

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();

在本例中,我们定义了查询的水印“timestamp”列的值,还定义了“10分钟”作为允许数据延迟的阈值。如果在更新输出模式下运行此查询(稍后在输出模式部分中讨论),则引擎将继续更新结果表中窗口的计数,直到窗口比水印旧,而水印比“timestamp”列中的当前事件时间落后10分钟。这是一个例子。


image.png

如图所示,引擎跟踪的最大事件时间是蓝色虚线,每个触发器开始时设置为(最大事件时间-“10分钟”)的水印是红线。例如,当引擎观察数据(12:14,dog)时,它将下一个触发器的水印设置为12:04。这个水印允许引擎在额外的10分钟内保持中间状态,以便计算延迟的数据。例如,数据(12:09,cat)出现故障和延迟,并落在Windows 12:00-12:10和12:05-12:15中。由于它仍在触发器中的水印12:04之前,因此引擎仍将中间计数保持为状态,并正确更新相关窗口的计数。但是,当水印更新到12:11时,窗口的中间状态(12:00-12:10)被清除,所有后续数据(例如(12:04,驴))被视为“太晚”,因此被忽略。请注意,在每个触发器之后,更新的计数(即紫色行)都会写入sink作为触发器输出,这由更新模式决定。

某些接收器(如文件)可能不支持更新模式所需的细粒度更新。为了使用它们,我们还支持附加模式,其中只有最终计数被写入sink。如下所示。
请注意,在非流式数据集中使用withwatermark是不起作用的。由于水印不应以任何方式影响任何批查询,因此我们将直接忽略它。


image.png

与之前的更新模式类似,引擎为每个窗口保持中间计数。但是,部分计数不会更新到结果表,也不会写入接收器。引擎等待“10分钟”计算延迟日期,然后将窗口的中间状态<水印,并将最终计数附加到结果表/接收器。例如,只有在水印更新为12:11之后,才会将窗口12:00-12:10的最终计数追加到结果表中。

4.4、Join Operations

结构化流支持将流数据集/数据帧与静态数据集/数据帧以及另一个流数据集/数据帧连接起来。流连接的结果是递增生成的,类似于上一节中的流聚合结果。在本节中,我们将探讨在上述情况下支持的连接类型(即内部、外部等)。请注意,在所有支持的联接类型中,使用流数据集/数据帧进行联接的结果将与使用流中包含相同数据的静态数据集/数据帧时的结果完全相同。

Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join");  // right outer join with a static DF

注意,流静态连接不是有状态的,因此不需要状态管理。但是,还不支持几种类型的流静态外部联接。这些类型在后面会有详细的介绍。

让我们用一个例子来理解这一点。

假设我们想将一个广告印象流(显示广告时)与另一个用户点击广告流连接起来,以便在印象导致可货币化点击时进行关联。要允许此流连接中的状态清理,您必须指定水印延迟和时间约束,如下所示。
1.水印延迟:例如,事件时间中的印痕和相应的点击可能延迟/无序,分别最多2小时和3小时。
2.事件时间范围条件:例如,在相应的印象后0秒到1小时的时间范围内可能会发生一次单击。

代码应该是这样的。

import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);

带水印的流内部连接的语义保证
这类似于在聚合上添加水印提供的保证。水印延迟“2小时”保证引擎不会丢弃任何延迟时间小于2小时的数据。但延迟超过2小时的数据可能会被处理,也可能不会被处理。

Outer Joins with Watermarking
对于内部联接,水印+事件时间约束是可选的,而对于左侧和右侧外部联接,则必须指定它们。这是因为为了在外部联接中生成空结果,引擎必须知道将来何时输入行将与任何内容不匹配。因此,必须指定水印+事件时间约束以生成正确的结果。因此,具有外部联接的查询看起来很像前面的广告货币化示例,只是有一个额外的参数将其指定为外部联接。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
);

带水印的流外部连接的语义保证
外部联接与内部联接在水印延迟以及是否删除数据方面具有相同的保证。
告诫
关于外部结果是如何产生的,有几个重要的特征需要注意:
1.外部空结果将根据指定的水印延迟和时间范围条件生成延迟。这是因为引擎必须等待那么长的时间,以确保没有匹配,将来也不会有更多的匹配。

2.在微批量引擎的当前实现中,水印是在微批量结束时进行的,下一个微批量使用更新后的水印来清除状态并输出外部结果。由于我们只在需要处理新数据时触发一个微批处理,因此如果流中没有接收到新数据,则外部结果的生成可能会延迟。简而言之,如果被联接的两个输入流中的任何一个在一段时间内没有接收数据,则外部(两种情况下,左或右)输出可能会延迟。

流式查询中联接的支持列表

有关支持的联接的其他详细信息

4.5、流式重复数据消除

您可以使用事件中的唯一标识符来消除数据流中的重复记录。这与使用唯一标识符列的静态重复数据消除完全相同。查询将存储以前记录中所需的数据量,以便筛选重复记录。与聚合类似,您可以使用带或不带水印的重复数据消除。

Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
4.6、处理多个水印的策略

流查询可以有多个联合或连接在一起的输入流。每个输入流可以有一个不同的延迟数据阈值,对于有状态的操作,这些阈值需要被容忍。在每个输入流上使用withWatermarks("eventtime",delay)指定这些阈值。例如,考虑使用inputstream1和inputstream2之间的流连接进行查询。

inputStream1.withWatermark(“eventTime1”, “1 hour”) .join( inputStream2.withWatermark(“eventTime2”, “2 hours”), joinCondition)

在执行查询时,结构化流单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择一个带有它们的全局水印用于状态操作。默认情况下,选择最小值作为全局水印,因为这样可以确保如果其中一个流落后于另一个流(例如,其中一个流由于上游故障而停止接收数据),则不会意外地将任何数据拖得太晚。换句话说,全局水印将以最慢流的速度安全移动,查询输出将相应延迟。
但是,在某些情况下,您可能希望获得更快的结果,即使这意味着从最慢的流中删除数据。由于Spark 2.4,可以通过将SQL配置spark.sql.streaming.multipleWatermarkPolicy to max (default is min),将多水印策略设置为选择最大值作为全局水印。这使全局水印以最快的流速度移动。但是,作为一个副作用,来自较慢流的数据将被大量丢弃。因此,明智地使用这个配置。

4.7、任意状态操作

许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,您必须从事件的数据流中跟踪会话。为了进行这种会话化,必须将任意类型的数据保存为状态,并使用每个触发器中的数据流事件对状态执行任意操作。由于spark 2.2,可以使用操作mapGroupsWithState 和更强大的操作flatMapGroupsWithState来完成此操作。这两个操作都允许您对分组数据集应用用户定义的代码以更新用户定义的状态。

4.8、不支持的操作

流式数据帧/数据集不支持一些数据帧/数据集操作。其中一些如下。

此外,还有一些数据集方法不适用于流数据集。它们是将立即运行查询并返回结果的操作,这对流数据集没有意义。相反,这些功能可以通过显式启动流式查询来完成(请参见下一节)。

如果您尝试这些操作中的任何一个,您将看到类似“流式数据帧/数据集不支持操作xyz”的分析异常。虽然其中一些可能在未来的Spark版本中得到支持,但还有一些基本上难以有效地在流数据上实现。例如,不支持对输入流进行排序,因为它需要跟踪流中接收的所有数据。因此,从根本上说,这很难有效地执行。

5、开始流式查询

一旦定义了最终结果数据帧/数据集,剩下的就是开始流计算。要做到这一点,您必须使用DataStreamWriter ,通过 Dataset.writeStream(),您必须在此接口中指定以下一个或多个选项。

5.1、Output Modes输出模式

有几种类型的输出模式。

不同类型的流式查询支持不同的输出模式。这是兼容性矩阵。


5.2、Output Sinks

有几种类型的内置输出接收器。

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
writeStream
    .foreach(...)
    .start()
writeStream
    .format("console")
    .start()
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

有些接收器不能容错,因为它们不能保证输出的持久性,并且仅用于调试目的。请参见前面关于容错语义的部分。以下是spark中所有水槽sinks的细节。



注意,必须调用start()才能实际开始执行查询。这将返回一个streamingquery对象,该对象是连续运行执行的句柄。您可以使用这个对象来管理查询,我们将在下一小节中讨论这个问题。现在,让我们用几个例子来理解这一切。

// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table

使用foreach和foreachbatch
foreach和foreachbatch操作允许您对流式查询的输出应用任意操作和写入逻辑。它们有稍微不同的用例——虽然foreach允许在每一行上自定义写入逻辑,但是foreach batch允许在每个微批的输出上执行任意操作和自定义逻辑。让我们更详细地了解它们的用法。
ForeachBatch
foreachbatch(…)允许您指定对流式查询的每个微批的输出数据执行的函数。自Spark 2.4以来,这在scala、Java和Python中得到了支持。它需要两个参数:一个数据帧或数据集,该数据帧或数据集具有微批的输出数据和微批的唯一ID。

streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long> {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }    
  }
).start();

使用foreachbatch,可以执行以下操作。

streamingdf.writestream.forachbatch{(batchdf:dataframe,batchid:long)=>
batchdf.persist()
batchdf.write.format(…)/位置1 
batchdf.write.format(…)/位置2 
batchdf.unpersist()
}

注:

Foreach
如果foreach batch不是一个选项(例如,相应的批数据编写器不存在,或者不存在连续处理模式),则可以使用foreach表示自定义编写器逻辑。具体来说,您可以将数据写入逻辑划分为三种方法:open, process, and close. 。自从Scale 2.4以来,Foreach在Scala、Java和Python中可用。
In Java, you have to extend the class ForeachWriter

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter[String] {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

执行语义:当启动流式查询时,spark以以下方式调用函数或对象的方法:

5.3、触发器

流式查询的触发器设置定义了流式数据处理的时间,无论该查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。以下是支持的不同类型的触发器。



下面是一些实例

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();

6、管理流式查询

启动查询时创建的StreamingQuery可用于监视和管理查询。

StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query

您可以在单个SparkSession中启动任意数量的查询。它们都将同时运行,共享集群资源。您可以使用sparkSession.streams()来获得用于管理当前活动查询的StreamingQueryManager

SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates

7、监控流式查询

有多种方法可以监视活动的流式查询。您可以使用Spark的Dropwizard度量支持将度量推送到外部系统,也可以通过编程方式访问它们。

7.1、交互读取度量值

您可以使用streamingQuery.lastProgress() and streamingQuery.status().lastProgress()直接获取活动查询的当前状态和度量值。lastprogress()返回StreamingQueryProgress in scala and java和一个在python中具有相同字段的字典。它包含有关流最后一个触发器中的进度的所有信息—处理了哪些数据、处理速率、延迟等。还有streamingQuery.recentProgress,它返回最后几个进度的数组。

另外,streamingQuery.status()返回StreamingQueryStatus inscala and java和一个在python中具有相同字段的字典。它提供有关查询正在立即执行的操作的信息—触发器是否处于活动状态、数据是否正在处理等。

下面是几个例子。

StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
7.2、使用异步API以编程方式报告度量

还可以通过附加streamingQueryListener(scala/java docs)异步监视与sparkSession关联的所有查询。一旦使用SparkSession.streams.addListener()附加了customStreamingQueryListener对象,在启动和停止查询以及在活动查询中取得进展时,您将得到回调。下面是一个例子,

SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
7.3、使用DropWizard报告度量值

Spark支持使用DropWizard库报告度量。要同时报告结构化流式查询的指标,必须在SparkSession中显式启用configurationsPark.sql.streaming.metricsEnabled。

spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");

启用此配置后在SparkSession中启动的所有查询都将通过DropWizard向配置的Versinkshave报告度量(例如Ganglia、Graphite、JMX等)。

8、使用检查点从失败中恢复

如果出现故障或有意关闭,您可以恢复以前查询的进度和状态,并在停止的地方继续。这是使用检查点和提前写入日志完成的。您可以使用检查点位置配置查询,查询将把所有进度信息(即每个触发器中处理的偏移范围)和正在运行的聚合(如快速示例中的字数)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时在DatastreamWriter中设置为选项。

aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();

9、流式查询更改后的恢复语义

在从同一检查点位置重新启动之间,流式查询中允许哪些更改存在限制。以下是一些不允许的更改,或者更改的效果没有很好的定义。对于所有人:

变更的类型

10、连续的工作

[实验]
连续处理是Spark 2.3中引入的一种新的、实验性的流式执行模式,它允许低(~1 ms)端到端延迟,并至少保证一次容错。将其与默认的微批量处理引擎进行比较,后者可以实现一次完全保证,但最多只能实现约100毫秒的延迟。对于某些类型的查询(在下面讨论),您可以选择在不修改应用程序逻辑的情况下执行它们的模式(即,不更改数据帧/数据集操作)。
要在连续处理模式下运行受支持的查询,只需指定一个具有所需检查点间隔的连续触发器作为参数。例如,

import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

检查点间隔为1秒意味着连续处理引擎将每秒记录查询的进度。生成的检查点的格式与微批处理引擎兼容,因此任何查询都可以用任何触发器重新启动。例如,支持的以微批处理模式启动的查询可以在连续模式下重新启动,反之亦然。请注意,任何时候切换到连续模式时,至少会得到一次容错保证。

10.1、支持的查询

从spark 2.3开始,在连续处理模式中只支持以下类型的查询。

有关详细信息,请参阅输入源和输出下沉部分。尽管控制台接收器适合测试,但可以最好地观察到以Kafka为源和接收器的端到端低延迟处理,因为这允许引擎在输入主题中输入数据可用的毫秒内处理数据并使结果在输出主题中可用。

10.2、告诫
上一篇 下一篇

猜你喜欢

热点阅读