Structured Streaming概述
2018-08-07 本文已影响13人
盗梦者_56f2
简介
Structured Streaming (结构化流)是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以以静态数据表示批量计算的方式来表达 流式计算。 Spark SQL 引擎将随着 流数据持续到达而增量地持续地运行,并更新最终结果。最后,系统通过 checkpointing (检查点) 和 Write Ahead Logs (预写日志)来确保 end-to-end exactly-once (端到端的完全一次性) 容错保证。简而言之,Structured Streaming 提供快速,可扩展,容错,end-to-end exactly-once stream processing (端到端的完全一次性流处理),而无需用户理解 streaming 。
编程模型
Structured Streaming 的关键思想是将实时数据流视为一种正在不断 appended (附加)的表。这形成了一个与批处理模型非常相似的新的 流处理模型。您会将您的流式计算表示为在一个静态表上的标准类批次查询,并且 Spark 在 unbounded(无界) 输入表上运行它作为 增量查询。
创建SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
创建Streaming DataFrame
#socket
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
#hdfs
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // 指定 csv 文件的模式
.csv("/path/to/directory")
#kafka
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
输出接收器
#hdfs
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
writeStream
.foreach(...)
.start()
#console
writeStream
.format("console")
.start()
#memory
writeStream
.format("memory")
.queryName("tableName")
.start()
#kafka
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()