10 Spark Streaming
第一:Spark Streaming基本原理
Spark Streaming的核心是一种可扩展、容错的数据流系统,它采用RDD批量模式(即批量处理数据)并加快处理速度。Spark Streaming可以以小批量或批次间隔(从500毫秒到更大的的间隔窗口)运行。

如上图所示,Spark Streaming接收输入数据流,并在内部将数据流分为多个较小的batch(batch大小取决于batch的间隔)。Spark引擎将这些输入数据的batch处理后,生成处理过数据的batch结果集。
Spark Streaming的主要抽象是离散流(DStream),它代表了前面提到的构成数据流的那些小批量。DSteam建立在RDD之上,允许Spark开发人员在RDD和batch的相同上下文中工作,现在只将其应用于一系列流问题当中。另外一个重要的方面是,由于你使用的是Apache Spark,Spark Streaming与ML、SQL、DataFrame和GraphFrames都做了集成。

Spark Streaming的基本组件:
Spark Streaming是Spark API核心的扩展,支持可扩展,高吞吐量,实时数据流的容错流处理。 数据可以从Kafka,Flume,Kinesis或TCP套接字等许多来源获取,并且可以使用复杂的算法进行处理,这些算法用map,reduce,join和window等高级函数表示。 最后,处理的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。
其中,Kafaka+Spark Streaming最常用。
Apache Kafka将发布 - 订阅消息传递重新视为分布式的,分区的,复制的提交日志服务。 在使用Spark开始集成之前,请仔细阅读Kafka文档。
Kafka项目在0.8和0.10版本之间引入了一个新的 consumer API,因此有两个独立的相应的Spark Streaming软件包可用。 请为您的brokers和desired features 选择正确的版本。 注意0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。
Kafka 0.8版本
第二:为什么需要Spark Streaming?
随着在线交易和社交媒体以及传感器和设备的普及,很多公司正在以更快的速度产生和处理更多的数据。开发有规模的,实时的可实现的可预测的能力,为这些企业提供了竞争优势,流分析在数据科学家和数据工程师的工具箱中变得日益重要。
Spark Streaming正在被迅速采用,原因在于Apache Spark在同一框架内统一了所有这些不同的数据处理范例(ML的机器学习,Spark SQL和Streaming)。用户可以从培训机器学习模型(ML)到使用模型(Streaming)评测模型,并使用BI工具(SQL)执行分析与可视化展示,所有这些都在同一框架内完成。
Spark Streaming目前的应用场景!
(1)流ETL:将数据推入下游系统之前对其进行持续的清洗和聚合,可以减少最终数据存储中的数据量。
(2)触发器(Triggers):实时检测行为或异常事件,及时触发下游动作。
(3)数据浓缩:实时数据与其他数据集连接,可以进行更为丰富的分析。
(4)复杂会话和持续学习。
第三:Spark Streaming应用程序数据流是什么

如上图所示,提供了Spark Driver/Workers/Streaming源与目标间的数据流:Spark Streaming Context的ssc.start()是入口点。
目前,Spark Streaming有很多应用程序需要不断优化和配置。 Spark Streaming的文档在Scala中更完整,所以,因为您正在使用Python API,您可能有时需要请参考文档的Scala版本。
第四: 使用DStream简化Streaming应用程序
使用Python的Spark Streaming来创建一个简单的单词计数例子
使用DStream————由众多小批次数据组成的离散数据流
模拟方法
To run this example:
使用Linux中bash终端将多个单词发送到我们计算机的本地端口(9999),请注意,在终端1中输入单词
Terminal 1: nc -lk 9999
运行Spark Streaming来接收这些文字,并对他们进行计数
Terminal 2: ./bin/spark-submit streaming_word_count.py localhost 9999
实施
打开两个终端,一个用于nc命令,另一个用于Spark Streaming程序
nc -lk 9999 从这个点开始,你在终端所输入的一切将被传送到9999端口。
在另一终端屏幕运行以下streaming_word_count.py脚本
# streaming_word_count.py 如下
# 导入必要的类并创建一个本地的SparkContext和StreamingContexts,StreamingContexts是Spark Streaming的入口点
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 用两个工作线程创建Spark Context(注意:`local [2]`)
sc = SparkContext("local[2]", "NetworkWordCount")
# 创建1秒的本地StreamingContextwith批处理间隔,1是批间隔,每秒运行微批次。
ssc = StreamingContext(sc, 1)
# 创建DStream,将连接到连接到localhost:9999的输入行的流,
lines = ssc.socketTextStream("localhost", 9999)
# 统计单词
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# 将此DStream中生成的每个RDD的前十个元素打印到控制台
wordCounts.pprint()
# 启动Spark Streaming开始计算,然后等待终止命令来停止运行(CTRL+C),如果没有等到停止命令,Spark Streaming程序将继续运行。
ssc.start()
# 等待计算结束
ssc.awaitTermination()
全局实施
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "StatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
# 给Saprk Streaming配置了一个检查点(checkpoint)
ssc.checkpoint("checkpoint")
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream("localhost", 9999)
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc)
running_counts.pprint()
ssc.start()
ssc.awaitTermination()
第五:结构化流
利用DataFrame引入结构化流,简化代码。
# Structured Streaming Word Count Example
# Original Source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
#
# To run this example:
# Terminal 1: nc -lk 9999
# Terminal 2: ./bin/spark-submit structured_streaming_word_count.py localhost 9999
# Note, type words into Terminal 1
#
# Import the necessary classes and create a local SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
.readStream\
.format('socket')\
.option('host', 'localhost')\
.option('port', 9999)\
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, ' ')
).alias('word')
)
# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the console
query = wordCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.start()
# Await Spark Streaming termination
query.awaitTermination()
第六:利用PySpark Streaming读取和分析数据
Step 1. Starting Zookeeper, creating the topic, starting Apache Kafka broker, starting the console producer.
kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
kafka$ bin/kafka-server-start.sh config/server.properties
kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pysparkBookTopic
kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pysparkBookTopic
Step 2. Starting PySpark with spark-streaming-kafka package.
$ pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0
Step 3. Creating sum of each row of numbers.
def stringToNumberSum(data):
removedSpaceData = data.strip()
if removedSpaceData == '' :
return(None)
splittedData = removedSpaceData.split(' ')
numData = [float(x) for x in splittedData]
sumOfData = sum(numData)
return (sumOfData)
dataInString = '10 10 20 '
stringToNumberSum(dataInString)
Step 4. Reading data from Kafka and getting sum of each row.
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
bookStreamContext = StreamingContext(sc, 10)
bookKafkaStream = KafkaUtils.createStream(
ssc = bookStreamContext,
zkQuorum = 'localhost:2185',
groupId = 'pysparkBookGroup',
topics = {'pysparkBookTopic':1}
)
sumedData = bookKafkaStream.map( lambda data : stringToNumberSum(data[1]))
sumedData.pprint()
bookStreamContext.start()
bookStreamContext.awaitTerminationOrTimeout(30)
第七:Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)
方法1:基于接收者的方法
对于Python应用程序,在部署应用程序时,必须添加上面的库及其依赖关系。 请参阅下面的部署小节。
(1)导入KafkaUtils并创建一个输入DStream,如下所示:
from pyspark.streaming.kafka import KafkaUtils
kafkaStream = KafkaUtils.createStream(streamingContext, \
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
-
默认情况下,Python API会将Kafka数据解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Kafka记录中的字节数组解码为任意的数据类型。
-
Kafka中的主题分区与Spark Streaming中生成的RDD的分区不相关。 因此,增加KafkaUtils.createStream()中主题特定分区的数量只会增加单个接收者使用哪些主题的线程数,在处理数据时不会增加Spark的并行性。
-
多个Kafka输入DStreams可以创建不同的组和主题,用于使用多个接收器并行接收数据。如果已经使用HDFS等复制文件系统启用了写入日志,则接收到的数据已经被复制到日志中。
(2)部署:与任何Spark应用程序一样,spark-submit用于启动您的应用程序。
对于缺乏SBT / Maven项目管理的Python应用程序,可以使用--packages直接将spark-streaming-kafka-0-8_2.11及其依赖关系添加到spark-submit
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ...
方法2:直接方法(无接收者)
在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强大的端到端保证。 这种方法不是使用接收器来接收数据,而是定期查询Kafka在每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。 当处理数据的作业启动时,Kafka简单的客户API用于读取Kafka中定义的偏移范围(类似于从文件系统读取文件)。
与基于接收机的方法(即方法1)相比,这种方法具有以下优点。
-
简化的并行性:不需要创建多个输入Kafka流并将其合并。使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。所以在Kafka和RDD分区之间有一对一的映射关系,这更容易理解和调整。
-
效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次 - 一次是由卡夫卡(Kafka),另一次是由预先写入日志(Write Ahead Log)复制。这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。只要你有足够的卡夫卡保留,消息可以从卡夫卡恢复。
-
完全一次的语义:第一种方法使用Kafka的高级API来存储Zookeeper中消耗的偏移量。传统上这是从卡夫卡消费数据的方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消耗两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障时有效地接收一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务(请参阅主程序中输出操作的语义指导进一步的信息)。
请注意,这种方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监控工具将不会显示进度。但是,您可以在每个批次中访问由此方法处理的偏移量,并自己更新Zookeeper(请参见下文)。
接下来,我们将讨论如何在流式应用程序中使用这种方法。
(1)导入KafkaUtils并创建一个输入DStream,如下所示:
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
-
您也可以传递一个messageHandler到createDirectStream来访问包含关于当前消息的元数据的KafkaMessageAndMetadata并将其转换为任何需要的类型。 默认情况下,Python API会将Kafka数据解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Kafka记录中的字节数组解码为任意的数据类型。 查看API文档和示例。
-
在Kafka参数中,您必须指定metadata.broker.list或bootstrap.servers。 默认情况下,它将开始消耗每个Kafka分区的最新偏移量。 如果您将Kafka参数中的配置auto.offset.reset设置为最小,那么它将从最小的偏移量开始消耗。
-
你也可以使用KafkaUtils.createDirectStream的其他变体开始消耗任何偏移量。 此外,如果您想访问每批中消耗的卡夫卡补偿,您可以执行以下操作。
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream \
.transform(storeOffsetRanges) \
.foreachRDD(printOffsetRanges)
-
如果您希望基于Zookeeper的Kafka监视工具显示流应用程序的进度,您可以使用它自己更新Zookeeper。
-
请注意,HasOffsetRanges的类型转换只会在directKafkaStream中调用的第一个方法中完成,而不是在方法链之后。您可以使用transform()而不是foreachRDD()作为第一个方法调用,以访问偏移量,然后调用更多的Spark方法。但是,请注意,RDD分区与Kafka分区之间的一对一映射在任何混洗或重新分区的方法(例如, reduceByKey()或window()。
-
另外要注意的是,由于这种方法不使用接收器,与标准接收器相关的(即spark.streaming.receiver。形式的配置)将不适用于由此方法创建的输入DStream(将应用于其他输入DStreams)。相反,使用配置spark.streaming.kafka。。最重要的是spark.streaming.kafka.maxRatePerPartition,它是每个Kafka分区将被这个直接API读取的最大速率(每秒消息数)。
(2)部署:与任何Spark应用程序一样,spark-submit用于启动您的应用程序。
对于缺乏SBT / Maven项目管理的Python应用程序,可以使用--packages直接将spark-streaming-kafka-0-8_2.11及其依赖关系添加到spark-submit
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ...
第八:Spark Streaming + Flume Integration Guide (Kafka broker version 0.8.2.1 or higher)
Apache Flume是一个分布式的,可靠的,可用的服务,用于高效地收集,聚合和移动大量的日志数据。 在这里我们解释如何配置Flume和Spark Streaming来接收来自Flume的数据。 有两种方法。
方法1:Flume-style Push-based Approach
Flume旨在在Flume代理之间推送数据。 在这种方法中,Spark Streaming基本上设置了一个接收器,作为Flume的一个Avro代理,Flume可以将这个接收器推送到这个接收器。
配置步骤:
General Requirements
选择群集中的一台机器
-
当启动Flume + Spark Streaming应用程序时,其中一名Spark工作人员必须在该机器上运行。
-
Flume可以配置为将数据推送到该机器上的端口。
由于推送模式,流媒体应用程序需要启动,接收机已安排并在所选端口上侦听,Flume才能够推送数据。
Configuring Flume
配置Flume代理通过在配置文件中包含以下内容将数据发送到Avro接收器。
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>
Configuring Spark Streaming Application(以Python为例)
from pyspark.streaming.flume import FlumeUtils
flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
默认情况下,Python API会将Flume事件主体解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Flume事件中的正文字节数组解码为任意任意数据类型。 查看API文档和示例。
请注意,主机名应该与群集中的资源管理器(Mesos,YARN或Spark Standalone)所使用的相同,以便资源分配可以匹配名称,并在正确的机器中启动接收器。
对于缺乏SBT / Maven项目管理的Python应用程序,spark-streaming-flume_2.11及其依赖项可直接添加到使用--packages的spark-submit
./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.2.1 ...
方法2:Pull-based Approach using a Custom Sink
Flume不是直接将数据直接推送到Spark Streaming,而是运行一个自定义的Flume接收器,它允许进行以下操作。
Flume将数据推入接收器,并且数据保持缓冲。
Spark Streaming使用可靠的Flume接收器和事务来从接收器中提取数据。 只有在Spark Streaming接收和复制数据后,事务才能成功。
这确保了比以前的方法更强大的可靠性和容错保证。 但是,这需要配置Flume运行自定义接收器。
配置步骤:
General Requirements
选择一台将在Flume代理中运行定制接收器的机器。 Flume管道的其余部分配置为向该代理发送数据。 Spark群集中的机器应该可以访问运行定制接收器的选定机器。
Configuring Flume
Configuring Spark Streaming Application
from pyspark.streaming.flume import FlumeUtils
addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)