Spark Source Code: Starting the TaskScheduler

2019-12-17  Jorvi

When a SparkContext is initialized, it creates a TaskScheduler. This article walks through how that TaskScheduler is started.

1 Starting the TaskScheduler

The TaskScheduler is started by calling _taskScheduler.start().

  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
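  1. backend.start() starts the SchedulerBackend;
  2. If the application is not running in local mode and spark.speculation = true (that is, speculative execution is enabled), checkSpeculatableTasks is scheduled to run periodically on the dedicated task-scheduler-speculation thread, where it looks for tasks that can be speculated.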
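
The two steps above can be sketched in isolation. This is a minimal illustration, not Spark's implementation: SpeculationSketch, shouldStartSpeculation and the 100 ms interval are hypothetical names and values, and a println stands in for checkSpeculatableTasks. It only shows the pattern of conditionally scheduling a fixed-delay check on a single-thread executor.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object SpeculationSketch {
  // Hypothetical stand-in for the guard in TaskSchedulerImpl.start():
  // speculation runs only outside local mode and only when enabled.
  def shouldStartSpeculation(isLocal: Boolean, speculationEnabled: Boolean): Boolean =
    !isLocal && speculationEnabled

  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val ticks = new CountDownLatch(3) // stop the demo after three checks

    if (shouldStartSpeculation(isLocal = false, speculationEnabled = true)) {
      // Fixed delay: the next run starts 100 ms after the previous one finishes.
      executor.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = {
          println("checking for speculatable tasks") // placeholder for the real check
          ticks.countDown()
        }
      }, 100L, 100L, TimeUnit.MILLISECONDS)
    }

    ticks.await(5, TimeUnit.SECONDS)
    executor.shutdownNow()
  }
}
```

Because the executor has a single thread, every check runs on the same thread; no new thread is created per check.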

2 Starting the SchedulerBackend

private[spark] class StandaloneSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with StandaloneAppClientListener
  with Logging {

  override def start() {
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    // The endpoint for executors to talk to us
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val webUrl = sc.ui.map(_.webUrl).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

}

StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend.

  1. super.start() calls the start method of the parent class CoarseGrainedSchedulerBackend;
  2. Assemble the parameters (driverUrl, args, extraJavaOpts, classPathEntries, libraryPathEntries, javaOpts, and so on) and build Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......);
  3. Build an ApplicationDescription that carries the Command; it is used later;
  4. Create a StandaloneAppClient and start it;
  5. Set the app state to SUBMITTED;
  6. Wait for the app to be registered (waitForRegistration());
  7. Set the app state to RUNNING.
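
The args built in step 2 contain placeholders such as {{EXECUTOR_ID}} that can only be filled in once an executor is actually being launched. The sketch below shows one way such templates could be resolved. It is an assumption for illustration only: fillPlaceholders is a hypothetical helper, the driver URL and all substituted values are made up, and the code that performs the real substitution is not shown in this article.

```scala
object ExecutorArgsSketch {
  // Template arguments in the same shape as StandaloneSchedulerBackend.start() builds them.
  // The driver URL is a made-up value.
  val templateArgs: Seq[String] = Seq(
    "--driver-url", "spark://CoarseGrainedScheduler@driver-host:35000",
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")

  // Hypothetical helper: replace every {{NAME}} whose NAME is a key of `values`.
  // Placeholders without a matching key are left untouched.
  def fillPlaceholders(args: Seq[String], values: Map[String, String]): Seq[String] =
    args.map { arg =>
      values.foldLeft(arg) { case (current, (name, value)) =>
        current.replace("{{" + name + "}}", value)
      }
    }

  def main(args: Array[String]): Unit = {
    val filled = fillPlaceholders(templateArgs, Map(
      "EXECUTOR_ID" -> "0",
      "HOSTNAME" -> "worker-1",
      "CORES" -> "4",
      "APP_ID" -> "app-example",
      "WORKER_URL" -> "spark://Worker@worker-1:40000"))
    println(filled.mkString(" "))
  }
}
```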

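Note: steps 2 and 3 closely resemble the way the Driver is launched when an Application is submitted.

For details, see Spark Source Code: Submitting an Application to a Spark Cluster.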

2.1 Starting CoarseGrainedSchedulerBackend

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }


  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }


  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
  }

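This creates a DriverEndpoint and registers it under the name "CoarseGrainedScheduler" with the RpcEnv of the driver's SparkEnv (see Spark Source Code: Initializing SparkContext).

Note: whenever an RpcEndpoint is registered with an RpcEnv, an OnStart message is placed in that endpoint's Inbox queue, so RpcEndpoint.onStart() is always executed.

Now look at DriverEndpoint.onStart.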
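
The note about OnStart can be illustrated with a toy model. This is a simplified sketch, not Spark's Inbox: the Endpoint trait, UserMessage and the single-threaded process loop are hypothetical simplifications. It only shows why an endpoint's onStart runs before any other message once OnStart is the first item in its queue.

```scala
import scala.collection.mutable

object InboxSketch {
  sealed trait InboxMessage
  case object OnStart extends InboxMessage
  final case class UserMessage(content: Any) extends InboxMessage

  // Hypothetical, stripped-down endpoint interface.
  trait Endpoint {
    def onStart(): Unit
    def receive(message: Any): Unit
  }

  final class Inbox(endpoint: Endpoint) {
    // OnStart is enqueued at construction time, so it is always handled first.
    private val messages = mutable.Queue[InboxMessage](OnStart)

    def post(message: InboxMessage): Unit = messages.enqueue(message)

    // Drain the queue in FIFO order and dispatch to the endpoint.
    def process(): Unit = while (messages.nonEmpty) {
      messages.dequeue() match {
        case OnStart              => endpoint.onStart()
        case UserMessage(content) => endpoint.receive(content)
      }
    }
  }

  // Records the order in which the endpoint callbacks fire.
  def demo(): Seq[String] = {
    val log = mutable.ArrayBuffer.empty[String]
    val inbox = new Inbox(new Endpoint {
      def onStart(): Unit = log += "onStart"
      def receive(message: Any): Unit = log += s"receive($message)"
    })
    inbox.post(UserMessage("ReviveOffers"))
    inbox.process()
    log.toSeq
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```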

  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends ThreadSafeRpcEndpoint with Logging {

    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }

  }

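This method schedules a task on reviveThread that periodically sends a ReviveOffers message to the endpoint itself. The endpoint handles that message as follows: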

    override def receive: PartialFunction[Any, Unit] = {

      case ReviveOffers =>
        makeOffers()
    }


    // Make fake resource offers on all executors
    private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = withLock {
        // Filter out executors under killing
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort))
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers)
      }
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }

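This method iterates over CoarseGrainedSchedulerBackend.executorDataMap. At this point executorDataMap is still empty, so the call effectively does nothing; it is analyzed in a later article.

2.2 Creating and Starting StandaloneAppClient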
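
To make the "does nothing yet" point concrete, here is a reduced model of the offer-building step. ExecutorData and WorkerOffer below are simplified, hypothetical case classes rather than Spark's real ones, and the alive flag replaces the executorIsAlive check. With an empty map the result is an empty sequence, so nothing would be launched.

```scala
object MakeOffersSketch {
  // Simplified stand-ins for Spark's ExecutorData and WorkerOffer.
  final case class ExecutorData(host: String, freeCores: Int, alive: Boolean)
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Keep only executors that are alive and turn each one into a resource offer.
  def buildOffers(executors: Map[String, ExecutorData]): IndexedSeq[WorkerOffer] =
    executors.collect {
      case (id, data) if data.alive => WorkerOffer(id, data.host, data.freeCores)
    }.toIndexedSeq

  def main(args: Array[String]): Unit = {
    // Right after startup no executor has registered, so there is nothing to offer.
    println(buildOffers(Map.empty).isEmpty)

    val later = Map(
      "0" -> ExecutorData("worker-1", freeCores = 4, alive = true),
      "1" -> ExecutorData("worker-2", freeCores = 2, alive = false))
    println(buildOffers(later))
  }
}
```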

private[spark] class StandaloneAppClient(
    rpcEnv: RpcEnv,
    masterUrls: Array[String],
    appDescription: ApplicationDescription,
    listener: StandaloneAppClientListener,
    conf: SparkConf)
  extends Logging {

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

  // remaining content omitted
}

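This creates a ClientEndpoint and registers it under the name "AppClient" with the RpcEnv of the driver's SparkEnv (see Spark Source Code: Initializing SparkContext). Registration triggers ClientEndpoint.onStart: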

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }


    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }


    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
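  1. Iterate over all masterRpcAddresses;
  2. Obtain the Master's RpcEndpointRef from masterAddress and Master.ENDPOINT_NAME;
  3. Use that reference to send RegisterApplication(appDescription, self), where self is the ClientEndpoint's own RpcEndpointRef (the Master code names this parameter driver).

In short, StandaloneAppClient is created and started in order to send the Master a message that begins Application registration.

2.3 Registering the Application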
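
The retry logic in registerWithMaster reduces to a three-way decision each time the registration timer fires. The sketch below isolates that decision as a pure function. The Action type and the value of MaxRetries are assumptions made for illustration; the article does not show the real value of REGISTRATION_RETRIES.

```scala
object RegistrationRetrySketch {
  sealed trait Action
  case object StopRetrying extends Action                  // already registered: cancel pending attempts
  case object GiveUp extends Action                        // retries exhausted: mark the client dead
  final case class RetryAgain(nextAttempt: Int) extends Action

  // Assumed value for illustration only.
  val MaxRetries = 3

  // Mirrors the branch order of the Runnable scheduled in registerWithMaster.
  def onTimeout(registered: Boolean, nthRetry: Int): Action =
    if (registered) StopRetrying
    else if (nthRetry >= MaxRetries) GiveUp
    else RetryAgain(nthRetry + 1)

  def main(args: Array[String]): Unit = {
    // A Master that never answers: show the decision taken at each timeout.
    for (attempt <- 1 to MaxRetries) {
      println(s"attempt $attempt -> ${onTimeout(registered = false, attempt)}")
    }
  }
}
```

Keeping the decision separate from the timer makes the branch order easy to check: the registered test comes first, so a late success always wins over the retry limit.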

  override def receive: PartialFunction[Any, Unit] = {

    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }

  }
  1. If this Master is in STANDBY state, it sends no response;
  2. Call createApplication(description, driver) to create an ApplicationInfo;
  3. Call registerApplication to register the app, which adds the ApplicationInfo created above to Master.waitingApps;
  4. Use the driver reference to send RegisteredApplication, telling the driver side (the ClientEndpoint that sent the request) that the application has been registered;
  5. Call schedule().

2.4 Launching the Application
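
The handling above can be modelled with a small in-memory Master. This is a hedged sketch, not Spark's Master: MiniMaster, AppInfo and the app ID format are hypothetical, and persistence, the reply message and schedule() are left out. It shows only the standby check and the waitingApps bookkeeping.

```scala
import scala.collection.mutable

object MasterRegistrationSketch {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Standby extends RecoveryState

  // Hypothetical, minimal replacement for ApplicationInfo.
  final case class AppInfo(id: String, name: String)

  final class MiniMaster(state: RecoveryState) {
    val waitingApps = mutable.ArrayBuffer.empty[AppInfo]
    private var nextId = 0

    // Returns None when the request is ignored (standby Master),
    // otherwise the registered app, which a real Master would acknowledge to the sender.
    def register(name: String): Option[AppInfo] =
      if (state == Standby) None
      else {
        val app = AppInfo(s"app-$nextId", name) // made-up ID format
        nextId += 1
        waitingApps += app
        Some(app)
      }
  }

  def main(args: Array[String]): Unit = {
    println(new MiniMaster(Standby).register("demo"))
    val active = new MiniMaster(Alive)
    println(active.register("demo"))
    println(active.waitingApps.size)
  }
}
```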

  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

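In Spark Source Code: Submitting an Application to a Spark Cluster, the same method is called to launch the Driver once the Driver has been registered.

In that case no app had been added to Master.waitingApps yet, so startExecutorsOnWorkers did nothing. Here an app is already in Master.waitingApps, so startExecutorsOnWorkers launches Executors for it.

A few notes:

  1. The ApplicationInfo created while registering the Application is added to Master.waitingApps, whereas in Spark Source Code: Submitting an Application to a Spark Cluster the DriverInfo created while registering the Driver is added to Master.waitingDrivers;

  2. schedule() does two things:
    1) iterates over Master.waitingDrivers and launches each Driver;
    2) iterates over Master.waitingApps and launches Executors for each App.

How startExecutorsOnWorkers launches Executors for an app is analyzed in a later article.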
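
The driver-placement loop in schedule() is easier to follow without the surrounding Master state. The sketch below reimplements just that loop under stated assumptions: Worker and Driver are simplified, hypothetical case classes, the random shuffle is omitted so the result is deterministic, and the resource deduction stands in for whatever launchDriver does to the worker, which this article does not show.

```scala
object DriverPlacementSketch {
  final case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  // Returns driverId -> workerId for every driver that found a worker.
  def place(workers: IndexedSeq[Worker], drivers: Seq[Driver]): Map[String, String] = {
    var assignments = Map.empty[String, String]
    var curPos = 0 // shared across drivers: each search resumes where the last one stopped
    for (driver <- drivers) {
      var launched = false
      var visited = 0
      while (visited < workers.size && !launched) {
        val worker = workers(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Assumption: launching a driver consumes the worker's free resources.
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          assignments += (driver.id -> worker.id)
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
    }
    assignments
  }

  def main(args: Array[String]): Unit = {
    val workers = IndexedSeq(Worker("w1", 1024, 1), Worker("w2", 4096, 4))
    val drivers = Seq(Driver("d1", 2048, 1), Driver("d2", 512, 1), Driver("d3", 8192, 1))
    println(place(workers, drivers))
  }
}
```

In this example d1 skips w1 for lack of memory and lands on w2, d2 then starts from w1 and fits there, and d3 fits nowhere and stays unplaced, which corresponds to a driver remaining in waitingDrivers.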

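3 Summary

  1. Calling TaskSchedulerImpl.start to start the TaskScheduler in turn calls SchedulerBackend.start to start the SchedulerBackend;
  2. The SchedulerBackend is the TaskScheduler's backend rather than a thread of its own: its DriverEndpoint receives messages such as ReviveOffers, offers executor resources to the TaskScheduler, and launches the resulting tasks;
  3. When StandaloneSchedulerBackend starts, it calls the start method of its parent class CoarseGrainedSchedulerBackend to start CoarseGrainedSchedulerBackend;
  4. Starting CoarseGrainedSchedulerBackend creates a DriverEndpoint and registers it with the RpcEnv of the driver's SparkEnv (see Spark Source Code: Initializing SparkContext);
  5. DriverEndpoint.onStart is then invoked. It schedules a periodic task that sends ReviveOffers to the endpoint itself, and handling ReviveOffers calls makeOffers; this is effectively the periodic scheduling and launching of Tasks;
  6. A StandaloneAppClient is created and its start method is called, which sends RegisterApplication to every Master to begin registering the Application;
  7. On receiving RegisterApplication, the Master creates an ApplicationInfo, puts it in Master.waitingApps to mark the Application as registered, and replies to the driver side with RegisteredApplication;
  8. The Master then calls schedule to launch the Application. schedule does two things:
    1) iterates over Master.waitingDrivers and launches each Driver;
    2) iterates over Master.waitingApps and launches Executors for each App;
  9. How Executors are launched for an App is analyzed in a later article.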