一口气搞定系列-SparkStreaming(流式计算框架)

2023-02-08  本文已影响0人  CoderInsight

1.流式计算简介

DStream 是逻辑上的概念;
DStream 是一堆的 RDD 组成;
Spark Streaming 只是秒级的运算(因为是通过在一定时间段内进行分割数据流来实现的),所以其采用的小批处理的方式去处理数据,使得其可以兼容批量和实时数据处理的逻辑和算法,因此他比较方便一些需要历史数据和实时数据联合分析的特定应用场合。

2.SparkStreaming简介

Spark Streaming

3.复习Spark 运行架构

在Spark中,一个应用( Application )由一个任务控制节点( Driver )和若干个作业
(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成.

当执行一个应用时,任务控制节点会向集群管理器( Cluster Manager )申请资源,启动
Executor ,并向Executor发送应用程序代码和文件,然后在Executor,上执行task。

4.SparkStreaming架构流程

SaprkStreaming 工作原理

  • 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上。
  • 每个Receiver都会负责一个 inputDStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)。
  • Spark Streaming通过inputDStream与外部数据源进行连接,读取相关数据。

3.Dstream 操作概述

1.什么是DStream

image Spark Streaming

2.Spark Streaming 程序基本步骤

streamingContext 我们在创建对象的时候一般缩写为 ssc

3.创建 StreamingContext 对象

import org.apache.spark._
import org.apache.spark.streaming._
// 生成以Spark配置文件对象,并设置其是运行在本地,并启动2个线程
val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
// 通过SparkConf对象作为参数生成一个Spark Streaming对象,并设置每1秒切分一次
val ssc = new StreamingContext(conf, Seconds(1))

4.输入源

  • 基本输入源
    • 文件流
    • 套接字流
    • RDD队列流
  • 高级数据源
    • Apache Kafka
    • Apache Flume

(1),基本数据源

1).文件流

比如对 日志文件 的处理

mkdir /root/sparkTest
cd /root/sparkTest
mkdir streaming
cd streaming
mkdir logFile
cd logFile
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(8))
scala> val lines = ssc.textFileStream("file:///root/sparkTest/streaming/logFile")
scala> val words = lines.flatMap(_.split(" "))
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 
scala> wordCounts.print() 
scala> ssc.start()
/*
scala> ssc.start() //实际上,当你输入这行回车后,Spark Streaming就开始进行循环监听,
下面的ssc.awaitTermination()是无法输入到屏幕上的,但是,为了程序完整性,这里还是给出ssc.awaitTermination()
*/
scala> ssc.awaitTermination()
import org.apache.spark._ 
import org.apache.spark.streaming._
object WordCountStreaming {  
  def main(args: Array[String]) {  
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//设置为本地运行模式,2个线程,一个监听,另一个处理数据    
    val ssc = new StreamingContext(sparkConf, Seconds(2))// 时间间隔为2秒
    // lines 就是我们得到的一堆的 RDD
    val lines = ssc.textFileStream("file:///root/sparkTest/streaming/logFile")  //这里采用本地文件,当然你也可以采用HDFS文件
    val words = lines.flatMap(_.split(" "))  
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)  
    wordCounts.print()  
    ssc.start()  
    ssc.awaitTermination()  
  }  
} 
[root@master logFile]# vi log1
# 文件中的内容如下
[root@master logFile]# cat log1 
I love spark.
Time: 1584240984000 ms
-------------------------------------------
(love,1)
(spark.,1)
(I,1)

2).套接字流

import org.apache.spark.internal.Logging 
import org.apache.log4j.{Level, Logger}

/** 
    Utility functions for Spark Streaming examples.
    设置日志的级别,保证我们在之后的代码中打印输出内容时,可以展示!
*/
object StreamingExamples extends Logging {
  /** 
  Set reasonable logging levels for streaming if the user has not configured log4j.
  */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the 
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      // logging level.
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    /*
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    */
    val hostname="localhost"
    val port="1234"
      
    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    // 设置每隔2秒去创建一个StreamingContext;也就是隔2秒截一段,在当前段中对数据进行操作(这里是计数)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val lines = ssc.socketTextStream(hostname, port.toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
# 新建一个终端,使用natcat的命令,建立TCP流与指定端口进行对接
nc -lk 1234
# 输入以下内容,测试词频统计
emmm
emmm
emmmm
emmsss
sda
sa
da
# 此时统计结果,是在指定分割段中进行的单词计数,然后这个时候并不是对所有的阶段进行计数,那么在之后的有状态操作中会使用另外一个函数去对其进行计数操作

3).RDD队列流

此时我们是采用自己源源不断的创建RDD的形式去实现的。

新建一个TestRDDQueueStream.scala代码文件,功能是:每隔1秒创建一个RDD,Streaming 每隔2秒就对数据进行处理


import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStream {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
    // 创建StreamContext对象,设置每隔两秒钟进行一次数据的获取
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // 新生成一个队列,生成的值都是整型的值
    val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
    
    // 指定数据源是RDD队列,完成数据源的挂接
    val queueStream = ssc.queueStream(rddQueue)
    // 队列的处理: 
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start() // 启动监听
    
    // 向队列中添加数据
    for (i <- 1 to 10){
        // 通过SparkContext 去创建RDD(makeRDD方法 与 parallelize方法类似)
        rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
        Thread.sleep(1000)
        }
    ssc.stop()
  }
}

5.DStream的转换操作

(1).DStream的无状态转换操作★★★

由于DStream是由很多个RDD组成,所以说其中的一些操作在某些情况下金和RDD的相关操作是一样的。

1).map(func)

对源DStream的每个元素,采用 func 函数进行转换,得到一个新的DStream。

2).flatMap(func)

与map类似,是将DStream中的元素拍扁,然后被用做映射为0个或者多个输出项。

3).repartition(numPartition)

通过创建更多或者更少的分区,改变DStream的并行程度;也就是说重新设置分区的个数。

4).count()

统计源DStream中每个RDD元素的数量。

5).filter(func)

返回一个新的DStream,仅包含源DStream中满足函数 func 的项

6).reduce(func)

利用func函数聚集源DStream中的每个RDD元素,返回一个包含单元素RDDs的新DStream。

7).union(otherStream)

返回一个新的DStream,包含源DStream和其他DStream的元素。

8).countByValue()

先对DStream中的所有元素进行操作,返回一个 (K,V) 键值类型的新DStream,每个键的值是在原DStream的每个RDD中出现次数。

9).reduceByKey(func, [numTasks])

当一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新组成的DStream。每一个key的值均由给定的reduce函数(func)聚集起来。

10).join(otherStream, [numTasks])

当应用在两个DStream(一个包含(K,V)键值对,另一个包含(K,W)键值对),则返回一个包含(K,(V,W))的键值对的新DStream。

11).transform(func)

通过对源DStream的每一个RDD应有 RDD-To-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

(2).DStream有状态转换操作

一些基于窗口转换操作的含义

1).window(windowLength, slideInterval) ★★

基于源DStream产生的窗口化的批数据,计算得到一个新的DStream

2).countByWindow(windowLength, slideInterval)

返回流中元素的一个滑动窗口数

3).reduceByWindow(func, windowLength, slideInterval)

返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算。

4).reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数

5).reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)

6).countByValueAndWindow(windowLength, slideInterval, [numTasks])★★

当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率

7),updateStateByKey(func) ★★★

需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。
实例:对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果。

6.输出操作

(1).把DStream输出到文本文件中

1).代码

import org.apache.spark.internal.Logging 
import org.apache.log4j.{Level, Logger}


/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel


object NetworkWordCountStateful {
  def main(args: Array[String]) {
    /*
    定义状态更新函数
    1.当调用 updateStateByKey 函数的时候,会自动提供两个参数:针对每一个key产生的值列表 和 当前值的状态(是否之前统计过?)
    2.Option 类似于一个boolean类型的值,要么有,要么没有,
    3.foldLeft(number): 把值的序列从下标为0的开始进行操作
    4.getOrElse(defaultValue):判断是否,默认值是0
    5.最后统计结果必须返回到some类型的对象中
    */
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0) 
      Some(currentCount + previousCount)
    }
    
    StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别    

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 1234)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    
    /*
    updateStateByKey: 在需要跨批次之间的状态维护时是需要这个函数的;会保留历史状态,将历史状态和当前新增加状态进行相加
    1.对于有状态操作而言,本批次的词频统计会和之前批次的词频统计结果的基础上进行不断累加,所以最终得到的词频统计是所有单词的总的词频统计的结果
    2.通过DStream调用updateStateByKey()方法,然后传入自定义函数
    */
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    // 下面是新增的语句,把DStream保存到文本文件中
    // 注意此时生成的是一个目录文件夹(streaming),然后在文件夹中中每一个小片段以目录结构中最后一个名称为文件头-XXXX的形式来保存(output-1584783640000)
    stateDstream.saveAsTextFiles("file:///root/spark/mycode/streaming/output")
    
    // 启动sparkSteam程序(流计算)
    sc.start()
    // 等待终端流计算程序
    sc.awaitTermination()
  }
}

2).输入数据测试代码

// 1.打包运行 NetworkWordCountStateful.scala 程序 
// 2.另外打开一个终端给 NetworkWordCountStateful.scala进行传递单词,然后可以使其进行单词统计
// 3.此时是使用的 natcat 可用于创建 TCP/IP 连接,最大的用途就是用来处理 TCP/UDP 套接字,可以直接使用yum install -y nc 安装。
// 4.通过 -l 参数开启监听模式;使用-k参数强制服务器保持连接持续监听端口
$ nc -lk 1234
//请手动输入一些单词,可以随便输入,比如下面是笔者输入的单词
emm
emmm
emm
emmm
emmmmm

(2).把DStream写入到MySQL数据库中

1).前期数据库准备操作

$ service mysql start
$ mysql -u root -p
-- 屏幕会提示你输入密码
mysql> create database spark;
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));

2).实际代码操作

// 对数据库的操作都是采用的java的jdbc形式去写的,所以这里引入的是java的jar包;注意此时要将java的连接驱动考入到spark的jars目录下
import java.sql.{PreparedStatement, Connection, DriverManager}

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel


object NetworkWordCountStateful {
  def main(args: Array[String]) {
       /*
        定义状态更新函数
        1.当调用 updateStateByKey 函数的时候,会自动提供两个参数:针对每一个key产生的值列表 和 当前值的状态(是否之前统计过?)
        2.Option 类似于一个boolean类型的值,要么有,要么没有,
        3.foldLeft(number): 把值的序列从下标为0的开始进行操作
        4.getOrElse(defaultValue):判断是否,默认值是0
        5.最后统计结果必须返回到some类型的对象中
    */
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    
    StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别
    
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 1234)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print() // 打印在屏幕中显示

//下面是新增的语句,把DStream保存到MySQL数据库中 
      /*
      1.由于一个DStream是一堆的RDD的集合,所以我们如果要对其中的每一个RDD进行操作,那么就可以使用foreachRDD
      2.对RDD中的元素重新分区
      3.对重新分区之后的对象,再去遍历分区中的所有RDD,再进行相应操作,也就是说执行我们自定义的处理函数
      4.我们自定义的处理函数,作用就是将分区中的RDD元素保存到数据库中
      5.此时RDD中的元素是<K,V>的形式,也就是pairRDD;然后有多个pairRDD构成迭代器,然后传递倒自定义函数中
      */
     stateDstream.foreachRDD(rdd => {
      /*
      内部函数(自定义 func 函数)
      1.前边传入过来的是结果有很多,然后是以迭代器的形式传递过来的;然后还要指定迭代器中的数据类型
      2.
      */
      def func(records: Iterator[(String,Int)]) {
        // 数据库连接对象
        var conn: Connection = null
        // 预编译对象
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "123456"  // 数据库密码是123456
          conn = DriverManager.getConnection(url, user, password)
          // 遍历迭代器中的pairRDD( K,V 类型的RDD)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, p._1.trim)
            stmt.setInt(2,p._2.toInt)
            // 执行sql语句
            stmt.executeUpdate()
          })
} catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      } // 自定义func函数结束
      // 对每一个RDD进行重新分区
      val repartitionedRDD = rdd.repartition(3)
      // 对重新分区之后的RDD,再遍历每一个分区,然后对分区中的每一个RDD再进行相应的操作(这里是直接传入我们自定义的函数,其函数的内容是将RDD中的元素进行一番操作之后打印在屏幕上或者保存到数据库中)
      repartitionedRDD.foreachPartition(func)
    }) // 遍历DStream中的RDD结束
    
    sc.start()
    sc.awaitTermination()
  }
}
上一篇 下一篇

猜你喜欢

热点阅读