
Spark Source Code Analysis: the Executor Resource Allocation Flow

2019-06-16  叫我不矜持

1. Introduction

When a user submits an application, the SparkContext sends a registration message to the Master, and the Master then allocates Executors to that application.

Here the SparkContext is mainly responsible for communicating with the cluster manager, managing resources, assigning and monitoring tasks, and managing the lifecycle of job execution, while the cluster manager provides resource allocation and management.

The cluster manager differs by deployment mode: in Standalone mode the role is played by the Master, and in YARN mode by the ResourceManager. After the SparkContext splits the running job and allocates resources to it, it sends tasks to the Executors to run.

This article focuses on walking through the Executor resource allocation process.

2. Analysis of the Executor Resource Allocation Process

Every job runs within a SparkContext. To be clear, during SparkContext startup two schedulers, the DAGScheduler and the TaskScheduler, are created first, along with a SchedulerBackend that allocates the currently available resources.

1. DAGScheduler
The DAGScheduler is mainly responsible for splitting the DAG of the user's application into Stages, where each Stage consists of a group of Tasks that can run concurrently. These Tasks execute exactly the same logic, just on different data (see the short example after this list).

2. TaskScheduler
The TaskScheduler is responsible for the actual scheduling and execution of tasks. It receives the tasks of each Stage from the DAGScheduler, assigns them, according to the scheduling algorithm, to the Executors allocated to the application, and launches backup (speculative) tasks for tasks that run especially slowly. When TaskSchedulerImpl is created it builds a SchedulableBuilder, which comes in two flavors: FIFOSchedulableBuilder and FairSchedulableBuilder. The differences between these schedulers will be covered in a later article; for now just keep the concept in mind.

3. SchedulerBackend
The SchedulerBackend allocates the currently available resources. Concretely, it assigns compute resources (Executors) to the Tasks that are waiting for them and launches those Tasks on the assigned Executors, completing the scheduling of the computation. It uses reviveOffers to drive this task scheduling.
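For example, a simple word count is split into two such Stages at the shuffle introduced by reduceByKey. A minimal sketch, assuming an existing SparkContext sc and a hypothetical input path:

// Hypothetical example: the shuffle introduced by reduceByKey splits this job into two
// stages; within each stage every task runs the same code on a different partition.
val counts = sc.textFile("hdfs://namenode:9000/input/words.txt")  // hypothetical path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.count()  // the action triggers the DAGScheduler to build and submit the stages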

When a SparkContext is created, it internally creates a SparkEnv, and the SparkEnv in turn creates an RpcEnv.


Executor resource allocation starts with TaskSchedulerImpl's start method, which calls StandaloneSchedulerBackend's start method and registers the DriverEndpoint and ClientEndpoint endpoints with the RpcEnv.

//The start method of TaskSchedulerImpl
override def start() {
    //Start the StandaloneSchedulerBackend
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
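As an aside, the speculative-execution branch in the snippet above is disabled by default; a minimal sketch of turning it on via SparkConf:

import org.apache.spark.SparkConf

// spark.speculation defaults to false; setting it to true activates the branch above
val conf = new SparkConf().set("spark.speculation", "true")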

//The start method of StandaloneSchedulerBackend
override def start() {
    /**
      * super.start() creates the Driver's communication endpoint, i.e. the reference to the Driver.
      * Later, Executors register themselves back with CoarseGrainedSchedulerBackend,
      * the parent class of StandaloneSchedulerBackend.
      */
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    // The endpoint for executors to talk to us
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val webUrl = sc.ui.map(_.webUrl).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc: ApplicationDescription = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    //The description of the application to submit
    //Wrap appDesc; it is passed into the StandaloneAppClient here
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    //Start the StandaloneAppClient, which then registers the application's information with the Master
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

The source above first calls the parent class's start method to prepare the reference to the DriverEndpoint. The DriverEndpoint is an object inside CoarseGrainedSchedulerBackend that is mainly used to interact with other endpoints and exchange messages.

Later, Executors register themselves back with CoarseGrainedSchedulerBackend, the parent class of StandaloneSchedulerBackend.

Then start is called on the StandaloneAppClient instance; its start method is shown below.

 def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    /**
      * This simply sets the empty endpoint (an AtomicReference).
      * The key point is that rpcEnv.setupEndpoint creates a ClientEndpoint, and registering
      * an endpoint always triggers a call to ClientEndpoint's onStart method.
      */
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

This triggers the ClientEndpoint's onStart method, which calls registerWithMaster to register the application's basic information with the Master.

//The onStart method
    override def onStart(): Unit = {
      try {
        //Register the current application's information with the Master
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

Next, let's look at how the Master's receive method handles the RegisterApplication message.

override def receive: PartialFunction[Any, Unit] = {
    ....
    //The Application registration submitted from the Driver side
    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      //If the Master is in STANDBY state, ignore it and do not submit the application
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        //Wrap the application information. Note: stepping into createApplication shows that by default an application may use up to Int.MaxValue cores
        val app = createApplication(description, driver)
        //Register the app; this adds the current application to waitingApps
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        //Finally the general-purpose schedule() method is executed
        schedule()
      }
    ....
}

We can see that after the Master receives the registration message, it adds the application to the list of applications waiting to run and calls the general-purpose schedule() method to work out the resources that need to be allocated, namely Executor resources. Inside schedule, the startExecutorsOnWorkers method is executed.

private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    //Iterate over the submitted apps in waitingApps
    for (app <- waitingApps) {
      //coresPerExecutor: how many cores one Executor uses, taken from the application. It can be set with --executor-cores; as shown below, the default is 1 if not specified
      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
      // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
      //Check whether the application still needs cores; every time cores are allocated to it, app.coresLeft is reduced by the allocated amount
      if (app.coresLeft >= coresPerExecutor) {
        // Filter out workers that don't have enough resources to launch an executor
        //Filter out the usable workers
        val usableWorkers : Array[WorkerInfo]= workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor)
          .sortBy(_.coresFree).reverse

        /**
          * The code below decides how many cores each worker provides and how many Executors
          * to launch on it. Note: spreadOutApps is true by default.
          * The returned assignedCores is how many cores each worker should allocate to the
          * current application.
          */
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          //Allocate the worker's resources to Executors
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
        }
      }
    }
  }

The method above first takes the submitted apps from waitingApps and then works out how many cores each worker provides and how many Executors to launch on it. The main work is done in scheduleExecutorsOnWorkers.

 private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    //How many cores one Executor uses; if --executor-cores was not specified at submit time this is None
    val coresPerExecutor : Option[Int]= app.desc.coresPerExecutor
    //If the number of cores per Executor was not specified at submit time, the default is 1
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    //oneExecutorPerWorker is true in that case
    val oneExecutorPerWorker :Boolean= coresPerExecutor.isEmpty
    //By default an Executor uses 1024 MB of memory; this default is set in SparkContext (around line 464)
    //If the submit command specifies --executor-memory, that value is used instead
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    //Number of usable workers
    val numUsable = usableWorkers.length
    //Create two important arrays
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    /**
      * coresToAssign is how many cores to allocate to the Application now: the minimum of
      * app.coresLeft and the total free cores across all usable workers.
      * If --total-executor-cores was specified when submitting the application, app.coresLeft
      * starts from that value.
      */
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)


    def canLaunchExecutor(pos: Int): Boolean = {..}

    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    //Finally, return how many cores are allocated on each worker
    assignedCores
  }

The core of scheduleExecutorsOnWorkers is working out how many cores each worker provides, how many cores each Executor needs (1 by default), and how many Executors to launch per worker, while also checking how much memory each Executor uses (1024 MB by default).

By default in Standalone mode, the number of CPU cores an application can be allocated is governed by spark.deploy.defaultCores, which defaults to Int.MaxValue, i.e. no limit. Users can set --total-executor-cores to cap the total number of cores an application may use, and --executor-cores to set the minimum number of cores per executor.
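For illustration, the same limits can be expressed through SparkConf when building the SparkContext (a minimal sketch; the master URL and values are arbitrary examples):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")      // hypothetical standalone master URL
  .setAppName("allocation-demo")
  .set("spark.cores.max", "12")          // same effect as --total-executor-cores 12
  .set("spark.executor.cores", "2")      // same effect as --executor-cores 2
  .set("spark.executor.memory", "2g")    // same effect as --executor-memory 2g
val sc = new SparkContext(conf)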

In addition, when allocating resources to an application, one of two Worker allocation strategies is used:
1. Run the application on as many Workers as possible. This not only makes full use of cluster resources but also benefits data locality.
2. Run the application on as few Workers as possible. This suits CPU-intensive workloads that use relatively little memory.

The strategy is controlled by the spark.deploy.spreadOut parameter, which defaults to true. In other words, by default cores are spread evenly across Workers, and every eligible Worker gets executors.

If it is set to false, the scheduler allocates as many executors as possible on one worker and only moves on to another worker once the first no longer meets the allocation conditions. --total-executor-cores and --executor-cores determine how many executors are needed and how many cores each gets, but note that --executor-cores is a minimum: when oneExecutorPerWorker is true there is one executor per worker, so if there are not enough workers the number of executors falls short of the expected count, and in order to still satisfy --total-executor-cores each executor ends up with more cores than the configured --executor-cores value. The simulation below makes the two strategies concrete.
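The following is a small, self-contained simulation of the assignment loop. It is a simplification of the real scheduleExecutorsOnWorkers (memory and executor-count limits are ignored), and the worker core counts and requested totals are made-up numbers:

// Simplified simulation of the core-assignment loop (not the real Master code).
object SpreadOutDemo {
  def assign(workerFreeCores: Array[Int],
             totalCoresWanted: Int,
             coresPerExecutor: Int,
             spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(workerFreeCores.length)(0)
    var coresLeft = math.min(totalCoresWanted, workerFreeCores.sum)
    def canLaunch(i: Int): Boolean =
      coresLeft >= coresPerExecutor &&
        workerFreeCores(i) - assigned(i) >= coresPerExecutor
    var free = workerFreeCores.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { i =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(i)) {
          coresLeft -= coresPerExecutor
          assigned(i) += coresPerExecutor
          if (spreadOut) keepScheduling = false   // round-robin: one step per worker, then move on
        }
      }
      free = free.filter(canLaunch)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(8, 8, 8)   // three workers, 8 free cores each (made-up numbers)
    println(assign(workers, 12, 2, spreadOut = true).mkString(","))   // prints 4,4,4
    println(assign(workers, 12, 2, spreadOut = false).mkString(","))  // prints 8,4,0
  }
}

With spreadOut = true the 12 requested cores are spread round-robin across the three workers; with spreadOut = false the first worker is filled up before the second is touched.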

These Executor launch descriptions are then packaged and sent to the workers.

private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    //How many cores each Executor gets
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)
      //Launch the Executor on the worker
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
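A quick sanity check on the two expressions above, using hypothetical numbers (e.g. in the Scala REPL), evaluated the same way as in the method:

val assignedCores = 6
Some(2).map(assignedCores / _).getOrElse(1)              // numExecutors = 3 (with --executor-cores 2)
Some(2).getOrElse(assignedCores)                         // each executor gets 2 cores
(None: Option[Int]).map(assignedCores / _).getOrElse(1)  // numExecutors = 1 (no --executor-cores)
(None: Option[Int]).getOrElse(assignedCores)             // the single executor grabs all 6 cores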

The worker's receive method matches the LaunchExecutor message.

     //Launch the Executor
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>

         //Create an ExecutorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            /**
              * appDesc contains Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ...);
              * its first argument is the class of the Executor backend to launch.
              */
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager

          /**
            * Start the ExecutorRunner.
            * What actually gets launched is the CoarseGrainedExecutorBackend class; its main
            * method, shown below, registers the Executor back with the Driver.
            */
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
     } catch {...}
  ....

As the source above shows, an ExecutorRunner is created on the worker. The appDesc passed to its constructor carries a Command whose first argument is the fully qualified name of CoarseGrainedExecutorBackend.

The subsequent manager.start() spins up a thread on the worker that asynchronously launches CoarseGrainedExecutorBackend and eventually invokes its main method.

 private[worker] def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
....
  }

The main method of CoarseGrainedExecutorBackend calls run, which initializes the SparkEnv, the RpcEnv, and other related infrastructure.

Next comes the step of creating the executor runtime: run obtains a reference to the DriverEndpoint (inside CoarseGrainedSchedulerBackend) from the RpcEnv, registers the Executor back with the DriverEndpoint, and waits for the reply.

The source code is as follows.

 private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {

    ....
     val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
      //Register the Executor's communication endpoint; this triggers CoarseGrainedExecutorBackend's onStart method
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
   ....
}

The onStart method of CoarseGrainedExecutorBackend:

override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    //Get the Driver's reference from the RpcEnv and register the Executor back with the Driver
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      //Got the Driver's reference
      driver = Some(ref)
      /**
        * Register the Executor's information back with the Driver, i.e. with the DriverEndpoint
        * inside the CoarseGrainedSchedulerBackend we saw earlier.
        * DriverEndpoint's receiveAndReply method matches the RegisterExecutor message.
        */
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

The DriverEndpoint handles the RegisterExecutor request, wraps the executor information and adds it to the relevant data structures, then sends a message to the executorRef telling the Executor that it has been registered.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
....
     //The Executor registering itself back
     case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
           /**
            * Get the Executor's communication endpoint and send it a message saying the
            * Executor has been registered.
            * CoarseGrainedExecutorBackend's receive method waits for this confirmation and,
            * once it matches, starts the Executor.
            */
          executorRef.send(RegisteredExecutor)
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()
....

}

Here the executorRef in the source above is the reference to CoarseGrainedExecutorBackend. Its receive function then matches the message sent from the Driver side confirming that the Executor's registration has been accepted, and the Executor is started. The Executor maintains an internal thread pool used for running tasks (see the sketch after the snippet below).

override def receive: PartialFunction[Any, Unit] = {

   //Match the message from the Driver side confirming the Executor has been registered; now start the Executor
  case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        //Create the actual Executor; it holds a thread pool used for running tasks (around line 89 of Executor.scala)
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
}
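The "thread pool for running tasks" mentioned above is, at its core, just a pool that executes TaskRunner instances. A highly simplified sketch of the idea (not Spark's actual Executor code, which uses a daemon cached thread pool and a real TaskRunner class):

import java.util.concurrent.Executors

// Simplified illustration only: a pool that runs submitted work asynchronously,
// the way the Executor hands each launched task to a TaskRunner on its thread pool.
val threadPool = Executors.newCachedThreadPool()

def launchTask(taskId: Long): Unit = {
  threadPool.execute(new Runnable {
    override def run(): Unit = println(s"running task $taskId")  // the real code runs TaskRunner.run()
  })
}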

At this point Executor resource allocation is complete. The Executor then sends heartbeat messages to the Driver periodically while waiting for the Driver to dispatch tasks.

private def startDriverHeartbeater(): Unit = {
    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

    // Wait a random interval so the heartbeats don't end up in sync
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }
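The interval read above comes from spark.executor.heartbeatInterval (10s by default); as a sketch, it could be overridden like this (arbitrary example value):

import org.apache.spark.SparkConf

// Override the heartbeat interval read by startDriverHeartbeater
val conf = new SparkConf().set("spark.executor.heartbeatInterval", "20s")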

The Executor then receives LaunchTask messages sent by the DriverEndpoint; task execution is implemented in the Executor's launchTask method. For each task a TaskRunner is created and submitted to the thread pool; it is responsible for running the task and, when finished, sends a statusUpdate message with the state change back to the DriverEndpoint. On receiving it, the DriverEndpoint calls TaskSchedulerImpl's statusUpdate method, handles the different task outcomes accordingly, and then assigns further tasks to that Executor.

The code in the DriverEndpoint that handles the status update is shown below.

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

3. Summary

The scheduling flow described above is summarized in the following diagram.

(Figure: Executor scheduling flow)

This completes the source-level walkthrough of the Executor resource allocation flow.

The next article will analyze how individual tasks are scheduled. Stay tuned.

