Spark源码精读分析计划

Spark Core源码精读计划#17:上下文清理器Contex

2019-06-06  本文已影响138人  LittleMagic

目录

前言

话休絮烦,本文讲解SparkContext初始化的最后一个组件——ContextCleaner,即上下文清理器。顾名思义,它扮演着Spark Core中垃圾收集器的角色,因此虽然我们在平时编码时甚少见到它,但它算是一个幕后英雄了。如果看官对Java GC的相关知识有所了解的话,本篇讲的内容应该容易理解。

初始化与类定义

SparkContext中的初始化逻辑

代码#17.1 - SparkContext构造方法中初始化ContextCleaner

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

ContextCleaner的初始化非常简单,只需要依赖于SparkContext本身,由spark.cleaner.referenceTracking配置项控制是否启用,默认为true。

ContextCleaner类的属性成员

代码#17.2 - ContextCleaner类的属性成员

  private val referenceBuffer =
    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

  private val referenceQueue = new ReferenceQueue[AnyRef]

  private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

  private val periodicGCService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

  private val periodicGCInterval =
    sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")

  private val blockOnCleanupTasks = sc.conf.getBoolean(
    "spark.cleaner.referenceTracking.blocking", true)

  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
    "spark.cleaner.referenceTracking.blocking.shuffle", false)

  @volatile private var stopped = false

  private def blockManagerMaster = sc.env.blockManager.master
  private def broadcastManager = sc.env.broadcastManager
  private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

剩余的三个则分别是从SparkEnv中获取的BlockManagerMaster、BroadcastManager与MapOutputTrackerMaster的对应实例,它们会在之后的清理步骤中用到。

清理任务及弱引用的封装

ContextCleaner中共有5种清理任务,分别对应RDD、Shuffle、广播变量、累加器和检查点,都继承自CleanupTask这个空的特征。它们的定义极其简单,如下。

代码#17.3 - o.a.s.CleanupTask及其子类

private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask

在上一节讲到的CleanupTaskWeakReference定义如下。当其中的referent对象可达性变为弱可达(weakly reachable)时,对应的CleanupTaskWeakReference实例就会被加入ReferenceQueue中,用于执行清理任务。

代码#17.4 - o.a.s.CleanupTaskWeakReference类

private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)

ContextCleaner的执行流程

启动

在代码#17.1中已经调用了ContextCleaner.start()方法。该方法将清理线程cleaningThread设为守护线程并启动之,然后按照periodicGCInterval的间隔来调度执行System.gc()方法,进而可能触发一次GC。因此,在Spark Application中指定Driver或Executor的JVM参数时,一定不要加上-XX:-DisableExplicitGC,该参数会使System.gc()的调用无效化。

代码#17.5 - o.a.s.ContextCleaner.start()方法

  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.setName("Spark Context Cleaner")
    cleaningThread.start()
    periodicGCService.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = System.gc()
    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }

清理逻辑

ContextCleaner提供了registerForCleanup()方法,用来将CleanupTask及其对应要清理的对象加入referenceBuffer集合中。下面来看代码#17.2中提到的keepCleaning()方法。

代码#17.6 - o.a.s.ContextCleaner.keepCleaning()方法

  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

该方法从ReferenceQueue中取出CleanupTaskWeakReference,然后将其包含的CleanupTask进行模式匹配,并对五种情况分别调用不同的方法。以清理RDD和Shuffle数据的方法为例来看一看。

代码#17.7 - o.a.s.ContextCleaner.doCleanupRDD()/doCleanupShuffle()方法

  def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning RDD " + rddId)
      sc.unpersistRDD(rddId, blocking)
      listeners.asScala.foreach(_.rddCleaned(rddId))
      logInfo("Cleaned RDD " + rddId)
    } catch {
      case e: Exception => logError("Error cleaning RDD " + rddId, e)
    }
  }

  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning shuffle " + shuffleId)
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
      blockManagerMaster.removeShuffle(shuffleId, blocking)
      listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
      logInfo("Cleaned shuffle " + shuffleId)
    } catch {
      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
    }
  }

可见,清理RDD是调用了SparkContext.unpersistRDD()方法来反持久化一个RDD。清理Shuffle则需要同时从MapOutputTracker与BlockManager中反注册Shuffle。清理完毕后再调用各个监听器的监听方法进行记录。

总结

本文简要介绍了ContextCleaner的初始化、启动和清理的具体流程。

在讲完ContextCleaner之后,围绕SparkContext展开的这部分体系也进入了尾声。我们会检查一下前面是否还有漏掉的重要内容,如果没有的话,大概是时候进入Spark Core的核心之一——RDD了。

上一篇下一篇

猜你喜欢

热点阅读