pyspark学习

10 Spark Streaming

2018-01-19  本文已影响259人  7125messi

Spark Streaming

第一:Spark Streaming基本原理

Spark Streaming的核心是一种可扩展、容错的数据流系统,它采用RDD批量模式(即批量处理数据)并加快处理速度。Spark Streaming可以以小批量或批次间隔(从500毫秒到更大的的间隔窗口)运行。

image.png

如上图所示,Spark Streaming接收输入数据流,并在内部将数据流分为多个较小的batch(batch大小取决于batch的间隔)。Spark引擎将这些输入数据的batch处理后,生成处理过数据的batch结果集。

Spark Streaming的主要抽象是离散流(DStream),它代表了前面提到的构成数据流的那些小批量。DSteam建立在RDD之上,允许Spark开发人员在RDD和batch的相同上下文中工作,现在只将其应用于一系列流问题当中。另外一个重要的方面是,由于你使用的是Apache Spark,Spark Streaming与ML、SQL、DataFrame和GraphFrames都做了集成。

image.png

Spark Streaming的基本组件:
Spark Streaming是Spark API核心的扩展,支持可扩展,高吞吐量,实时数据流的容错流处理。 数据可以从Kafka,Flume,Kinesis或TCP套接字等许多来源获取,并且可以使用复杂的算法进行处理,这些算法用map,reduce,join和window等高级函数表示。 最后,处理的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。

其中,Kafaka+Spark Streaming最常用。
Apache Kafka将发布 - 订阅消息传递重新视为分布式的,分区的,复制的提交日志服务。 在使用Spark开始集成之前,请仔细阅读Kafka文档。
Kafka项目在0.8和0.10版本之间引入了一个新的 consumer API,因此有两个独立的相应的Spark Streaming软件包可用。 请为您的brokers和desired features 选择正确的版本。 注意0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。
Kafka 0.8版本

第二:为什么需要Spark Streaming?

随着在线交易和社交媒体以及传感器和设备的普及,很多公司正在以更快的速度产生和处理更多的数据。开发有规模的,实时的可实现的可预测的能力,为这些企业提供了竞争优势,流分析在数据科学家和数据工程师的工具箱中变得日益重要。

Spark Streaming正在被迅速采用,原因在于Apache Spark在同一框架内统一了所有这些不同的数据处理范例(ML的机器学习,Spark SQL和Streaming)。用户可以从培训机器学习模型(ML)到使用模型(Streaming)评测模型,并使用BI工具(SQL)执行分析与可视化展示,所有这些都在同一框架内完成。

Spark Streaming目前的应用场景!
(1)流ETL:将数据推入下游系统之前对其进行持续的清洗和聚合,可以减少最终数据存储中的数据量。
(2)触发器(Triggers):实时检测行为或异常事件,及时触发下游动作。
(3)数据浓缩:实时数据与其他数据集连接,可以进行更为丰富的分析。
(4)复杂会话和持续学习。

第三:Spark Streaming应用程序数据流是什么

image.png

如上图所示,提供了Spark Driver/Workers/Streaming源与目标间的数据流:Spark Streaming Context的ssc.start()是入口点。
目前,Spark Streaming有很多应用程序需要不断优化和配置。 Spark Streaming的文档在Scala中更完整,所以,因为您正在使用Python API,您可能有时需要请参考文档的Scala版本。

第四: 使用DStream简化Streaming应用程序

使用Python的Spark Streaming来创建一个简单的单词计数例子
使用DStream————由众多小批次数据组成的离散数据流

模拟方法

To run this example:
使用Linux中bash终端将多个单词发送到我们计算机的本地端口(9999),请注意,在终端1中输入单词
Terminal 1: nc -lk 9999
运行Spark Streaming来接收这些文字,并对他们进行计数
Terminal 2: ./bin/spark-submit streaming_word_count.py localhost 9999

实施

打开两个终端,一个用于nc命令,另一个用于Spark Streaming程序
nc -lk 9999 从这个点开始,你在终端所输入的一切将被传送到9999端口。

在另一终端屏幕运行以下streaming_word_count.py脚本

# streaming_word_count.py 如下
# 导入必要的类并创建一个本地的SparkContext和StreamingContexts,StreamingContexts是Spark Streaming的入口点
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 用两个工作线程创建Spark Context(注意:`local [2]`)
sc = SparkContext("local[2]", "NetworkWordCount")

# 创建1秒的本地StreamingContextwith批处理间隔,1是批间隔,每秒运行微批次。
ssc = StreamingContext(sc, 1)

# 创建DStream,将连接到连接到localhost:9999的输入行的流,
lines = ssc.socketTextStream("localhost", 9999)

# 统计单词
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# 将此DStream中生成的每个RDD的前十个元素打印到控制台
wordCounts.pprint()

# 启动Spark Streaming开始计算,然后等待终止命令来停止运行(CTRL+C),如果没有等到停止命令,Spark Streaming程序将继续运行。
ssc.start()             

# 等待计算结束
ssc.awaitTermination()  

全局实施

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StatefulNetworkWordCount")

ssc = StreamingContext(sc, 1)

# 给Saprk Streaming配置了一个检查点(checkpoint)
ssc.checkpoint("checkpoint")

def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)

running_counts = lines.flatMap(lambda line: line.split(" "))\
                    .map(lambda word: (word, 1))\
                    .updateStateByKey(updateFunc)

running_counts.pprint()

ssc.start()             

ssc.awaitTermination()  

第五:结构化流

利用DataFrame引入结构化流,简化代码。

# Structured Streaming Word Count Example
#    Original Source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
#
# To run this example:
#   Terminal 1:  nc -lk 9999
#   Terminal 2:  ./bin/spark-submit structured_streaming_word_count.py localhost 9999
#   Note, type words into Terminal 1
#

# Import the necessary classes and create a local SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()


 # Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()


# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

# Await Spark Streaming termination
query.awaitTermination()

第六:利用PySpark Streaming读取和分析数据

Step 1. Starting Zookeeper, creating the topic, starting Apache Kafka broker, starting the console producer.
kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties 
kafka$ bin/kafka-server-start.sh config/server.properties 
kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pysparkBookTopic
kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pysparkBookTopic
Step 2. Starting PySpark with spark-streaming-kafka package.
$ pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0
Step 3. Creating sum of each row of numbers.
def stringToNumberSum(data):
       removedSpaceData = data.strip()
       if   removedSpaceData == '' :
            return(None)
       splittedData =  removedSpaceData.split(' ')
       numData =  [float(x) for x in splittedData]
       sumOfData = sum(numData)
       return (sumOfData)

dataInString = '10 10 20 '

stringToNumberSum(dataInString)
Step 4. Reading data from Kafka and getting sum of each row.
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
bookStreamContext = StreamingContext(sc, 10)
bookKafkaStream = KafkaUtils.createStream( 
                                       ssc = bookStreamContext, 
                                       zkQuorum = 'localhost:2185', 
                                     groupId = 'pysparkBookGroup', 
                                                                     topics = {'pysparkBookTopic':1}
                                                                               )
sumedData = bookKafkaStream.map( lambda data : stringToNumberSum(data[1]))
sumedData.pprint()
bookStreamContext.start()
bookStreamContext.awaitTerminationOrTimeout(30)

第七:Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)

方法1:基于接收者的方法

对于Python应用程序,在部署应用程序时,必须添加上面的库及其依赖关系。 请参阅下面的部署小节。

(1)导入KafkaUtils并创建一个输入DStream,如下所示:

from pyspark.streaming.kafka import KafkaUtils

kafkaStream = KafkaUtils.createStream(streamingContext, \
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

(2)部署:与任何Spark应用程序一样,spark-submit用于启动您的应用程序。
对于缺乏SBT / Maven项目管理的Python应用程序,可以使用--packages直接将spark-streaming-kafka-0-8_2.11及其依赖关系添加到spark-submit

 ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ...

方法2:直接方法(无接收者)

在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强大的端到端保证。 这种方法不是使用接收器来接收数据,而是定期查询Kafka在每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。 当处理数据的作业启动时,Kafka简单的客户API用于读取Kafka中定义的偏移范围(类似于从文件系统读取文件)。

与基于接收机的方法(即方法1)相比,这种方法具有以下优点。

请注意,这种方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监控工具将不会显示进度。但是,您可以在每个批次中访问由此方法处理的偏移量,并自己更新Zookeeper(请参见下文)。

接下来,我们将讨论如何在流式应用程序中使用这种方法。
(1)导入KafkaUtils并创建一个输入DStream,如下所示:

from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
offsetRanges = []

def storeOffsetRanges(rdd):
     global offsetRanges
     offsetRanges = rdd.offsetRanges()
     return rdd

def printOffsetRanges(rdd):
     for o in offsetRanges:
         print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

directKafkaStream \
     .transform(storeOffsetRanges) \
     .foreachRDD(printOffsetRanges)
 ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ...

第八:Spark Streaming + Flume Integration Guide (Kafka broker version 0.8.2.1 or higher)

Apache Flume是一个分布式的,可靠的,可用的服务,用于高效地收集,聚合和移动大量的日志数据。 在这里我们解释如何配置Flume和Spark Streaming来接收来自Flume的数据。 有两种方法。

方法1:Flume-style Push-based Approach

Flume旨在在Flume代理之间推送数据。 在这种方法中,Spark Streaming基本上设置了一个接收器,作为Flume的一个Avro代理,Flume可以将这个接收器推送到这个接收器。
配置步骤:

General Requirements

选择群集中的一台机器

由于推送模式,流媒体应用程序需要启动,接收机已安排并在所选端口上侦听,Flume才能够推送数据。

Configuring Flume

配置Flume代理通过在配置文件中包含以下内容将数据发送到Avro接收器。

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

Configuring Spark Streaming Application(以Python为例)

 from pyspark.streaming.flume import FlumeUtils
 flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

默认情况下,Python API会将Flume事件主体解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Flume事件中的正文字节数组解码为任意任意数据类型。 查看API文档和示例。

请注意,主机名应该与群集中的资源管理器(Mesos,YARN或Spark Standalone)所使用的相同,以便资源分配可以匹配名称,并在正确的机器中启动接收器。

对于缺乏SBT / Maven项目管理的Python应用程序,spark-streaming-flume_2.11及其依赖项可直接添加到使用--packages的spark-submit

 ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.2.1 ...

方法2:Pull-based Approach using a Custom Sink

Flume不是直接将数据直接推送到Spark Streaming,而是运行一个自定义的Flume接收器,它允许进行以下操作。

Flume将数据推入接收器,并且数据保持缓冲。
Spark Streaming使用可靠的Flume接收器和事务来从接收器中提取数据。 只有在Spark Streaming接收和复制数据后,事务才能成功。
这确保了比以前的方法更强大的可靠性和容错保证。 但是,这需要配置Flume运行自定义接收器。
配置步骤:

General Requirements

选择一台将在Flume代理中运行定制接收器的机器。 Flume管道的其余部分配置为向该代理发送数据。 Spark群集中的机器应该可以访问运行定制接收器的选定机器。

Configuring Flume

Configuring Spark Streaming Application

from pyspark.streaming.flume import FlumeUtils
addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
上一篇 下一篇

猜你喜欢

热点阅读