Spark Programming Basics (Scala Edition): Spark Streaming

2021-08-09  kaiker

1. Stream Computing Overview

Relational databases were not designed to store fast, continuously arriving stream data, and they do not support continuous processing.

The processing flow of stream computing generally includes real-time data ingestion, real-time data computation, and real-time data querying.

(Figure: stream computing data processing flow)

2. Spark Streaming

2.1 Spark Streaming Design

The real-time input data stream is split into segments by time slice, and the Spark engine then processes each slice of data in a batch-like way.

The central abstraction in Spark Streaming is the discretized stream (DStream), which represents a continuous stream of data. Spark Streaming divides the input data into segments by time slice, each segment is turned into an RDD, and operations on a DStream are ultimately translated into operations on the corresponding RDDs.

(Figure: DStream operations)
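
A small sketch of this correspondence, assuming the StreamingContext ssc created in Section 3.3 below and a socket data source (host and port are placeholders): a DStream operation such as flatMap is carried out, batch by batch, as the same operation on the underlying RDDs, which transform makes explicit.

    scala> val lines = ssc.socketTextStream("localhost", 9999)  // one RDD is produced per batch interval
    scala> val words = lines.flatMap(_.split(" "))               // operation expressed on the DStream
    scala> val sameWords = lines.transform(rdd => rdd.flatMap(_.split(" ")))  // equivalent operation on each batch RDD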

2.2 Differences from Storm

Spark Streaming cannot achieve millisecond-level latency: once the stream is decomposed into batches, each batch spawns Spark jobs that still have to go through DAG scheduling and the task scheduler, which adds a certain overhead.

On the other hand, because the data is handled as RDD datasets, fault tolerance is more efficient.

3. Overview of DStream Operations

3.1 How Spark Streaming Works

In Spark Streaming there is a Receiver, which runs as a long-running task on an Executor; each Receiver is responsible for one input DStream. After receiving data, the Receiver hands it to Spark Streaming for processing, and the results are then distributed.
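
As a small illustration (assuming the StreamingContext ssc created in Section 3.3 below and a data server listening on localhost:9999), the socket source is a receiver-based input: its Receiver occupies one long-running task, which is why the local examples in this article use a master of local[2], leaving at least one core free for processing.

    scala> val lines = ssc.socketTextStream("localhost", 9999)  // starts a Receiver as a long-running task on an executor
    scala> lines.print()                                         // prints the first elements of each batch on the driver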

3.2 Basic Steps for Writing a Program

The basic steps are: define the input source(s) by creating input DStreams; define the streaming computation by applying transformation and output operations to those DStreams; start receiving and processing data with ssc.start(); and wait for processing to finish with ssc.awaitTermination() (or end it manually with ssc.stop()).

3.3 Creating a StreamingContext Object

    scala> import org.apache.spark.streaming._
    scala> val ssc = new StreamingContext(sc, Seconds(1))  // sc is the SparkContext already provided by spark-shell; 1-second batch interval
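
For comparison, a minimal sketch of the standalone-application form (the application name and master are only placeholders): instead of reusing the shell's sc, a StreamingContext is built from a SparkConf.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // app name and master are placeholders; the batch interval here is 1 second
    val conf = new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))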

4. Basic Input Sources

4.1 File Source

The file source monitors a directory and treats every new file that appears in it as new input data.

    scala> import org.apache.spark.streaming._
    scala> val ssc = new StreamingContext(sc, Seconds(20))  // 20-second batch interval
    scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")  // only files created after the stream starts are processed
    scala> val words = lines.flatMap(_.split(" "))
    scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    scala> wordCounts.print()
    scala> ssc.start()
    scala> ssc.awaitTermination()

4.2 RDD Queue Stream

For testing and debugging, a queue of RDDs can be used as an input source: every RDD pushed into the queue is consumed as one batch of the stream.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStream {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // SynchronizedQueue is deprecated in newer Scala versions but still works here;
    // every RDD pushed into the queue becomes one batch of the stream
    val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()
    // feed ten RDDs into the queue, one per second
    for (i <- 1 to 10) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}

5. Advanced Data Sources

The word-count program below uses Kafka as an advanced data source: it consumes messages from a Kafka topic with the receiver-based KafkaUtils.createStream API (spark-streaming-kafka package), which connects through ZooKeeper.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels()  // helper object from the textbook that sets the log level
    val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sc, Seconds(10))
    // Set a checkpoint directory; if it is on HDFS, use a path such as
    // ssc.checkpoint("/user/hadoop/checkpoint") and make sure Hadoop is running
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")
    val zkQuorum = "localhost:2181"  // ZooKeeper server address
    val group = "1"                  // consumer group of the topic; any name will do, e.g. val group = "test-consumer-group"
    val topics = "wordsender"        // topic name(s)
    val numThreads = 1               // number of consumer threads for each topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)  // this is where ssc is used
    val lines = lineMap.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    // the meaning of this line is explained in the window operations of the next section
    val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

6. Transformation Operations

6.1 Stateless Transformations

A transformation is stateless if it does not keep any history: each batch is processed on its own, without using results from earlier batches.

(Figure: stateless transformation operations)
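
A hedged sketch of a purely stateless word count (host, port and batch interval are assumptions): flatMap, map and reduceByKey are applied to each batch independently, so the printed counts never carry over from one batch to the next.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatelessWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("StatelessWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    // stateless operations: each 5-second batch is counted on its own
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}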

6.2 Stateful Transformations

Sliding windows

A window operation computes a result over a sliding window of the stream: the window length determines how much past data each computation covers, and the sliding interval determines how often the computation is triggered; both must be multiples of the batch interval. reduceByKeyAndWindow is the windowed counterpart of reduceByKey, and the variant that also takes an inverse reduce function updates the previous window's result incrementally instead of recomputing the whole window, which is why it requires checkpointing.

(Figure: window operations)
(Figure: reduceByKeyAndWindow)
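
A hedged sketch of the windowed word count referred to in the Kafka example (source, checkpoint path and intervals here are assumptions): the counts cover a 2-minute window that slides every 10 seconds, and the inverse function _ - _ removes the contribution of the batches that fall out of the window.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WindowedWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))   // 10-second batch interval
    ssc.checkpoint("file:///tmp/streaming/checkpoint")  // required by the inverse-function variant
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    // window length: 2 minutes, sliding interval: 10 seconds, 2 partitions;
    // _ - _ subtracts the counts of the batch that leaves the window,
    // so only the data entering and leaving the window is reprocessed
    val windowedCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}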

updateStateByKey

updateStateByKey maintains a running state for each key in the DStream and updates it with the values from every new batch, so the results accumulate across batches; it requires a checkpoint directory.

import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // state update function: add the counts of the current batch to the previous state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")  // checkpointing is required by updateStateByKey and provides fault tolerance
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}