Spark Tour

[译]Spark Streaming编程指南(四)

2017-07-10  本文已影响85人  steanxy

缓存/持久化

和RDD类似,DStream允许开发者将流数据持久化到内存。使用在DStream上使用persist()方法会自动持久化DStream中的每个EDD到内存中。这对于DStream需要计算多次的情况非常有用(如在相同数据上进行多个操作)。对于window-based操作(如reduceByWindowreduceByKeyAndWindow)和state-based操作(如updateStateByKey),会隐式地进行持久化,因此,通过window-based操作生成的DStream会自动持久化到内存,不需要开发者调用persist()

对于通过网络接收的输入数据流(如Kafka,Flume,socket等),默认持久化级别设置为在两个节点中保存副本,以便进行容错处理。

注意,不像RDD,DStream的偶人持久化级别会将数据序列化保存到内存中。在之后性能调优中会进一步讨论,更多关于持久化级别的信息参见Spark编程指南(二)

检查点

streaming应用程序必须7*24小时运行,因此必须对于应用程序逻辑无关的错误具有弹性(如系统错误,JVM崩溃等)。Spark Streaming需要将足够的检查点信息放到容错存储系统,以便之后从错误中恢复,有两种类型的数据需要设置检查点。

总结来说,元数据检查点用于从驱动程序的错误中恢复,如果使用了带状态的转换,数据或RDD检查点是必要的。

何时启用检查点
以下几种情况必须为应用程序启用检查点:

注意,没有使用带状态转换的简单streaming应用程序可以不启用检查点。在这种情况下,从驱动程序错误中恢复也是局部的(一些已经接收但是没有处理的数据可能会丢失)。这通常是可接受的,很多Spark Streaming应用程序用这种方式运行。对非Hadoop环境的支持会在未来进行改善。

如何配置检查点
启用检查点,需要设置一个容错可靠的文件系统(如HDFS,S3等)中的目录,用于存储检查点信息。使用streamingContext.checkpoint(checkpointDirectory)进行设置。这样就可以使用带状态的转换了。另外,如果想要让应用程序从驱动程序的错误中恢复,需要重写streaming应用程序包含以下行为。

这些行为可使用StreamingContext.getOrCreate完成,如下。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在,那么上下文会通过检查见数据重新创建。如果目录不存在(如第一次运行),那么函数functionToCreateContext会创建新的上下文和DStream。可以参考示例RecoverableNetworkWordCount。这个示例程序会像一个文件追加网络数据的单词计数。

除了使用getOrCreate之外,还需要保证驱动程序在出现错误时会自动重新启动。这只能通过应用程序的部署方式来完成。之后会进一步讨论。

注意,RDD的检查点会带来存储到可靠存储系统的成本,这会导致需要RDD检查点的批次处理时间增加。因此,检查点的时间间隔需要小心设置。对于比较小的批次,每个批次的检查点可能会明显减少操作的吞吐量。相反,检查点太少会导致任务规模的增加,带来不利影响。对于需要RDD检查点的带状态转换,默认时间间隔是批时间间隔的倍数,不少于10s。可通过使用dstream.checkpoint(checkpointInterval)进行设置。通常,检查点时间间隔为5-10个DStream时间间隔是比较好的。

累加器,广播变量和检查点

累加器广播变量不能从Spark Streaming的检查点中恢复。如果启用了检查点并且同时使用了累加器广播变量,必须为累加器广播变量创建懒实例化的单例,以便在驱动程序从失败中重新启动后可以重新实例化它们。下面是个示例。

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

完整代码参见source code

部署应用程序

这一节讨论部署Spark Streaming应用程序的步骤。

要求
要运行Spark Streaming应用程序,需要以下几点。

升级应用程序代码
如果正在运行的Spark Streaming应用程序需要升级到新的应用程序代码,有两种可行的机制。

监控应用程序

除了Spark的监控功能,Spark Streaming还有特定的监控功能。当使用StreamingContext时,Spark web UI有一个额外的Streaming tab页,显示关于正在运行的receiver的统计数据(receiver是否活跃,接收到记录的数量,receiver错误等)以及完成的批次信息(批次处理时间,队列延迟等)。这些都可以监控streaming应用程序的进程。

web UI中下面两个指标尤为重要:

如果批处理时间总是大于批时间间隔和/或排队延迟持续增加,则说明系统不能快速处理这些批次,正在落后。在这种情况下,要考虑减少批处理时间。

Spark Streaming的进程也可以使用StreamingListener接口进行监控,这个接口允许获取receiver的状态以及处理时间。注意这是一个开发者API,未来会改进(报告更多的信息)。

上一篇 下一篇

猜你喜欢

热点阅读