Spark Streaming 1.基本操作
2021-06-08 本文已影响0人
caster
1. 数据分析分类:
流式数据处理:多条数据缓冲一起处理
批量数据处理:一条数据一处理
实时数据处理:数据处理延迟时间毫秒
离线数据处理:数据处理延迟时间小时/天
2. 微批
Spark Streaming准实时(秒/分钟),微批流(控制时间采集周期)处理方式。
Spark Streaming将数据抽象为离散化流DStream,每个时间区间内的数据都作为RDD。DStream是对RDD的封装,形成RDD序列。
3. 背压机制
调整接收数据速率适配处理和输出数据的能力。
4. 简单Demo(无状态)
此种方式,每次采集周期结果不会累加,不会保存上次记录,即无状态的。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//采集周期:3s一次
val ssc = new StreamingContext(sparkConf, Seconds(3))
//监听本地9999端口
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordToOne = words.map((_,1))
val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_+_)
wordToCount.print()
// 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
// 如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕
//ssc.stop()
// 1. 启动采集器
ssc.start()
// 2. 等待采集器的关闭
ssc.awaitTermination()
5. DStream
DSstream构造方式:
- 继承ReceiverT
- 实现onStart()和onStop()方法
6. DStream保存采集周期结果(有状态流)
通过updateStateByKey更新之前周期中统计的结果;
每个周期的统计结果存在checkpoint文件中
val sparkConf = new SparkConf().setMaster("local").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
//缓冲区文件的存储目录
ssc.checkpoint("cp")
// 无状态数据操作,只对当前的采集周期内的数据进行处理
// 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
// 使用有状态操作时,需要设定检查点路径
val datas = ssc.socketTextStream("localhost", 9999)
val wordToOne = datas.map((_,1))
//val wordToCount = wordToOne.reduceByKey(_+_)
// updateStateByKey:根据key对数据的状态进行更新
// 传递的参数中含有两个值
// 第一个值表示相同的key的value数据
// 第二个值表示缓存区相同key的value数据
val state = wordToOne.updateStateByKey(
( seq:Seq[Int], buff:Option[Int] ) => {
val newCount = buff.getOrElse(0) + seq.sum
Option(newCount)
}
)
state.print()
ssc.start()
ssc.awaitTermination()
7. DStream提供的方法
- transform():获取rdd操作后返回rdd,与map()区别:
// transform方法可以将底层RDD获取到后进行操作
// 1. DStream功能不完善
// 2. 需要代码周期性的执行
// 执行位置 : Driver端
val newDS: DStream[String] = lines.transform(
rdd => {
// 执行位置 : Driver端,(周期性执行)
rdd.map(
str => {
// Code : Executor端
str
}
)
}
)
// 执行位置 : Driver端
val newDS1: DStream[String] = lines.map(
data => {
// 执行位置 : Executor端
data
}
)
- join():两个流join,底层为两个RDD join,每个周期内的数据进行join,不会保留状态
val joinDS = ds1.join(ds2)
- window():
窗口时长:每次计算的时间范围(采集周期倍数)
滑动步长:多久计算一次(每次滑动计算一次,默认每个采集周期计算一次)
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
// 窗口的范围长度应该是采集周期的整数倍
// 窗口可以滑动的,但是默认情况下,一个采集周期进行滑动,可能会出现重复数据的计算
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6))
// 为了避免重复数据,可以改变滑动的滑动(步长)
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
val wordToCount = windowDS.reduceByKey(_+_)
wordToCount.print()
ssc.start()
ssc.awaitTermination()
countBywindow():统计窗口内元素数量
reduceByKeyAndWindow():当窗口范围比较大,但是滑动幅度比较小,可以采用增加数据和删除数据的方式,防止重复计算,提升性能。
......
8. DStream输出操作
惰性求值,需要行动算子触发(print等)
foreachRDD():底层rdd操作
9. 关闭Spark Stream程序:
ssc.start()
// 如果想要关闭采集器,那么需要创建新的线程
// 而且需要在第三方程序中增加关闭状态
new Thread(
new Runnable {
override def run(): Unit = {
while ( true ) {
if (SomeThing) {//通过外部系统判定是否需要关闭
// 获取SparkStreaming状态
val state: StreamingContextState = ssc.getState()
if ( state == StreamingContextState.ACTIVE ) {
ssc.stop(true, true)//延迟关闭
}
System.exit(0)
}
Thread.sleep(5000)
}
}
}
).start()
ssc.awaitTermination() // block 阻塞main线程
10. 关闭后恢复Spark Stream
从之前的checkpoint恢复数据或者重新创建:
val ssc = StreamingContext.getActiveOrCreate("cp", ()=>{
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
wordToOne.print()
ssc
})
ssc.checkpoint("cp")