Spark Source Code: TaskScheduler

2018-10-18  小狼星I

Spark Source Code: An Introduction to TaskScheduler

In the previous post, DAGScheduler finished dividing the job into stages and submitted the resulting TaskSets to TaskScheduler. This post walks through how TaskScheduler launches those tasks.

TaskScheduler source code analysis

DAGScheduler submits the TaskSet to TaskScheduler, so let's start with submitTasks() in TaskSchedulerImpl, the implementation class of TaskScheduler. The method first creates a TaskSetManager to wrap the TaskSet, then checks that the stage does not already have another active (non-zombie) TaskSetManager running, since a stage may only run one TaskSet at a time. The new TaskSetManager is then added to the schedulableBuilder scheduling pool, and finally the method asks the backend for resources via backend.reviveOffers();

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // Create a TaskSetManager; maxTaskFailures is the maximum number of task retries (4 by default)
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      // Check whether this stage already has another active TaskSet: only one TaskSet may run
      // per stage at a time, so look for a different TaskSet whose TaskSetManager is not a zombie
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      // Add the TaskSetManager to the scheduling pool (FIFOSchedulableBuilder / FairSchedulableBuilder)
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // Ask the backend for resources; this actually sends a request to DriverEndpoint
    backend.reviveOffers()
  }
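
As a side note, which SchedulableBuilder the scheduler builds here is driven by configuration: spark.scheduler.mode selects FIFO (the default) or FAIR. A minimal sketch of setting it from an application; the app name and master below are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // "FIFO" (default) builds a FIFOSchedulableBuilder, "FAIR" a FairSchedulableBuilder
    val conf = new SparkConf()
      .setAppName("scheduler-mode-demo")
      .setMaster("local[*]")
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)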

Stepping into CoarseGrainedSchedulerBackend.reviveOffers, we can see that driverEndpoint sends itself a ReviveOffers message. As discussed earlier, driverEndpoint is the driver endpoint of the current application;

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

When DriverEndpoint receives the ReviveOffers message it ends up calling makeOffers(). This method first filters for executors whose state is alive, wraps each active executor in a WorkerOffer, and then the key step is the final launchTasks call. Let's first look at what scheduler.resourceOffers(workOffers) does;

   case ReviveOffers =>
     makeOffers()


    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

Back in TaskSchedulerImpl, let's look at the resourceOffers method:
TaskSchedulerImpl.resourceOffers assigns concrete compute resources to each task. Its input is the set of ExecutorBackends and their available cores; its output is a two-dimensional array of TaskDescription that fixes which ExecutorBackend each task runs on.
Inside the method, the available executors are first recorded in the scheduler's bookkeeping structures and then shuffled to spread load evenly. A TaskDescription buffer is created for each executor, and finally the TaskSets in the scheduling pool are traversed, assigning executors to tasks by data locality. One point worth emphasizing: DAGScheduler.submitMissingTasks() already determined where each task's data lives, whereas taskSet.myLocalityLevels here only derives the locality level of that data, as the code below shows:

  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    //todo flag whether a new executor has been added
    var newExecAvail = false
    //todo iterate over every offered executor
    for (o <- offers) {
      //todo record this executor in the bookkeeping structures
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    //todo shuffle the available executors so that load is spread evenly
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    //todo build a TaskDescription buffer for each executor
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    //todo the number of available cores on each executor
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    //todo get the sorted TaskSetManagers from the scheduling pool
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    //todo DAGScheduler.submitMissingTasks() already resolved where each task's data lives;
    //todo here taskSet.myLocalityLevels only maps those hosts to data locality levels
    //todo (PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)
    var launchedTask = false
    //todo for each TaskSet, offer the nearest executors first to run its tasks
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        //todo assign the previously shuffled WorkerOffers to this TaskSet by locality, launching its tasks
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
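
Related to the locality levels above: how long a TaskSetManager waits at a more local level before relaxing to the next one is governed by the spark.locality.wait settings. A small sketch of tuning them; the values are illustrative, not recommendations:

    import org.apache.spark.SparkConf

    // wait time before stepping down PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY
    val conf = new SparkConf()
      .set("spark.locality.wait", "3s")       // default wait shared by all levels
      .set("spark.locality.wait.node", "6s")  // per-level override, here for NODE_LOCAL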

Next we enter resourceOfferSingleTaskSet, which iterates over the executor indices (also used as indices into tasks), assigns tasks to the corresponding executors, and fills in tasks.
The tasks data structure is passed in by the caller (resourceOffers) and holds, for each executor, the tasks assigned to it; see the source below:

  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    //todo iterate over all executor indices
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      //todo check whether this executor still has at least CPUS_PER_TASK free cores (1 by default)
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          //todo assign tasks from the TaskSet to this executor and record them in tasks;
          //todo note that tasks was passed in by the calling method
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            //todo record each task in the data structures below
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToTaskCount(execId) += 1
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }
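
The CPUS_PER_TASK checked above comes from the spark.task.cpus setting (1 by default). A quick sketch of how it interacts with executor cores, assuming 4-core executors:

    import org.apache.spark.SparkConf

    // each task reserves spark.task.cpus cores out of an executor's freeCores,
    // so a 4-core executor runs at most 4 / 2 = 2 such tasks concurrently
    val conf = new SparkConf()
      .set("spark.executor.cores", "4")
      .set("spark.task.cpus", "2")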

Back in DriverEndpoint.makeOffers, scheduler.resourceOffers(workOffers) has now completed and the TaskSets have been assigned, so launchTasks() runs next. It iterates over every task and sends a LaunchTask message to the executor that task was assigned to, as shown below:

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      //todo iterate over every task
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        //todo check the size of the serialized task
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          //todo look up the executor this task was assigned to
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //todo send a LaunchTask request to that executor
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
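
The abort message above already hints at the usual fix when a serialized task grows too large: broadcast the big value instead of capturing it in the task closure. A minimal sketch of that pattern; sc and lookupTable are placeholders:

    import org.apache.spark.SparkContext

    def withBroadcast(sc: SparkContext, lookupTable: Map[String, Int]): Unit = {
      // ship the large value to executors once instead of inside every serialized task
      val bc = sc.broadcast(lookupTable)
      val resolved = sc.parallelize(Seq("a", "b", "c"))
        .map(key => bc.value.getOrElse(key, -1))  // tasks only carry the broadcast handle
      resolved.collect().foreach(println)
    }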

Continuing the trace, we reach the LaunchTask case in CoarseGrainedExecutorBackend's message handler, which in turn calls executor.launchTask:

    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

In Executor.launchTask a TaskRunner object is instantiated. TaskRunner is a Runnable, and it is handed to the thread pool threadPool for execution;

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }
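
TaskRunner itself is just a Runnable handed off to a thread pool rather than a Thread subclass. A stripped-down sketch of the same pattern with the standard java.util.concurrent API; all names here are illustrative:

    import java.util.concurrent.Executors

    object RunnerSketch {
      private val pool = Executors.newCachedThreadPool()

      // stand-in for TaskRunner: a plain Runnable executed by the pool
      class DemoRunner(taskId: Long) extends Runnable {
        override def run(): Unit =
          println(s"running task $taskId on ${Thread.currentThread().getName}")
      }

      def launch(taskId: Long): Unit = pool.execute(new DemoRunner(taskId))
    }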

Summary:

DAGScheduler divides the job into stages and submits the resulting TaskSets to TaskScheduler. TaskScheduler asks the driver for resources, and the driver offers the available ExecutorBackend resources back to TaskScheduler, which assigns tasks to ExecutorBackends and sends each one a LaunchTask request. The ExecutorBackend then calls launchTask on its Executor object, which wraps the task in a TaskRunner and executes it on the thread pool.
