spark checkpoint 分析及用法

2020-12-28  本文已影响0人  走在成长的道路上

Spark Streamingcheckpoint 机制

Spark Streaming 若需要 7 * 24 不间断的运行,必须对诸如系统错误, JVM 错误等程序逻辑无关的错误 (Failures) 导致 Driver 所在
的节点错误,具备一定的非应用程序出错的容错性。Spark Streamingcheckpoint 机制便是为此设计,它将足够多的信息 checkpoint
到某些具备容错性的存储系统如 hdfs 上,以便出错时能迅速恢复。

有两种数据可以进行 checkpoint 保存:

即,Metadata Checkpoint 主要时从 driver 失败中恢复,而 Data Checkpoint 用于对有状态的操作进行 Checkpoint 处理。

使用 Checkpoint

RDDcheckpoint 一样,需要从 SparkStreaming 入口设置检查点,即通过 StreamingContext.checkpoint(...) 设置 checkpoint
缓存路径,一般是缓存到 hdfs 上,同时, checkpoint 还需要满足两个条件:

因此,在创建 StreamingContext 是需要使用另一种方案 StreamingContext.getOrCreate(), 其中 getOrCreate() 源码如下:

def getOrCreate(
  checkpointPath: String,
  creatingFunc: () => StreamingContext,
  hadoopConf: Configuration = SparkHadoopUtil.get.conf,
  createOnError: Boolean = false
): StreamingContext = {
// 读取 checkpoint 数据
val checkpointOption = CheckpointReader.read(
  checkpointPath, new SparkConf(), hadoopConf, createOnError)
// 创建 StreamingContext 函数,不存在则需要使用 creatingFunc 函数创建新实例
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

实例如下:

// 创建 SparkConf 属性配置内容
private val conf: SparkConf = new SparkConf()
                            .setMaster("local[*]")
                            .setAppName("SparkStreaming")
                            // 优雅的关闭程序,保证driver结束前处理完所有已接收的数据
                            .set("spark.streaming.stopGracefullyOnShutdown", "true")
private val path = "checkpoint"

// 创建测试用例
def testCheckpoint(): Unit = {
    // 获取或创建 StreamingContext 实例
    // 必须将业务逻辑写在 `creatingFunc` 中,并返回 `StreamingContext`,
    // 若定义在外面,恢复操作意味着重新创建了重新创建一次 `StreamingContext` 
    // 因此会报异常 StateDStream@333398f has not been initialized
    val ssc = StreamingContext.getOrCreate(path, () => {
        // 创建 StreamingContext 新实例
        val ssc = new StreamingContext(conf, Seconds(5))
        // 设置 checkpoint 存储路径
        ssc.checkpoint(path)
        // 业务逻辑
        val wc = ssc.socketTextStream("127.0.0.1", 9999)
        .flatMap(_.split(" "))
        .map((_, 1))
        .updateStateByKey[Int]((seq: Seq[Int], op: Option[Int]) => Option(seq.sum + op.getOrElse(0)))
        // 设置检查点时间
        wc.checkpoint(Seconds(50))
        wc.print()
        ssc
    })
    // 启动 Streaming 逻辑
    ssc.start()
    ssc.awaitTermination()
}

在上述测试中,使用 nc -lk 9999 即可开启一个简易的 socket 服务器端,然后往控制台刷数据即可

必须将业务逻辑写在 creatingFunc 中,并返回 StreamingContext,若定义在外面,恢复操作意味着重新创建了重新创建一次 StreamingContext
org.apache.spark.streaming.dstream.StateDStream@333398f has not been initialized

UpdateStateByKey(基于磁盘读写)MapWithState(基于磁盘存储+缓存)

Spark Streaming 中状态管理函数包括updateStateBykey和mpaWithState,都是用来统计全局key的状态的变化的。它们以DStream中的
数据进行按key做reduce操作,然后对各个批次的数据进行累加,在有新的数据信息进入或更新时。

  1. MapWithState 使用方式
  1. UpdateStateByKey 使用方式
  1. 区别

Kafka Checkpoint 实现分析

sparkstreaming 中对 kafka 数据 offset 相关的内容直接能保存到 checkpoint 数据中,这样当 streaming 任务非程序 bug 出现异常宕之后,
可以通过 checkpoint 数据进行恢复状态

private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
  
  ...省略...
  
  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
    logError("Kafka ConsumerRecord is not serializable. " +
      "Use .map to extract fields before calling .persist or .window")
    super.persist(newLevel)
  }

  ...省略...
  // 从实现了 DirectKafkaInputDStreamCheckpointData 来保存数据
  protected[streaming] override val checkpointData =
    new DirectKafkaInputDStreamCheckpointData

  // 定义 DStreamCheckpointData 数据内容
  private[streaming]
  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {

  ...省略...

    override def update(time: Time): Unit = {
      batchForTime.clear()
      generatedRDDs.foreach { kv =>
        // 将 KafkaRDD 中的 offsetRanges 记录到 generateRDD checkpoint 中
        val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
        batchForTime += kv._1 -> a
      }
    }

  ...省略...
  
    override def restore(): Unit = {
      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
        // 将 generateRDD checkpoint 中的 offsetRanges 记录恢复为 KafkaRDD 实例
         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
         generatedRDDs += t -> new KafkaRDD[K, V](
           context.sparkContext,
           executorKafkaParams,
           b.map(OffsetRange(_)),
           getPreferredHosts,
           // during restore, it's possible same partition will be consumed from multiple
           // threads, so do not use cache.
           false
         )
      }
    }
  }

}

spark.cleaner.referenceTracking.cleanCheckpoints=true 可以定期删除过期的 checkpoint 文件夹

checkpoint 文件夹清理工作在 ContextCleaner 类中进行实现的。具体如下:

  /**
   * Clean up checkpoint files written to a reliable storage.
   * Locally checkpointed files are cleaned up separately through RDD cleanups.
   */
  def doCleanCheckpoint(rddId: Int): Unit = {
    try {
      logDebug("Cleaning rdd checkpoint data " + rddId)
      ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
      listeners.asScala.foreach(_.checkpointCleaned(rddId))
      logInfo("Cleaned rdd checkpoint data " + rddId)
    }
    catch {
      case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
    }
  }
上一篇下一篇

猜你喜欢

热点阅读