[spark] 从spark-submit开始解析整个任务调度流
本文在spark2.1以Standalone Cluster模式下解析
概述
spark应用程序可以以Client模式和Cluster启动,区别在于Client模式下的Driver是在执行spark-submit命令节点上启动的,而Cluster模式下是Master随机选择的一台Worker通过DriverWrapper来启动Driver的。
大概流程为:
- 通过spark-submit提交会调用SparkSubmit类,SparkSubmit类里通过反射调用Client,Client与Master通信来SubmitDriver,收到成功回复后退出JVM(SparkSubmit进程退出)。
- Master收到SubmitDriver后会随机选择一台能满足driver资源需求的Worker,然后与对应Worker通信发送启动driver的消息。Worker收到消息后根据driver的信息等来拼接成linux命令来启动DriverWrapper,在该类里面再启动driver,最后将Driver执行状态返回给Master。
- driver启动后接下来就是注册APP,在SparkContext启动过程中会通过创建AppClient并与Master通信要求注册application。
- Master收到消息后会去调度执行这个application,通过调度算法获取该application需要在哪些Worker上启动executor,接着与对应的Worker通信发送启动Executor的消息。
- Worker 收到消息后通过拼接linux命令,启动了CoarseGrainedExecutorBackend进程,接着向Driver通信进行Executor的注册,成功注册后会在CoarseGrainedExecutorBackend中创建Executor对象。
- 接着就是job的执行了,可以参看前面的文章……
Submit Driver
通过shell命令spark-submit提交一个自己编写的application,最终实际是通过java -cp调用的类是:
org.apache.spark.deploy.SparkSubmit
在该类的main方法中,在Cluster模式下不使用Rest,会通过反射调用Client类:
org.apache.spark.deploy.Client
在Client类的main方法中会获得与Master通信的EndpointRef,并且创建一个名为Client的ClientEndpoint,在生命周期的onStart中会创建一个Driver的描述信息对象DriverDescription,其中包括了最终需要启动Driver的mainClass:
org.apache.spark.deploy.worker.DriverWrapper
接着向Master发送一个RequestSubmitDriver消息,Master收到后将DriverInfo持久化到存储系统,然后通过schedule()去调度,接着会向Client返回一个SubmitDriverResponse消息,Client收到成功提交成功消息后会再次向Master发送RequestDriverStatus消息询问driver的状态,若能收到Master端存在该driver的回复消息DriverStatusResponse则退出JVM(SparkSubmit进程退出)。
流程如图:
Master LaunchDriver
前面提到Master收到提交Driver的消息后会调用schedule()方法:
private def schedule(): Unit = {
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
该方法会先打乱Worker防止Driver集中在一台Worker上,当Worker的资源满足driver所需要的资源,则会调用launchDriver方法。
在launchDriver方法里会向对应的Worker发送一个LaunchDriver消息,该Worker接收到消息后通过driver的各种描述信息创建一个DriverRunner,然后调用其start方法。
start方法中将driver的参数组织成Linux命令,通过java -cp来运行上面提到的DriverWrapper类来启动Driver,而不是直接启动,这是为了Driver程序和启动Driver的Worker程序共命运(源码注释中称为share fate),即如果此Worker挂了,对应的Driver也会停止。
最后将Driver的执行状态返回给Master。
流程如图:
Register APP
Driver起来后当然会涉及到APP向Master的注册,在创建SparkContext的时候,会创建SchedulerBackend和TaskScheduler:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
接着调用了TaskScheduler(TaskSchedulerImpl)的start方法,start方法里面又调用了SchedulerBackend(standalone模式下是StandaloneSchedulerBackend)的start方法:
override def start() {
super.start()
...
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
...
}
super.start()中创建了driverEndpoint。先根据application的参数创建了ApplicationDescription,又创建了StandaloneAppClient并调用其start方法,在start方法中创建了名为AppClient的Endpoint,在其生命周期的onStart方法中向Master发送了RegisterApplication消息进行注册app。
Master收到RegisterApplication消息后,创建描述application的ApplicationInfo,并持久化到存储系统,随后向AppClient返回RegisteredApplication的消息,然后通过schedule()去调度application。
流程如图:
Launch Executor
在上文Master LaunchDriver时解析了该方法的前部分,前部分说明了是如何将Driver调度到Worker上启动的。
private def schedule(): Unit = {
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
现在来说说后部分 startExecutorsOnWorkers()是怎么在Worker上启动Executor的:
private def startExecutorsOnWorkers(): Unit = {
// 遍历所有等待调度的application,顺序为FIFO
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// 过滤出资源能满足APP对于每一个Executor需求的Worker
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
// 对Executor的调度(为每个Worker分配的core数)
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// 根据前面调度好的,在对应Worker上启动Executor
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
先过滤出能满足application对于一个Executor资源要求的Worker,然后对Executor进行调度,策略有两种:
- 使用spreadOutApps算法分配资源,即Executor分布在尽可能多的Worker节点上
- Executor聚集在某些Worker节点上
启用spreadOutApps算法通过参数spark.deploy.spreadOut配置,默认为true,scheduleExecutorsOnWorkers方法返回的就是每个Worker能分配到的core数。
然后通过allocateWorkerResourceToExecutors去计算该Worker上需要启动的Executor:
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// 计算在该Worker上启动的Executor数,总cores / 一个Executor所需
// 若没有指定一个Executor所需core数,则将分到的core数都给一个Executor
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
通过计算得到该Worker需要启动的Executor数,然后调用launchExecutor方法通过与对应的Worker通信来发送LaunchExecutor消息。
流程如图:
对应的Worker收到消息后将收到的信息封装成ExecutorRunner对象,并调用其start方法:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
...
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
...
在manager的start方法中调用了fetchAndRunExecutor方法:
private def fetchAndRunExecutor() {
try {
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
...
process = builder.start()
...
}
这里和启动Driver启动的方式类似,通过收到的信息拼接成Linux命令,通过Java -cp 来启动CoarseGrainedExecutorBackend进程。
流程如图:
在CoarseGrainedExecutorBackend的main方法里创建了名为Executor的Endpoint,在其生命周期的onStart()方法里向Driver发送了RegisterExecutor消息。
Driver收到消息后根据Executor信息创建了ExecutorData对象,并加入到executorDataMap集合中,然后返回RegisteredExecutor消息给CoarseGrainedExecutorBackend。
CoarseGrainedExecutorBackend收到RegisteredExecutor后:
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
便创建了一个Executor对象,此对象将执行Driver分配的Task。
流程如图:
接着就是通过DAGScheduler、TaskScheduler等对Stage的划分,Task的调度等执行,最终将Task结果返回到Driver,具体可看前面的文章: