任务执行-源码分析
2021-09-21 本文已影响0人
专职掏大粪
CoarseGrainedExecutorBackend.receive
Executor 端收到 LaunchTask 消息后的处理逻辑如下：
// Message handler of the executor backend (excerpt: only the RegisteredExecutor and
// LaunchTask cases are shown here).
override def receive: PartialFunction[Any, Unit] = {
// Registration was acknowledged: build the Executor and report back to the driver.
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId))
} catch {
// Any non-fatal failure while creating the executor terminates this backend with exit code 1.
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
// A task was assigned to this executor.
case LaunchTask(data) =>
// A task cannot be launched before the executor has been created; exit with code 1.
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
// Deserialize the task description from the message payload.
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// Record the resources assigned to this task, keyed by task id.
taskResources(taskDesc.taskId) = taskDesc.resources
// Hand the task to the executor (the compute object) to run it.
executor.launchTask(this, taskDesc)
}
Executor.launchTask：将任务封装为 TaskRunner，登记到 runningTasks 后，提交到 executor 的线程池 threadPool 中执行
/**
 * Wraps the given task description in a [[TaskRunner]], records it in `runningTasks`
 * under its task id, and submits it to `threadPool` for execution.
 *
 * @param context         backend passed through to the created TaskRunner
 * @param taskDescription description of the task to launch
 */
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val runner = new TaskRunner(context, taskDescription, plugins)
  // Register the runner before handing it to the pool.
  runningTasks.put(taskDescription.taskId, runner)
  threadPool.execute(runner)
  // The task is still submitted above; the decommissioned state is only logged as an error.
  if (decommissioned) log.error("Launching a task while in decommissioned state.")
}
TaskRunner.run
// Run the task: pass the task attempt id, the attempt number from the task description,
// the environment's metrics system, the task's assigned resources and the plugins;
// the value returned by task.run is bound to `res`.
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
resources = taskDescription.resources,
plugins = plugins)
Task.run
runTask(context)
Task.run 内部调用 runTask(context)，由计算对象运行任务；具体的计算逻辑在每个任务（例如 ShuffleMapTask、ResultTask）各自的 runTask 实现中。