spark

任务调度-源码分析

2021-09-21  本文已影响0人  专职掏大粪
// Wrap this stage's tasks into a single TaskSet and submit it to the task scheduler.
// NOTE(review): excerpt only — `tasks`, `stage`, `jobId` and `properties` are bound in
// the elided surrounding code.
taskScheduler.submitTasks(new TaskSet(
       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
       stage.resourceProfileId))

TaskSchedulerImpl.submitTasks

/**
 * Builds the TaskSetManager for `taskSet`, handing it this scheduler together with
 * `healthTrackerOpt` and `clock`.
 *
 * @param taskSet         the task set the new manager will be responsible for
 * @param maxTaskFailures forwarded unchanged to the TaskSetManager
 * @return a freshly constructed TaskSetManager
 */
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager =
  new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
// Register the new TaskSetManager with the schedulable builder (for the FIFO builder
// shown below, this simply adds it to rootPool).
// NOTE(review): `manager` is created in elided code, presumably via createTaskSetManager.
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

调度器初始化

 /**
  * Attaches the scheduler backend and builds the scheduling pools.
  *
  * The schedulable builder is chosen by `schedulingMode`: FIFO uses a
  * FIFOSchedulableBuilder over `rootPool`, FAIR uses a FairSchedulableBuilder that also
  * receives `conf`. The chosen builder's `buildPools()` is then invoked.
  *
  * @param backend the backend this scheduler will use
  * @throws IllegalArgumentException if `schedulingMode` is neither FIFO nor FAIR
  */
 def initialize(backend: SchedulerBackend): Unit = {
   this.backend = backend
   schedulableBuilder = schedulingMode match {
     case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
     case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
     case _ =>
       throw new IllegalArgumentException(
         s"Unsupported $SCHEDULER_MODE_PROPERTY: $schedulingMode")
   }
   schedulableBuilder.buildPools()
 }

FIFOSchedulableBuilder.addTaskSetManager

  /**
   * FIFO variant: places `manager` directly into the root pool.
   *
   * @param manager    the schedulable (a TaskSetManager) to add
   * @param properties not consulted by this builder
   */
  override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit =
    rootPool.addSchedulable(manager)

// Ask the scheduler backend to revive resource offers; the
// CoarseGrainedSchedulerBackend implementation is shown next.
backend.reviveOffers()
CoarseGrainedSchedulerBackend.reviveOffers

 /**
  * Sends a ReviveOffers message to this backend's own driver endpoint. The
  * `case ReviveOffers` branch of `receive` then calls `makeOffers()`.
  * The send is wrapped in Utils.tryLogNonFatalError (presumably logging instead of
  * rethrowing non-fatal errors — confirm against Utils).
  */
 override def reviveOffers(): Unit =
   Utils.tryLogNonFatalError(driverEndpoint.send(ReviveOffers))

DriverEndpoint 在 receive 方法中接收并处理自己发送的 ReviveOffers 消息

 /**
  * Message handler (a PartialFunction) of the driver endpoint.
  *
  * NOTE(review): excerpt only — the StatusUpdate branch, other cases, and the closing
  * brace are elided ("... ...") in this copy.
  */
 override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data, resources) =>
        // Forward the task state change to the task scheduler.
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
          ... ...

      case ReviveOffers =>
      // ReviveOffers (sent by reviveOffers()) triggers a new round of resource offers.
        makeOffers()

CoarseGrainedSchedulerBackend.makeOffers

/**
 * Builds `workOffers` under `withLock`, asks the scheduler which tasks to place on them,
 * and launches every returned task description.
 *
 * NOTE(review): the code that constructs `workOffers` is partly elided ("... ...") here.
 */
private def makeOffers(): Unit = {
      // Make sure no executor is killed while some task is launching on it
     // Compute the task descriptions while holding the lock.
      val taskDescs = withLock {
           ... ...
                (rName, rInfo.availableAddrs.toBuffer)
              }, executorData.resourceProfileId)
        }.toIndexedSeq
        // Let the scheduler pick tasks from its pools for these offers.
        scheduler.resourceOffers(workOffers, true)
      }
      if (taskDescs.nonEmpty) {
        // Send the chosen tasks to their executors.
        launchTasks(taskDescs)
      }
    }

resourceOffers

    // TaskSetManagers from the root pool, ordered by the pool's scheduling algorithm.
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
// NOTE(review): the enclosing loop that binds `taskSet` (presumably iterating
// sortedTaskSets) is elided from this excerpt.
// For each locality level of this task set, keep offering resources at that level
// until a round launches nothing, then continue with the next level in the list.
 for (currentMaxLocality <- taskSet.myLocalityLevels) {
          var launchedTaskAtCurrentMaxLocality = false
          do {
            val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
              taskSet, currentMaxLocality, shuffledOffers, availableCpus,
              availableResources, tasks, addressesWithDescs)
            // A defined minLocality is treated as "at least one task launched this round".
            launchedTaskAtCurrentMaxLocality = minLocality.isDefined
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            noDelaySchedulingRejects &= noDelayScheduleReject
            globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
          } while (launchedTaskAtCurrentMaxLocality)
        }
 /**
  * Returns the schedulable TaskSetManagers reachable from this pool, in scheduling order.
  *
  * The pool's direct children are sorted with `taskSetSchedulingAlgorithm.comparator`
  * (FIFO or FAIR, depending on the pool's mode). Each child's own sorted queue is then
  * appended in that order, keeping only managers whose `isSchedulable` is true.
  *
  * @return a new buffer; the pool's own queue is only read
  */
 override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
   // Local import so `.asScala` resolves in this excerpt; redundant (but harmless) when
   // the enclosing file already imports scala.collection.JavaConverters._.
   import scala.collection.JavaConverters._
   val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
   // Order the direct children according to the scheduling algorithm.
   val sortedSchedulableQueue =
     schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
   for (schedulable <- sortedSchedulableQueue) {
     sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
   }
   sortedTaskSetQueue
 }


 /**
  * Algorithm whose `comparator` orders this pool's children. It is chosen once, when the
  * val is initialized, from `schedulingMode`.
  *
  * @throws IllegalArgumentException at initialization if the mode is neither FIFO nor FAIR
  */
 private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = schedulingMode match {
   case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
   case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
   case _ =>
     throw new IllegalArgumentException(
       s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead.")
 }

拿到任务描述（TaskDescription）后，通过 launchTasks 把任务发送给对应的 Executor 执行
launchTasks

 /**
  * Sends each task description to the executor it was assigned to.
  *
  * A task whose serialized form is `maxRpcMessageSize` bytes or larger is not sent.
  * Instead, if `scheduler.taskIdToTaskSetManager` has an entry for it, that
  * TaskSetManager is aborted with an explanatory message. Otherwise the task's CPUs and
  * custom resources are deducted from the executor's bookkeeping, and a LaunchTask
  * message carrying the serialized task is sent to the executor's endpoint.
  *
  * @param tasks nested sequences of task descriptions (as produced by
  *              `scheduler.resourceOffers`); they are flattened before launching
  */
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
   // Iterate over every task description across all groups.
   for (task <- tasks.flatten) {
     val serializedTask = TaskDescription.encode(task)
     // A task that does not fit into one RPC message cannot be shipped; abort its set.
     if (serializedTask.limit() >= maxRpcMessageSize) {
       Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
         try {
           val msg = ("Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
             s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
             s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values.")
             .format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
           taskSetMgr.abort(msg)
         } catch {
           case e: Exception => logError("Exception in error callback", e)
         }
       }
     } else {
       val executorData = executorDataMap(task.executorId)
       // Do resources allocation here. The allocated resources will get released after the task
       // finishes.
       val rpId = executorData.resourceProfileId
       val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
       val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
       executorData.freeCores -= taskCpus
       task.resources.foreach { case (rName, rInfo) =>
         assert(executorData.resourcesInfo.contains(rName))
         executorData.resourcesInfo(rName).acquire(rInfo.addresses)
       }

       logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
         s"${executorData.executorHost}.")
       // Send a LaunchTask message to the endpoint of the executor the task was assigned to.
       executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
     }
   }
 }

上一篇 下一篇

猜你喜欢

热点阅读