Big Data

Structured Streaming概述

2018-08-07  本文已影响13人  盗梦者_56f2

简介

Structured Streaming (结构化流)是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以以静态数据表示批量计算的方式来表达 流式计算。 Spark SQL 引擎将随着 流数据持续到达而增量地持续地运行,并更新最终结果。最后,系统通过 checkpointing (检查点) 和 Write Ahead Logs (预写日志)来确保 end-to-end exactly-once (端到端的完全一次性) 容错保证。简而言之,Structured Streaming 提供快速,可扩展,容错,end-to-end exactly-once stream processing (端到端的完全一次性流处理),而无需用户理解 streaming 。

编程模型

Structured Streaming 的关键思想是将实时数据流视为一种正在不断 appended (附加)的表。这形成了一个与批处理模型非常相似的新的 流处理模型。您会将您的流式计算表示为在一个静态表上的标准类批次查询,并且 Spark 在 unbounded(无界) 输入表上运行它作为 增量查询。

创建SparkSession

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
import spark.implicits._

创建Streaming DataFrame

#socket
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()
#hdfs
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // 指定 csv 文件的模式
  .csv("/path/to/directory")
#kafka
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

输出接收器

#hdfs
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
writeStream
    .foreach(...)
    .start()
#console
writeStream
    .format("console")
    .start()
#memory
writeStream
    .format("memory")
    .queryName("tableName")
    .start()
#kafka
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
上一篇 下一篇

猜你喜欢

热点阅读