Spark Source Code: TaskScheduler
Spark Source Code: An Introduction to the TaskScheduler
In the previous post the DAGScheduler divided the job into stages and submitted the resulting TaskSets to the TaskScheduler; this post describes how the TaskScheduler launches those tasks.
TaskScheduler source code analysis
The DAGScheduler hands its TaskSets to the TaskScheduler through submitTasks(), so we start there and open TaskSchedulerImpl, the implementation class of TaskScheduler. The method first wraps the TaskSet in a TaskSetManager, then checks that the stage does not already have another active (non-zombie) TaskSet running, since a stage may only run one TaskSet at a time. The new TaskSetManager is then added to the schedulableBuilder scheduling policy, and once all of this is done the method requests resources from the backend via backend.reviveOffers():
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager; maxTaskFailures is the maximum number of retries (4 by default)
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    // Check whether this stage already has another TaskSet running: a stage may only have
    // one active TaskSet at a time, so any attempt whose TaskSet differs from the current
    // one and whose TaskSetManager is not a zombie is a conflict
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Add the TaskSetManager to the scheduling policy (FIFOSchedulableBuilder / FairSchedulableBuilder)
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Request resources from the backend; the request actually goes to the DriverEndpoint
  backend.reviveOffers()
}
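As a side note, which builder backs schedulableBuilder is controlled by the spark.scheduler.mode configuration key (FIFO by default, FAIR for pool-based scheduling). A minimal, user-side sketch of setting it; the application name is hypothetical:

import org.apache.spark.SparkConf

// Minimal sketch: choosing the scheduling policy from the application side.
// spark.scheduler.mode is the standard config key; FIFO is the default, FAIR uses pools
// (optionally configured through a fair scheduler XML file).
object SchedulerModeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("scheduler-mode-demo")    // hypothetical app name
      .set("spark.scheduler.mode", "FAIR")  // or "FIFO" (default)
    println(conf.get("spark.scheduler.mode"))
  }
}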
Entering CoarseGrainedSchedulerBackend's reviveOffers method, we can see that driverEndpoint sends a ReviveOffers message to itself; as discussed earlier, this driverEndpoint is the Driver of the current application:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
When the DriverEndpoint receives the ReviveOffers message it ends up calling makeOffers(). This method first filters out the executors whose state is alive, wraps these active executors into WorkerOffer objects, and then reaches the key step, the final launchTasks call. Before looking at that, let us see what scheduler.resourceOffers(workOffers) does:
case ReviveOffers =>
  makeOffers()

// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
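For reference, a WorkerOffer is just a small value describing one executor's free resources. A simplified sketch of its shape (the real class lives in the Spark scheduler package and may differ slightly between versions):

// Simplified sketch of WorkerOffer: one executor's id, its host, and its currently free cores.
case class WorkerOffer(executorId: String, host: String, cores: Int)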
Back in TaskSchedulerImpl, let us look at the resourceOffers method:
TaskSchedulerImpl.resourceOffers assigns concrete compute resources to every task: its input is the set of ExecutorBackends together with their available cores, and its output is a two-dimensional structure of TaskDescriptions that fixes on which ExecutorBackend each task will run.
Inside the method, the available executors are first recorded in the scheduler's bookkeeping structures and then shuffled to spread the load; one task buffer is created per executor to hold its TaskDescriptions; finally the TaskSets in the scheduling pool are traversed and executors are assigned to tasks by data locality. One point worth emphasizing: DAGScheduler.submitMissingTasks() already determined where each task's data lives, whereas taskSet.myLocalityLevels here only derives the locality levels for those locations, as the following code shows:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  // Whether a new executor has become available
  var newExecAvail = false
  // Iterate over every executor offer
  for (o <- offers) {
    // Record the executor's information in the bookkeeping structures
    executorIdToHost(o.executorId) = o.host
    executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  // Shuffle the available executors so that the load is balanced
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  // One TaskDescription buffer per executor
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // The cores available on each executor
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Fetch the TaskSetManagers from the scheduling pool
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // DAGScheduler.submitMissingTasks() already determined where each task's data lives;
  // taskSet.myLocalityLevels only derives the locality levels
  // (PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY) from those hosts
  var launchedTask = false
  // For every TaskSet, assign the closest available executors to its tasks in locality order
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      // Offer the shuffled WorkerOffers to the TaskSet following data locality,
      // launching as many of its tasks as possible
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
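To make the locality ordering concrete, here is a hedged, self-contained stand-in for the idea behind TaskLocality: in Spark it is an Enumeration whose declaration order runs from most local to least local, and a level is allowed whenever it is at least as local as the current constraint (the real object may differ slightly between versions):

// Simplified stand-in for org.apache.spark.scheduler.TaskLocality (not the real object).
object TaskLocalitySketch extends Enumeration {
  // Declaration order doubles as the preference order: smaller id = more local.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A candidate level is acceptable if it is at least as local as the current constraint.
  def isAllowed(constraint: Value, candidate: Value): Boolean = candidate <= constraint
}

With that ordering, the loop for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) above relaxes the locality constraint step by step, which is the delay-scheduling behaviour governed by spark.locality.wait.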
Next comes resourceOfferSingleTaskSet. It iterates over the indices of all the executors and uses the same indices into tasks, assigning each task to an executor and filling the tasks buffers. The tasks data structure is passed in by the caller (resourceOffers) and stores, per executor, the tasks assigned to it; see the source code below:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  // Iterate over the executor indices
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Check that the executor still has at least CPUS_PER_TASK (default 1) cores free
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Assign this executor to a task of the TaskSet; the result is recorded in tasks,
        // which was passed in by resourceOffers
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Record the task assignment in the bookkeeping structures
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
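The index-parallel layout shared by shuffledOffers, availableCpus and tasks is easier to see in isolation. Below is a small sketch with hypothetical stand-in types (Offer and Desc are not the real Spark classes) that mirrors the bookkeeping above:

import scala.collection.mutable.ArrayBuffer

// Sketch of the parallel-index layout: tasks(i) collects the descriptions assigned to
// offers(i), and availableCpus(i) is decremented in step with it.
object ParallelIndexSketch {
  case class Offer(executorId: String, cores: Int)   // stand-in for WorkerOffer
  case class Desc(taskId: Long, executorId: String)  // stand-in for TaskDescription

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1", 2), Offer("exec-2", 1))
    val tasks = offers.map(o => new ArrayBuffer[Desc](o.cores))
    val availableCpus = offers.map(_.cores).toArray

    // Assign one task to the first offer, mirroring resourceOfferSingleTaskSet's bookkeeping.
    tasks(0) += Desc(taskId = 0L, executorId = offers(0).executorId)
    availableCpus(0) -= 1

    println(tasks.map(_.size).mkString(","))  // 1,0
    println(availableCpus.mkString(","))      // 1,1
  }
}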
Back in DriverEndpoint's makeOffers method, scheduler.resourceOffers(workOffers) has finished and the TaskSet has been assigned, so launchTasks() runs next. It iterates over every task and sends a LaunchTask message to the executor that the task was assigned to, as shown in the code below:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  // Iterate over every task
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    // Check the size of the serialized task
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Look up the executor the task was assigned to
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // Send a LaunchTask request to that executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
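The size check above deserves a quick illustration. In this Spark version the limit is derived from spark.akka.frameSize (configured in MB, 128 by default) minus a small reserved overhead; the sketch below uses assumed constants for illustration and is not a copy of AkkaUtils:

// Hedged sketch of the "does the serialized task fit in one frame?" check.
object FrameSizeSketch {
  // Assumption: the frame size is configured in MB and converted to bytes, as in Spark 1.x.
  def maxFrameSizeBytes(frameSizeMB: Int): Long = frameSizeMB.toLong * 1024 * 1024

  // Assumption: a reserved overhead on the order of a few hundred KB,
  // playing the role of AkkaUtils.reservedSizeBytes.
  val reservedSizeBytes: Long = 200L * 1024

  def fitsInFrame(serializedTaskBytes: Long, frameSizeMB: Int = 128): Boolean =
    serializedTaskBytes < maxFrameSizeBytes(frameSizeMB) - reservedSizeBytes
}

If the check fails, the whole TaskSet is aborted, which is why the error message suggests raising spark.akka.frameSize or moving large values into broadcast variables.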
Continuing the trace, we reach the LaunchTask case in CoarseGrainedExecutorBackend, which in turn calls executor.launchTask:
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
Inside Executor's launchTask method a TaskRunner object is created; TaskRunner is a Runnable that is executed on the threadPool thread pool:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
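The threadPool used here is, in this version, essentially a cached pool of daemon threads. A hedged, self-contained sketch of that pattern (the thread factory and thread name below are illustrative stand-ins, not Spark's ThreadUtils code):

import java.util.concurrent.{Executors, ThreadFactory}

object TaskRunnerPoolSketch {
  // Daemon threads so the pool never keeps the JVM alive on its own.
  private val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "sketch-task-runner")  // hypothetical thread name
      t.setDaemon(true)
      t
    }
  }
  val threadPool = Executors.newCachedThreadPool(factory)

  def main(args: Array[String]): Unit = {
    // Each Runnable submitted here plays the role of a TaskRunner.
    threadPool.execute(new Runnable {
      override def run(): Unit = println("running a stand-in TaskRunner")
    })
    threadPool.shutdown()
  }
}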
Summary:
The DAGScheduler splits the job into stages and submits the resulting TaskSets to the TaskScheduler. The TaskScheduler asks the Driver for resources, the Driver offers the available ExecutorBackend resources back to the TaskScheduler, and the TaskScheduler assigns tasks to those ExecutorBackends and sends each of them a LaunchTask request. The ExecutorBackend then calls launchTask on its Executor object, which wraps the task in a TaskRunner and hands it to the thread pool for execution.