原创-Spark源码分析六:Standalone模式下Drive

2019-08-05  本文已影响0人  无色的叶

作业提交流程图

image.png

作业执行流程描述:

driver启动流程

master接收到RequestSubmitDriver请求后会调用schedule()方法

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    // 对当前ALIVE状态worker进行随机洗牌(shuffle)打乱workers集合内元素,得到打乱后的worker集合
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    //  获取现在ALIVE的Worker数
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      //  遍历一个waitingDrivers集合的复制
      //  将worker分配给每个等待的driver
      //  对于每一个driver,从分配给driver的最后一个worker开始,然后继续向前遍历,直到探索到所有的Alive Worker
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        //  如果选出的worker空闲资源需求大于driver所需资源,则在该worker上启动driver
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 调用launchDriver方法
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    //  调用startExecutorsOnWorkers方法
    startExecutorsOnWorkers()
  }

launchDriver方法

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
  }

work接收到LaunchDriver消息后

case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem

查看Driver Runner的start方法,该方法中

 /** Starts a thread to run and manage the driver. */
  private[worker] def start() = {
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // prepare driver jars and run driver
          val exitCode = prepareAndRunDriver()

          // set final state depending on if forcibly killed and process exit code
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // notify worker of final driver state, possible exception
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }

查看prepareAndRunDriver方法

 private[worker] def prepareAndRunDriver(): Int = {
    val driverDir = createWorkingDirectory()
    val localJarFilename = downloadUserJar(driverDir)

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    // TODO: If we add ability to submit multiple jars they should also be added here
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }

runDriver方法中,重定向将process的InputStream和ErrorStream重定向到指定的stdout、stderr文件中
这就是我们在Spark application web页面中查看Logs对应的两个日志文件


image.png

最后调用runCommandWithRetry方法

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {

    builder.directory(baseDir)
    //  初始化
    def initialize(process: Process): Unit = {
      // Redirect stdout and stderr to files
      //  创建stdout文件
      val stdout = new File(baseDir, "stdout")
      //  将process的InputStream流重定向到stdout文件
      CommandUtils.redirectStream(process.getInputStream, stdout)
      //  创建stderr文件
      val stderr = new File(baseDir, "stderr")
      //  将builder命令格式化处理
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      //  将process的Error流信息重定向到stderr文件
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
    //  调用runCommandWithRetry
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }

该方法中,有一个初始化方法initialize,在初始化方法中做了如下步骤:

在runCommandWithRetry方法中最后调用runCommandWithRetry方法

private[worker] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
    //  默认exitCode为-1
    var exitCode = -1
    // Time to wait between submission retries.
    // 在提交重试时等待时间
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        //  如果被kill,返回exitCode
        if (killed) { return exitCode }
        //  执行命令command,启动
        process = Some(command.start())
        //  调用初始化方法
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()

      //  获取exitCode
      exitCode = process.get.waitFor()

      // check if attempting another run

      //  检查是否尝试另一次运行
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }
    //  返回exitCode
    exitCode
  }
}

process = Some(command.start()) 真正执行启动Driver进程, exitCode = process.get.waitFor()获取执行的返回码

移除Driver操作

DriverRunner根据退出码进行匹配,通知worker driver状态发生改变,worker接受到该消息后,处理逻辑如下:

 private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    // driver id
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    //  根据state进行匹配,并打印日志
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
    //  向master发送消息driverStateChanged
    sendToMaster(driverStateChanged)
    //  在drivers集合中移除当前driver
    val driver = drivers.remove(driverId).get
    //  将当前driver添加到finishedDrivers
    finishedDrivers(driverId) = driver
    trimFinishedDriversIfNecessary()
    //  移除driver内存、cpu cores资源
    memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }

Master在接受到消息后,根据匹配做出操作,如果为ERROR FINISHED KILLED FAILED状态,则调用removeDriver移除Driver

case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }

removeDriver方法中,做出一系列移除操作

  private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
    //  根据该driver id进行模式匹配
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        //  在drivers集合中移除driver
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // 将driver添加到completedDrivers集合中
        completedDrivers += driver
        //  持久化引擎移除driver
        persistenceEngine.removeDriver(driver)
        //  更新driver状态为finalState时
        driver.state = finalState
        driver.exception = exception
        //  移除worker上的driver
        driver.worker.foreach(w => w.removeDriver(driver))
        //  调用schedule方法,做移除操作
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

接下来分析:启动executor

参考:http://www.louisvv.com/archives/1366.html

上一篇 下一篇

猜你喜欢

热点阅读