眼君的大数据之路

spark开发笔记(四、Spark Streaming笔记)

2020-09-04  本文已影响0人  眼君

Spark Streaming可整合多种输入数据源如Kafka、flume、hdfs等。


基本原理

Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经过Spark引擎以类似批处理的方式处理每个时间片数据。

DStream

Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。
在内部实现上,Spark Streaming的输入数据按照时间片(如1秒)分成一段一段的DStream,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终转变为相应的RDD的操作。

Spark Streaming采用的小批量处理的方式使它可以同时兼容批量和实时数据处理的逻辑和算法,因此,方便了一些需要历史数据和实时数据联合分析的特定应用场合。

DStream操作概述

Spark Streaming工作原理
  1. 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上。
  2. 每个Receiver都会负责一个input DStream。
  3. Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。
Spark Streaming程序基本步骤
  1. 通过创建输入DStream来定义输入源。
  2. 通过对DStream应用转换操作和输出操作来定义流计算。
  3. 用streamingContext.start()来开始接收数据和处理流程。
  4. 通过streamingContext.awaitTermination()方法来等待处理结束。
  5. 可以通过streamingContext.stop()来手动结束流计算进程。

pyspark实现最简单spark streaming程序

首先在Linux中下载安装一个nc工具:

yum install -y nc

利用这个工具可以构造一个简易的数据源:

nc -lk 9999

然后可以手动输入一些消息:

hello world
hi mom

如果要运行一个Spark Streaming程序,首先需要生成一个StreamingContext对象,它是Spark Streaming程序主入口可以从一个SparkConf对象创建一个StreamingContext对象。
以下是pyspark的实现wordcount的流计算代码:

import findspark
findspark.init()
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster('local[*]').setAppName('PySparkShell')
sc = SparkContext.getOrCreate(conf)

ssc = StreamingContext(sc, 5) # 5秒的计算窗口
ssc.checkpoint("./spark_checkpoint")

inputDstream = ssc.socketTextStream('192.168.2.144',9999)

def updateFunc(new_values,last_sum):
    return sum(new_values) + (last_sum or 0)

wordsDstream = inputDstream.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).updateStateByKey(updateFunc=updateFunc)

wordsDstream.pprint()

ssc.start()
ssc.awaitTermination()

pyspark实现spark streaming,将数据写入MySQL

import findspark
findspark.init()
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
import pymysql

conf = SparkConf()
sc = SparkContext.getOrCreate(conf)

ssc = StreamingContext(sc, 5) # 5秒的计算窗口
ssc.checkpoint("./spark_checkpoint")

inputDstream = ssc.socketTextStream('127.0.0.1',9999)

def updateFunc(new_values,last_sum):
    return sum(new_values) + (last_sum or 0)

wordsDstream = inputDstream.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).updateStateByKey(updateFunc=updateFunc)

wordsDstream.pprint()

wordsDstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

def sendRecord(rdd):
    #结果插入MySQL  
    conn = pymysql.connect(user="root",passwd="",host="127.0.0.1",db="sparkstream",charset="utf8")  
    cursor = conn.cursor()  
    sql = "insert into wordcount(`key`,`value`) value('%s',%d)" % (rdd[0],rdd[1])
    cursor.execute(sql)  
    conn.commit()  
    conn.close()   

ssc.start()
ssc.awaitTermination()
上一篇下一篇

猜你喜欢

热点阅读