Spark Streaming 1.基本操作

2021-06-08  本文已影响0人  caster

1. 数据分析分类:

流式数据处理:多条数据缓冲一起处理
批量数据处理:一条数据一处理

实时数据处理:数据处理延迟时间毫秒
离线数据处理:数据处理延迟时间小时/天

2. 微批

Spark Streaming准实时(秒/分钟),微批流(控制时间采集周期)处理方式。
Spark Streaming将数据抽象为离散化流DStream,每个时间区间内的数据都作为RDD。DStream是对RDD的封装,形成RDD序列。

3. 背压机制

调整接收数据速率适配处理和输出数据的能力。

4. 简单Demo(无状态)

此种方式,每次采集周期结果不会累加,不会保存上次记录,即无状态的。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//采集周期:3s一次
val ssc = new StreamingContext(sparkConf, Seconds(3))

//监听本地9999端口
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val words = lines.flatMap(_.split(" "))

val wordToOne = words.map((_,1))

val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_+_)

wordToCount.print()

// 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
// 如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕
//ssc.stop()
// 1. 启动采集器
ssc.start()
// 2. 等待采集器的关闭
ssc.awaitTermination()

5. DStream

DSstream构造方式:

  1. 继承ReceiverT
  2. 实现onStart()和onStop()方法

6. DStream保存采集周期结果(有状态流)

通过updateStateByKey更新之前周期中统计的结果;
每个周期的统计结果存在checkpoint文件中

val sparkConf = new SparkConf().setMaster("local").setAppName("SparkStreaming")

val ssc = new StreamingContext(sparkConf, Seconds(3))
//缓冲区文件的存储目录
ssc.checkpoint("cp")

// 无状态数据操作,只对当前的采集周期内的数据进行处理
// 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
// 使用有状态操作时,需要设定检查点路径
val datas = ssc.socketTextStream("localhost", 9999)

val wordToOne = datas.map((_,1))

//val wordToCount = wordToOne.reduceByKey(_+_)

// updateStateByKey:根据key对数据的状态进行更新
// 传递的参数中含有两个值
// 第一个值表示相同的key的value数据
// 第二个值表示缓存区相同key的value数据
val state = wordToOne.updateStateByKey(
    ( seq:Seq[Int], buff:Option[Int] ) => {
        val newCount = buff.getOrElse(0) + seq.sum
        Option(newCount)
    }
)

state.print()

ssc.start()
ssc.awaitTermination()

7. DStream提供的方法

  1. transform():获取rdd操作后返回rdd,与map()区别:

// transform方法可以将底层RDD获取到后进行操作
// 1. DStream功能不完善
// 2. 需要代码周期性的执行

// 执行位置 : Driver端
val newDS: DStream[String] = lines.transform(
    rdd => {
        // 执行位置 : Driver端,(周期性执行)
        rdd.map(
            str => {
                // Code : Executor端
                str
            }
        )
    }
)
// 执行位置 : Driver端
val newDS1: DStream[String] = lines.map(
    data => {
        // 执行位置 : Executor端
        data
    }
)
  1. join():两个流join,底层为两个RDD join,每个周期内的数据进行join,不会保留状态
val joinDS = ds1.join(ds2)
  1. window():
    窗口时长:每次计算的时间范围(采集周期倍数)
    滑动步长:多久计算一次(每次滑动计算一次,默认每个采集周期计算一次)
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))

val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))

// 窗口的范围长度应该是采集周期的整数倍
// 窗口可以滑动的,但是默认情况下,一个采集周期进行滑动,可能会出现重复数据的计算
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6))
// 为了避免重复数据,可以改变滑动的滑动(步长)
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))


val wordToCount = windowDS.reduceByKey(_+_)
wordToCount.print()

ssc.start()
ssc.awaitTermination()

countBywindow():统计窗口内元素数量
reduceByKeyAndWindow():当窗口范围比较大,但是滑动幅度比较小,可以采用增加数据和删除数据的方式,防止重复计算,提升性能。
......

8. DStream输出操作

惰性求值,需要行动算子触发(print等)
foreachRDD():底层rdd操作

9. 关闭Spark Stream程序:

ssc.start()

// 如果想要关闭采集器,那么需要创建新的线程
// 而且需要在第三方程序中增加关闭状态
new Thread(
    new Runnable {
        override def run(): Unit = {
            while ( true ) {
                if (SomeThing) {//通过外部系统判定是否需要关闭
                    // 获取SparkStreaming状态
                    val state: StreamingContextState = ssc.getState()
                    if ( state == StreamingContextState.ACTIVE ) {
                        ssc.stop(true, true)//延迟关闭
                    }
                    System.exit(0)
                }
                Thread.sleep(5000)
            }
            
        }
    }
).start()

ssc.awaitTermination() // block 阻塞main线程

10. 关闭后恢复Spark Stream

从之前的checkpoint恢复数据或者重新创建:

val ssc = StreamingContext.getActiveOrCreate("cp", ()=>{
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    wordToOne.print()

    ssc
})

ssc.checkpoint("cp")
上一篇下一篇

猜你喜欢

热点阅读