Spark Streaming Beginner Tutorial (4): Stateful and Stateless Transformations

2018-02-23  胖滚猪学编程

This article is entirely original. Please do not copy and paste it; if you repost it, credit the source. Thank you!

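DStream transformations fall into two kinds: stateless and stateful.
Stateless transformations: processing of each batch does not depend on data from earlier batches. Operations such as map(), filter(), and reduceByKey() are all stateless.
Stateful transformations: computing the current batch depends on data or intermediate results from earlier batches. These include updateStateByKey() and window operations such as window() and reduceByKeyAndWindow().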

Stateful Transformations


Stateful operations track and process data across time intervals; they depend on data from previous batches.

Window Operations

Concept

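As mentioned earlier, every Spark Streaming program is given a batch interval, such as Seconds(2). A window operation, simply put, combines several consecutive batches and computes one result over the whole window. This is why stateful operations are said to span time intervals.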


[Figure: Window Operations]

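Assuming the batch interval is 10s, the figure above means: every 20s (2 batch intervals), the data from the previous 30s (3 batch intervals) is combined and computed together, e.g. [time1+time2+time3].
So a window operation generally takes two parameters: the window length (30s in this example) and the sliding interval (20s in this example). Both must be multiples of the batch interval.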
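To make the two parameters concrete, here is a minimal plain-Scala sketch (no Spark needed) that lists which batches each window covers for a 10s batch interval, a 30s window length, and a 20s sliding interval. The object `WindowSim` and the method `windows` are hypothetical names for illustration only. It assumes the windows are aligned the way the figure draws them (the first window ends at batch 3); it does not model how Spark aligns the very first window relative to the start time.

```scala
// Plain-Scala sketch (no Spark needed): which batches each window covers.
// Assumption: batches are numbered 1, 2, 3, ... and windows are aligned as in the
// figure, i.e. the first window ends at batch `windowBatches`.
object WindowSim {
  /** For every window that has fired by batch `numBatches`, the batch numbers it covers. */
  def windows(numBatches: Int, windowBatches: Int, slideBatches: Int): Seq[Seq[Int]] =
    (windowBatches to numBatches by slideBatches).map { end =>
      (end - windowBatches + 1) to end
    }

  def main(args: Array[String]): Unit = {
    val batchSeconds  = 10
    val windowSeconds = 30 // window length
    val slideSeconds  = 20 // sliding interval
    // Both parameters must be multiples of the batch interval.
    require(windowSeconds % batchSeconds == 0 && slideSeconds % batchSeconds == 0)
    windows(7, windowSeconds / batchSeconds, slideSeconds / batchSeconds)
      .foreach(w => println(w.mkString("window over batches [", ", ", "]")))
    // prints "window over batches [1, 2, 3]", then "[3, 4, 5]", then "[5, 6, 7]"
  }
}
```

Consecutive windows overlap by one batch here because the window (3 batches) is longer than the slide (2 batches).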

Common API

scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3f908a10

scala> val lines = ssc.socketTextStream("localhost", 7788)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@7d619bcb

scala> val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@184b24d4

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4bd1541f

scala> wordCounts.print()

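// window length 15s, sliding interval 5s: every 5s, sum each word's counts over the last 15s (3 batches)
scala> val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(5))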
windowedWordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@752957da

scala> windowedWordCounts.print()

scala> ssc.start()
scala> ssc.awaitTermination()
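To see what the windowed word count above produces without running a cluster, the following plain-Scala sketch reproduces its logic: with a 5s batch interval, `reduceByKeyAndWindow(..., Seconds(15), Seconds(5))` yields, after every batch, the per-word sums over the last 3 batches. The names `WindowedWordCountSim`, `countBatch`, `merge`, and `windowedCounts` are hypothetical, and the input lines are made-up sample data. The sketch assumes that the first few windows simply contain fewer batches.

```scala
// Plain-Scala sketch (no Spark needed) of what
//   pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(5))
// produces with a 5-second batch interval: after every batch, per-word sums over
// the last 3 batches (15s / 5s). Early windows simply contain fewer batches.
object WindowedWordCountSim {
  /** Word counts within a single batch, i.e. what reduceByKey gives per batch. */
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  /** Merges two count maps by adding the counts of equal keys (the `a + b` reduce). */
  def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (w, c)) => acc.updated(w, acc.getOrElse(w, 0) + c) }

  /** Element i is the windowed result after batch i (0-based). */
  def windowedCounts(batches: Seq[Seq[String]], windowBatches: Int): Seq[Map[String, Int]] = {
    val perBatch = batches.map(countBatch)
    perBatch.indices.map { i =>
      perBatch.slice(math.max(0, i - windowBatches + 1), i + 1)
        .foldLeft(Map.empty[String, Int])(merge)
    }
  }

  def main(args: Array[String]): Unit = {
    // Each inner Seq holds the lines received during one 5-second batch (sample data).
    val batches = Seq(Seq("a b"), Seq("a"), Seq("b b"), Seq("a"))
    windowedCounts(batches, windowBatches = 3).zipWithIndex.foreach { case (counts, i) =>
      println(s"after batch ${i + 1}: $counts")
    }
  }
}
```

After the fourth batch the first batch has slid out of the window, so the count for "a" stays at 2 instead of growing to 3.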
UpdateStateByKey Operation

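The updateStateByKey operation lets you maintain arbitrary state while continuously updating it with new information.
In the wordcount program we wrote earlier, you may have noticed that word counts are per batch: batches do not affect each other, so if you keep typing the same word, its total count does not accumulate. To keep updating a piece of information across batches, use updateStateByKey. Using it takes two steps:

Define the state - the state can be of any data type.
Define the state update function - a function that specifies how to compute the new state from the previous state and the new values in the current batch.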
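The two steps can be sketched in plain Scala (no Spark needed). Here the state type is `Int` (a running count per word), and the update function has the same shape as the one taken by the simple overload `updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S])`. The object `UpdateStateSim` and its `step` method are hypothetical; `step` only mimics two documented behaviors: the update function is called for every key already in the state as well as every key in the new batch (with an empty `Seq` when a key got no new values), and returning `None` removes the key.

```scala
// Plain-Scala sketch (no Spark needed) of updateStateByKey's two steps.
// Step 1: the state type S is Int (a running count per word).
// Step 2: the update function gets the word's new values in this batch and its
// previous state, and returns the new state; returning None drops the key.
object UpdateStateSim {
  val updateCount: (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, state) => Some(newValues.sum + state.getOrElse(0))

  /** Applies one batch of (key, value) pairs to the state map (hypothetical helper). */
  def step[S](state: Map[String, S], batch: Seq[(String, Int)],
              f: (Seq[Int], Option[S]) => Option[S]): Map[String, S] = {
    val newValues = batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    // Every key from the old state AND the new batch is passed to f,
    // even when it received no new values in this batch.
    (state.keySet ++ newValues.keySet).flatMap { k =>
      f(newValues.getOrElse(k, Seq.empty), state.get(k)).map(s => (k, s))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("spark" -> 1, "spark" -> 1, "hello" -> 1)
    val batch2 = Seq("spark" -> 1)
    val afterTwo = Seq(batch1, batch2)
      .foldLeft(Map.empty[String, Int])((st, b) => step(st, b, updateCount))
    println(afterTwo) // running totals: spark = 3, hello = 1
  }
}
```

Unlike the per-batch word count, "spark" keeps accumulating across batches, and "hello" survives the second batch even though it did not appear in it.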

def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean)(implicit arg0: ClassTag[S]): DStream[(K, S)]
Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of each key. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

  • S: State type
  • updateFunc: State update function. Note that this function may generate a different tuple with a different key than the input key. Therefore keys may be removed or added in this way. It is up to the developer to decide whether to remember the partitioner despite the key being changed.
  • partitioner: Partitioner for controlling the partitioning of each RDD in the new DStream
  • rememberPartitioner: Whether to remember the partitioner object in the generated RDDs.
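scala> import org.apache.spark.HashPartitioner
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> val ssc = new StreamingContext(sc, Seconds(5))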
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3f908a10

scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@f500481

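// One tuple per key: String = the word; Seq[Int] = the word's values in the current batch (one 1 per occurrence); Option[Int] = the word's accumulated count from previous batches (None if the word is new)
scala> val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => iter.map { case (word, counts, prev) => (word, counts.sum + prev.getOrElse(0)) }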
updateFunc: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] = <function1>

scala> val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
results: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.StateDStream@51e0629a

scala> results.print()

scala> ssc.checkpoint("hdfs://master/user/checkpoint") // checkpointing must be enabled, otherwise an error is thrown

scala> ssc.start()

scala> ssc.awaitTermination()
-------------------------------------------
Time: 1519396875000 ms
-------------------------------------------
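Because the iterator-based update function is an ordinary Scala function, you can check what it returns for one batch without a cluster by calling it on a hand-built iterator. The sketch below uses a copy of the word-count `updateFunc` from the session above (same behavior as the REPL version) inside a hypothetical `UpdateFuncCheck` object; the input tuples are made-up sample data.

```scala
// Calls a copy of the word-count updateFunc directly on a hand-built iterator (no Spark needed).
// Each tuple is (word, values in the current batch, previous state).
object UpdateFuncCheck {
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
    iter.map { case (word, counts, prev) => (word, counts.sum + prev.getOrElse(0)) }

  def main(args: Array[String]): Unit = {
    val input: Iterator[(String, Seq[Int], Option[Int])] = Iterator(
      ("spark", Seq(1, 1), Some(3)),     // seen 3 times before, twice in this batch
      ("hello", Seq(1), None),           // first time seen
      ("idle", Seq.empty[Int], Some(5))  // no new data this batch; count is carried over
    )
    updateFunc(input).foreach(println)
    // prints (spark,5), then (hello,1), then (idle,5)
  }
}
```

The "idle" case shows why the running total survives quiet batches: the previous state is passed in even when the current batch has no values for that key.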

Typical uses of updateStateByKey: counting ad click traffic, or counting the vehicle traffic over a whole day.

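[Note: some stateful operations (updateStateByKey, and reduceByKeyAndWindow when it is given an inverse reduce function) require checkpointing to be enabled; this will be covered in detail later.]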

Stateless Transformations


Much like RDD transformations, DStreams support many of the transformations available on Spark RDDs.

Stream-stream joins

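import org.apache.spark.streaming.dstream.DStream

val stream1: DStream[(String, String)] = ...  // a key-value stream
val stream2: DStream[(String, String)] = ...
val joinedStream = stream1.join(stream2)      // DStream[(String, (String, String))]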
[Animation: join operation demo]
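A stream-stream join works batch by batch: in each interval, the RDD from stream1 is inner-joined by key with the RDD from stream2 for the same interval. The plain-Scala sketch below (no Spark needed) models each stream as a `Seq` of batches; the names `StreamJoinSim`, `joinBatch`, and `joinStreams` are hypothetical, and the data is made up.

```scala
// Plain-Scala sketch (no Spark needed) of stream1.join(stream2): batches from the
// same interval are inner-joined by key; records from different intervals never meet.
object StreamJoinSim {
  /** Inner join of two key-value batches, like joining two pair RDDs. */
  def joinBatch[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for ((k, v) <- left; (k2, w) <- right if k == k2) yield (k, (v, w))

  /** Pairs the streams batch by batch (same position = same interval) and joins each pair. */
  def joinStreams[K, V, W](s1: Seq[Seq[(K, V)]], s2: Seq[Seq[(K, W)]]): Seq[Seq[(K, (V, W))]] =
    s1.zip(s2).map { case (b1, b2) => joinBatch(b1, b2) }

  def main(args: Array[String]): Unit = {
    val stream1 = Seq(Seq("a" -> "x1", "b" -> "y1"), Seq("a" -> "x2"))
    val stream2 = Seq(Seq("a" -> "p1"), Seq("b" -> "q2"))
    // Batch 1 joins on "a"; batch 2 has no common key, so its result is empty.
    joinStreams(stream1, stream2).foreach(println)
  }
}
```

Key "a" in stream1's second batch does not match key "a" from stream2's first batch, because they belong to different intervals.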

Stream-dataset joins
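The join method works between two DStreams. If the other side is not a DStream (for example, a static RDD), use the transform method instead: it applies an arbitrary RDD-to-RDD function to every batch, so the join can be done at the RDD level.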

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
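The idea behind `transform` can be sketched in plain Scala (no Spark needed): model a stream as a `Seq` of batches and apply the same batch-level function, here a join against a static lookup dataset, to every batch. `TransformJoinSim`, its `transform` and `join` helpers, and the user/city data are all hypothetical stand-ins for the DStream, the RDD join, and the `dataset` RDD above.

```scala
// Plain-Scala sketch (no Spark needed) of windowedStream.transform { rdd => rdd.join(dataset) }.
// A stream is modeled as a Seq of batches; `transform` runs the same batch-level
// function on every batch, which is how a static dataset can be joined in.
object TransformJoinSim {
  /** Applies a batch-to-batch function to every batch, like DStream.transform. */
  def transform[A, B](batches: Seq[Seq[A]])(f: Seq[A] => Seq[B]): Seq[Seq[B]] =
    batches.map(f)

  /** Inner join on keys, like joining two pair RDDs. */
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for ((k, v) <- left; (k2, w) <- right if k == k2) yield (k, (v, w))

  // Hypothetical static lookup data standing in for the `dataset` RDD.
  val dataset: Seq[(String, String)] = Seq("u1" -> "Beijing", "u2" -> "Shanghai")

  def main(args: Array[String]): Unit = {
    // Hypothetical stream of (userId, action) pairs, two batches.
    val stream = Seq(Seq("u1" -> "click", "u3" -> "view"), Seq("u2" -> "click"))
    transform(stream)(batch => join(batch, dataset)).foreach(println)
  }
}
```

Since the static data is the same for every batch, each batch is joined against the full dataset; "u3" has no match and is dropped by the inner join.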

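Finally, note that a DStream is a continuous sequence of RDDs, one RDD per batch. Stateless transformations are applied to each RDD (batch) separately. For example, reduceByKey reduces the data within each time interval, but data from different intervals is never reduced together.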
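A plain-Scala sketch (no Spark needed) of that per-batch behavior: a stateless `reduceByKey(_ + _)` is applied to each batch on its own, so a word arriving once in every batch is reported as 1 each time and never accumulates, in contrast with updateStateByKey. The object `StatelessSim` and its method are hypothetical names.

```scala
// Plain-Scala sketch (no Spark needed): a stateless reduceByKey is applied to each
// batch on its own, so counts never carry over from one batch to the next.
object StatelessSim {
  /** reduceByKey(_ + _) applied independently to every batch. */
  def reduceByKeyPerBatch(batches: Seq[Seq[(String, Int)]]): Seq[Map[String, Int]] =
    batches.map(batch => batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) })

  def main(args: Array[String]): Unit = {
    // The word "spark" arrives once in each of three batches.
    val batches = Seq(Seq("spark" -> 1), Seq("spark" -> 1), Seq("spark" -> 1))
    reduceByKeyPerBatch(batches).foreach(println) // each batch prints Map(spark -> 1)
  }
}
```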
