spark学习笔记3-StructuredStreaming
本文是对Spark的核心外围组件之一的Structured Streaming的一个学习总结,本文共包含如下几部分的内容:
- 概述
- Structured Streaming的结构化流失处理
- 简单实例
- 小结
参考资料:
1、如果要对Spark的基本知识有所了解,可参考文档《spark学习笔记1-基础部分》。
2、如果要对Spark SQL的基本知识有所了解,可参考文档《spark学习笔记2-Spark SQL》。
一、概述
相比于我们熟悉的Spark streaming组件,Structured Streaming是Spark2.0版本提出的新的实时流框架(2.0和2.1是实验版本,从Spark2.2开始为稳定版本),它的API设计比Spark streaming更加简单易用,功能也更加强大。从Spark-2.X版本后,Spark streaming就进入维护模式。
在介绍Structured Streaming之前,我们先来介绍下Spark streaming。Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。它支持从多种数据源获取数据,包括Kafk、Flume、以及TCP socket等,从数据源获取数据之后,可以使用诸如map、reduce和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统和数据库等系统中。其处理的数据流图如下图:
Spark Streaming接收流数据,并根据一定的时间间隔拆分成一批批batch数据,用抽象接口DStream表示(DStream可以看成是一组RDD序列,每个batch对应一个RDD),然后通过Spark Engine处理这些batch数据,最终得到处理后的一批批结果数据。其处理机制如下图:
相比Spark Streaming, Structured Streaming有如下特点:
1、如Spark Streaming一样能支持多种数据源的输入和输出。
2、支持以结构化的方式操作流式数据,能够像使用Spark SQL处理离线的批处理一样,处理流数据,代码更简洁,写法更简单。
3、基于Event-Time,相比于Spark Streaming的Processing-Time更精确,更符合业务场景。
4、解决了Spark Streaming存在的代码升级,DAG图变化引起的任务失败,无法断点续传的硬伤问题。
下面章节我们来详细介绍Structured Streaming的特点和使用。
二、StructuredStreaming的结构化流式处理
Structured Streaming将实时流抽象成一张无边界的表(Unbounded Table),输入的每一条数据(new data)当成输入表的一个新行(new rows),同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据。如下图所示:
输入的流数据是以batch为单位被处理,每个batch会触发一次流式计算(如sql查询,开发人员预先定义好的),计算结果被更新到Result Table。每个batch代表一个触发间隔,默认设置为1秒,这样每1秒从输入源读取数据到Input Table,然后触发Query计算,将结果写入Result Table。如下图所示:
上面流程中Result Table中的信息会被输出到外部数据源中,如文件、kafka、关系数据库中,在测试阶段最简单的方式可输出到控制台上。一共有三种Output模式:
1、Append模式:仅仅从上次触发计算到当前新增的行会被输出。仅仅支持行数据插入结果表后不进行更改的query操作。。
2、Complete模式: 每次触发都会将整个结果表输出。这个是针对聚合操作的。。
3、Update模式:仅仅是自上次触发之后结果表有变更的行会输出。
上面介绍了structured Streaming采用结构化流失处理的基本流程和机制,下面章节我们通过一个具体的例子实际感受下。
三、简单实例
这个例子,输入信息来自一个socket服务器,当spark客户端程序(这里指的是利用structured Streaming的API编写的程序)连接到该socket后,socket服务器就会随机的发送数据,然后spark客户端就会获取数据进行处理。
我们先用java来编写一个用于测试的socket服务器程序,代码如下:
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
public class SocketExample {
public static void main(String[] args) {
try {
ServerSocket server = new ServerSocket(9999);
System.out.println("启动服务器,等待客户端连接....");
Socket client = server.accept();
System.out.println("客户端已连接到服务器");
for (int i = 0; i < 10000; i++) {
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
Random rand = new Random();
StringBuffer data = new StringBuffer();
for(int k =0;k<6;k++){
data.append("data" + rand.nextInt(10)+" ");
}
data.append("\n");
bw.write(data.toString());
bw.flush();
Thread.sleep(10);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
上面程序很简单,创建一个Socket服务器,侦听端口是9999,然后等待客户端连接。接收到客户端连接后,在一个for循环中,每隔10毫秒向客户端发送一个字符串,字符串的内容是由6以空格分隔的字符串组成。
上面代码编译后是一个可执行的Java程序,然后我们在控制台上启动该java程序,如: java SocketExample
下面我们启动spark shell,在命令行中编写一个structured Streaming程序。该程序的功能是连接到socket服务器,获取socket服务器发送的字符串信息,统计所有的不同单词出现的次数。代码如下:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("newdemo").getOrCreate()
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
words.createOrReplaceTempView("info")
val wordCounts = spark.sql("select value,count(*) from info group by value")
wordCounts.writeStream.outputMode("complete").format("console").start()
下面我们来解释上面代码的含义。
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
上面语句的作用是设置连接数据源的信息,返回一个DataFrame对象,这样相当于把socket数据源当作一个DataFrame来处理,这样我们就可以利用DataFrame提供的各种方法来进行数据的操作了。
val words = lines.as[String].flatMap(_.split(" "))
上面语句的含义是将上面获取的lines对象(DataFrame类型)中的每个每个字符串(是以空格分隔的多个字符串组成)中的单词取出了,生成一个新的Dataset对象。然后就可以对Dataset对象进行处理了。
words.createOrReplaceTempView("info")
上面语句的含义是将words对象(DataSet类型)注册为一个SQL临时视图,这样既可以执行sql语句了。因为words中的数据类型是字符窜,所有该视图只有一个字段,字段名为默认的value。
val wordCounts = spark.sql("select value,count() from info group by value");
上面语句的意思就是执行一个sql统计操作,即统计相当单词出现的次数。*
wordCounts.writeStream.outputMode("complete").format("console").start()
上面语句的作用是真的执行任务,包括连接socket服务器,获取数据进行处理。也就是说前面的代码都不会执行真正的动作,相当于是设置信息和要执行的动作,这也是函数式编程中惰性特性的一个典型体现。
上面这个语句中 outputMode("complete") 表示输出的方式为完全模式,format("console")表示输出到控制台。这时我们观察控制台,会发现不断打印类似的信息:
Batch: 9
-------------------------------------------
+-----+--------+
|value|count(1)|
+-----+--------+
|data9| 1378|
|data6| 1354|
|data4| 1378|
|data7| 1497|
|data1| 1353|
|data5| 1305|
|data3| 1307|
|data8| 1393|
|data0| 1340|
|data2| 1357|
+-----+--------+
上面信息代表了StructuredStreaming每次batch处理的结果,我们会看到不断打印这样的信息片段。并且会看到这些单词的统计数量的值不断增大。
上面我们用的是完全输出模式,也就是说是统计所有获取数据中单词的重复次数。如果我们希望只是简单输出从socket中获取的单词,则后面两句代码改为如下代码:
val wordCounts = spark.sql("select value from info");
wordCounts.writeStream.outputMode("append").format("console").start()
可以看出除sql语句变化外,outputMode函数的参数值由"complete"变为"append"。这时我们观察控制台的输出,可以看到如下的片段重复出现:
Batch: 32
-------------------------------------------
+-----+
|value|
+-----+
|data4|
|data7|
|data1|
|data1|
|data0|
|data5|
|data0|
|data9|
|data8|
|data7|
|data7|
|data2|
|data4|
四、小结
本文对Structured Streaming的基本概念做了简单的介绍,并通过一个具体的例子来真实感受下Structured Streaming的使用,可以看出编写Structured Streaming程序还是很方便的,可以利用Spark SQL和数据集的各种API进行很方便的处理,就和批处理的方式一样。
需要说明的是,Structured Streaming的功能远不止本文介绍的这点,本文只是介绍最基础的一部分。