spark

任务执行-源码分析

2021-09-21  本文已影响0人  专职掏大粪

CoarseGrainedExecutorBackend.receive
收到LaunchTask消息

override def receive: PartialFunction[Any, Unit] = {
   case RegisteredExecutor =>
     logInfo("Successfully registered with driver")
     try {
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
         resources = _resources)
       driver.get.send(LaunchedExecutor(executorId))
     } catch {
       case NonFatal(e) =>
         exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
     }

   case LaunchTask(data) =>
     if (executor == null) {
       exitExecutor(1, "Received LaunchTask command but executor was null")
     } else {
       //反序列化task对象
       val taskDesc = TaskDescription.decode(data.value)
       logInfo("Got assigned task " + taskDesc.taskId)
       taskResources(taskDesc.taskId) = taskDesc.resources
      //计算对象运行task
       executor.launchTask(this, taskDesc)
     }

使用executor的线程池threadPool执行task

 def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val tr = new TaskRunner(context, taskDescription, plugins)
    runningTasks.put(taskDescription.taskId, tr)
    threadPool.execute(tr)
    if (decommissioned) {
      log.error(s"Launching a task while in decommissioned state.")
    }
  }

TaskRunner.run

//任务运行
   val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            resources = taskDescription.resources,
            plugins = plugins)

Task.run

runTask(context)

计算对象运行,计算逻辑在每个任务中

上一篇下一篇

猜你喜欢

热点阅读