Spark-DStream数据转换

2020-07-07  本文已影响0人  布莱安托

DStream的原语与RDD类似,分文转换(Transformation)和输出(Output)两种,此外还有一些特殊的原语,如:updateStateByKey,transform以及各种窗口(window)相关的原语。

无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。支持map、flatMap、filter、reduceByKey等RDD相同的操作。

注:如针对键值对的DStream使用reduceByKey需要添加import StreamingContext._

无状态转化应用到DStream内部各个批次的RDD上,但是只会对单个时间周期的数据进行操作,并不会跨越时间周期。

无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间周期内。键值对DStream可以使用和RDD一样的连接相关操作,例如:cogroup、join、leftOutJoin等。

同样,可以使用union将两个DStream内容合并起来,也可以使用StreamingContext.union()合并多个流。

有状态转化操作

1. updateStateByKey

updateStateByKey用于记录历史记录,有事我们需要在DStream中跨批次维护状态。针对这种情况updateStateByKey为我们提供了一个对状态变量的访问,用于键值对的DStream。给定一个键值对类型的DStream,并传递一个按照键来更新值得函数,以此来构建出一个新的DStream,内部数据的形式为(原有键,状态)的一个键值对。

想要维护并更新状态需要做如下两步:

  1. 定义状态,可以为任意数据类型
  2. 定义状态更新函数,函数需要实现当前批次输入与当前状态的更新操作

使用updateStateByKey需要设置checkpoint,会通过checkpoint保存状态。

实现有状态的Wordcount如下:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object UpdateStateDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[4]").setAppName("UpdateStateDemo")
    val streamingContext = new StreamingContext(conf, Seconds(5))
      
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("test")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    val flatMapDStream = kafkaDStream.flatMap(_.key().split(" "))

    val mapDStream = flatMapDStream.map((_, 1))

    // 将原有状态值与新增数据value序列相加,获得新的状态
    val updateStateDStream = mapDStream.updateStateByKey {
      case (seq, state) => Option(state.getOrElse(0) + seq.sum)
    }

    updateStateDStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()

  }
}

2. 窗口操作

窗口操作可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

注:所有基于窗口的操作均需要两个参数:1)窗口大小,2)滑动步长,两者都必须是StreamingContext中设置的采集周期的正整数倍。

窗口大小控制每次计算最近的多少个批次的数据,计算的批次数量为windowDuration/batchInterval个。滑动步长用来控制对新的DStream进行计算的间隔,默认与批次间隔相同。

关于窗口的操作有如下原语:

  1. window(windowLength, slideInterval):对DStream的窗口中的批次进行计算,并返回一个新的DStream

    利用window对窗口中的批次数据进行Wordcount

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object WindowDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("WindowDemo")
        val streamingContext = new StreamingContext(conf, Seconds(3))
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("test")
    
        val kafkaDStream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    
        // 窗口大小及滑动步长均为采集周期的整数倍
        val windowDStream = kafkaDStream.window(Seconds(9), Seconds(3))
    
        val flatMapDStream = windowDStream.flatMap(_.key().split(" "))
    
        val mapDStream = flatMapDStream.map((_, 1))
    
        val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)
    
        reduceByKeyDStream.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
    
      }
    }
    
  1. countByWindow(windowLength, slideInterval):返回一个DStream窗口计算的元素数量

  2. reduceByWindow(func, windowLength, slideInterval):通过自定义函数对DStream窗口进行聚合

  3. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):通过自定义函数对键值对类型的DStream的窗口进行reduceByKey操作。默认任务并行大小根据配置属性spark.default.parallelism做分组,也可以通过制定numTasks来设置并行度。

  4. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):与上面的函数相比,新增了参数invFunc,这个函数是用来将上一个窗口包含的,本次窗口不包含的批次数据结果从上一个窗口的计算结果去除,这样窗口间相同的批次不需要重复计算,只需要将新增数据和除去无效数据后的结果进行计算,即可得到新的结果,提高了计算的效率。注:使用时必须开启checkpoint

  5. countByValueAndWindow(windowLength, slideInterval, [numTasks]):对键值对类型的DStream进行处理,返回(K, Long)类型的DStream,其中键的值为其在窗口中出现的频次。

3. 其他重要操作

  1. transform
  2. join
上一篇下一篇

猜你喜欢

热点阅读