PySpark-数据操作-DStream

2019-07-07  本文已影响0人  NEO_X

内容摘入自<<Python大数据分析从入门到精通>>

附书源码下载地址

更多信息https://blue-shadow.top/

Spark DStream

作为Apache Spark API的扩展,Spark Streaming是容错的高吞吐量系统。它处理实时数据流。Spark流取得输入来自各种输入可靠来源,如flume,HDFS,和kafka等,并且然后将已处理的数据发送到文件系统,数据库或实时仪表盘。输入数据流被分成批量数据,然后批量生成结果的最终流。

Spark DStream(Discretized Stream)是Spark Streaming的基本抽象。DStream是一个连续的数据流。它接收来自各种来源的输入,如Kafka,Flume,Kinesis或TCP套接字。它也可以是通过转换输入流生成的数据流。DStream的核心是连续的RDD(Spark抽象)流。DStream中的每个RDD都包含来自特定间隔的数据。

DStream上的任何操作都适用于所有底层RDD。DStream涵盖了所有细节。它为开发人员提供了一个高级API,以方便使用。因此,Spark DStream便于处理流数据

Spark Streaming为DStream 提供了与RDD相同的容错属性。只要输入数据的副本可用,它就可以使用RDD的谱系从它重新计算任何状态。默认情况下,Spark会复制两个节点上的数据。因此,Spark Streaming可以承受单个工作者故障

dstream.png

Spark DStream操作

代码举例

# 每隔一段时间生成一个文件,模拟数据文件流的生成,用于DStream监控对应的文件夹,进行文件处理
import time
import datetime

def generate_file():
    t = time.strftime('%Y-%m-%d',time.localtime())
    newfile = t + '.txt' 
    f = open(newfile,'w')
    f.write(newfile) 
    f.close()

if __name__ == '__main__':
    generate_file()
# 使用DStream进行流处理
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

def read_file_stream():
    sc = SparkContext.getOrCreate()
    ssc = StreamingContext(sc, 1)

    stream_data = ssc.textFileStream("D:\Developing\data")
    stream_data.pprint()
    ssc.start()
    ssc.awaitTermination()

def save_stream_rdd():
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)
    ssc = StreamingContext(sc, 1)
    stream_data = ssc.textFileStream("D:\Developing\data")   
    value = stream_data.countByValue()
    ssc.start()
    ssc.awaitTermination()

if __name__=="__main__":
    read_file_stream()
    #save_stream_rdd()
dstream_test.png

上一篇:pyspark-数据操作-rdd
下一篇:pyspark-数据操作-dstream

上一篇下一篇

猜你喜欢

热点阅读