Spark Source Code Analysis: The Executor Resource Allocation Flow
I. Preface
When a user submits an application, the SparkContext sends a registration message to the Master, and the Master allocates Executors for that application.
The SparkContext is responsible for communicating with the ClusterManager: it manages resources, assigns and monitors tasks, and manages the lifecycle of job execution, while the ClusterManager provides resource allocation and management.
The ClusterManager role differs by deployment mode: in Standalone mode it is the Master, and in YARN mode it is the ResourceManager. After the SparkContext splits the running job and allocates resources for it, it sends the tasks to the Executors to run.
This article focuses on walking through the Executor resource allocation process.
II. How Executor Resource Allocation Works
Every job runs with a SparkContext. During SparkContext startup, two schedulers are created first, the DAGScheduler and the TaskScheduler, along with a SchedulerBackend that allocates the currently available resources.
1. DAGScheduler
The DAGScheduler splits the DAG of the user application into Stages. Each Stage consists of a group of Tasks that can run concurrently; these Tasks share exactly the same execution logic and differ only in the data they operate on.
2. TaskScheduler
The TaskScheduler schedules and executes concrete tasks. It receives the tasks of each Stage from the DAGScheduler, assigns them, according to the scheduling algorithm, to the Executors allocated to the application, and launches speculative (backup) copies of tasks that run exceptionally slowly. When a TaskSchedulerImpl is created it builds a SchedulableBuilder, which comes in two flavors: FIFOSchedulableBuilder and FairSchedulableBuilder. The differences between the two schedulers will be covered in a later article; for now it is enough to know the concept exists.
3. SchedulerBackend
The SchedulerBackend allocates the currently available resources. Concretely, it assigns compute resources (Executors) to the Tasks that are waiting for them and launches the Tasks on those Executors, completing the scheduling of the computation. It drives this task scheduling through reviveOffers.
When a SparkContext is created, it internally builds a SparkEnv, and the SparkEnv in turn creates an RpcEnv:
- SparkEnv: the execution environment of the user program, including the communication-related endpoints.
- RpcEnv: the remote communication environment inside the SparkContext.
Executor resource allocation starts with TaskSchedulerImpl's start method, which calls StandaloneSchedulerBackend's start method and then registers the DriverEndpoint and ClientEndpoint endpoints with the RpcEnv.
//TaskSchedulerImpl.start
override def start() {
//start the StandaloneSchedulerBackend
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
//StandaloneSchedulerBackend.start
override def start() {
/**
 * super.start() creates the Driver's RPC endpoint, i.e. the reference to the Driver.
 * Executors will later reverse-register with CoarseGrainedSchedulerBackend, the parent class of StandaloneSchedulerBackend.
 */
super.start()
// SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
// mode. In cluster mode, the code that submits the application to the Master needs to connect
// to the launcher instead.
if (sc.deployMode == "client") {
launcherBackend.connect()
}
// The endpoint for executors to talk to us
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val webUrl = sc.ui.map(_.webUrl).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
// If we're using dynamic allocation, set our initial executor limit to 0 for now.
// ExecutorAllocationManager will send the real initial limit to the Master later.
val initialExecutorLimit =
if (Utils.isDynamicAllocationEnabled(conf)) {
Some(0)
} else {
None
}
val appDesc: ApplicationDescription = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
//the application description to be submitted
//wrap appDesc; it is passed into the StandaloneAppClient here
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
//start the StandaloneAppClient; it will then register the application with the Master
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
The code above first calls the parent class's start method, which prepares the reference to the DriverEndpoint. DriverEndpoint is an object inside CoarseGrainedSchedulerBackend that is used to exchange messages with other endpoints; Executors will later reverse-register with CoarseGrainedSchedulerBackend, the parent class of StandaloneSchedulerBackend.
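For reference, the relevant part of CoarseGrainedSchedulerBackend.start in Spark 2.x is sketched below, abridged and paraphrased rather than quoted exactly; the key point is that it registers the DriverEndpoint with the RpcEnv under ENDPOINT_NAME ("CoarseGrainedScheduler").
// Abridged and paraphrased from CoarseGrainedSchedulerBackend.start (Spark 2.x); not the exact source.
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll if key.startsWith("spark.")) {
    properties += ((key, value))
  }
  // Register the DriverEndpoint with the RpcEnv under ENDPOINT_NAME ("CoarseGrainedScheduler").
  // This is the endpoint Executors later reverse-register with via RegisterExecutor.
  driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}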
Back in StandaloneSchedulerBackend.start, the start method is then invoked on the StandaloneAppClient instance. StandaloneAppClient.start looks like this:
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
/**
* This simply fills in the empty endpoint (an AtomicReference).
* The key point is that rpcEnv.setupEndpoint creates a ClientEndpoint, and registering an endpoint always triggers its onStart method.
*/
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
This means the ClientEndpoint's onStart method runs, and onStart calls registerWithMaster to register the application's basic information with the Master.
//ClientEndpoint.onStart
override def onStart(): Unit = {
try {
//register the current application's information with the Master
registerWithMaster(1)
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
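registerWithMaster retries registration a limited number of times; its core, tryRegisterAllMasters, is paraphrased below (abridged, not the exact source): for every configured Master address it looks up the Master endpoint and sends a RegisterApplication message.
// Paraphrased from StandaloneAppClient.ClientEndpoint (Spark 2.x); abridged, not the exact source.
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        if (registered.get) { return }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        // send the application description to the Master
        masterRef.send(RegisterApplication(appDescription, self))
      }
    })
  }
}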
Next, let's look at how the Master's receive method handles the RegisterApplication message.
override def receive: PartialFunction[Any, Unit] = {
....
//the application registration submitted from the Driver side
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
//if the Master is in STANDBY state, ignore the request and don't submit the application
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
//wrap the application info; note that stepping into createApplication shows that by default an application may use up to Int.MaxValue cores
val app = createApplication(description, driver)
//register the app; this adds the current application to waitingApps
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
//finally the general-purpose schedule() method is invoked
schedule()
}
....
}
As you can see, once the Master receives the registration message it adds the application to the list of waiting applications and then calls the general-purpose schedule() method, which works out what resources need to be allocated, Executor resources in this case. schedule() in turn calls startExecutorsOnWorkers.
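Roughly, schedule() looks like the abridged, paraphrased sketch below (not the exact source): it bails out unless the Master is ALIVE, schedules any waiting drivers on shuffled alive workers first, and then calls startExecutorsOnWorkers.
// Paraphrased from Master.schedule (Spark 2.x); abridged, not the exact source.
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors.
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // ... launch any waiting drivers on those workers ...
  startExecutorsOnWorkers()
}
startExecutorsOnWorkers then walks the waiting applications: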
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
//iterate over the submitted apps in waitingApps
for (app <- waitingApps) {
//coresPerExecutor: how many cores one Executor uses, taken from the application; set via --executor-cores, and as shown below it defaults to 1 when unspecified
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
//check whether the application still needs cores; each time cores are allocated, app.coresLeft is decreased by the amount allocated
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an executor
//filter out the usable workers
val usableWorkers : Array[WorkerInfo]= workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
/**
 * The code below decides how many cores each worker contributes and how many Executors it launches. Note: spreadOutApps is true by default.
 * The returned assignedCores is how many cores each worker should give to the current application.
 */
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
//carve this worker's share of cores into Executors
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}
The method above takes the submitted apps from waitingApps and then works out, for each app, how many cores each worker should contribute and how many Executors to launch. The core of that calculation is scheduleExecutorsOnWorkers.
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
//how many cores one Executor uses; None if --executor-cores was not specified on submit
val coresPerExecutor : Option[Int]= app.desc.coresPerExecutor
//if --executor-cores was not specified, default to 1 core per Executor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
//oneExecutorPerWorker is true in that case (no --executor-cores given)
val oneExecutorPerWorker :Boolean= coresPerExecutor.isEmpty
//by default each Executor uses 1024 MB of memory (this default is set in SparkContext, line 464)
//if the submit command specifies --executor-memory, that value is used instead
val memoryPerExecutor = app.desc.memoryPerExecutorMB
//number of usable workers
val numUsable = usableWorkers.length
//two important arrays
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
/**
 * coresToAssign is how many cores to assign to this application right now: the minimum of
 * app.coresLeft and the total free cores across all usable workers.
 * If --total-executor-cores was specified on submit, app.coresLeft reflects that value.
 */
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
def canLaunchExecutor(pos: Int): Boolean = {..}
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
//finally return how many cores are assigned on each worker
assignedCores
}
The essence of scheduleExecutorsOnWorkers is deciding how many cores each worker contributes, how many cores each Executor needs (1 by default), and how many Executors each worker launches, while also checking how much memory each Executor uses (1024 MB by default).
By default, the number of CPU cores an application can get in Standalone mode is governed by spark.deploy.defaultCores, which defaults to Int.MaxValue, i.e. unlimited. Users can set --total-executor-cores to cap how many cores the application may use in total, and --executor-cores to set the minimum number of cores per executor. The SparkConf snippet below shows the programmatic equivalents of these flags.
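As a quick illustration, here is a hypothetical configuration; the master URL, application name and resource values are placeholders chosen for this example, not values taken from the walkthrough above.
import org.apache.spark.{SparkConf, SparkContext}

object AllocationDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical Standalone resource settings; the values below are examples only.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")  // placeholder Standalone master URL
      .setAppName("executor-allocation-demo")
      .set("spark.cores.max", "12")           // same effect as --total-executor-cores 12
      .set("spark.executor.cores", "4")       // same effect as --executor-cores 4
      .set("spark.executor.memory", "2g")     // same effect as --executor-memory 2g
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 100).count())  // trivial job, just to exercise the executors
    } finally {
      sc.stop()
    }
  }
}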
In addition, application resources are handed out according to one of two worker-allocation strategies:
1. Run the application on as many workers as possible. This makes full use of the cluster's resources and also benefits data locality.
2. Run the application on as few workers as possible. This suits CPU-intensive workloads that use relatively little memory.
The strategy is controlled by spark.deploy.spreadOut, which defaults to true, meaning executors are spread evenly across the workers, so every usable worker gets an executor.
If it is set to false, the scheduler packs as many executors as possible onto one worker and only moves on to the next worker once that one can no longer satisfy the allocation constraints. How many executors are needed and how many cores each gets is derived from --total-executor-cores and --executor-cores, but note that the per-executor core count is effectively a minimum: when oneExecutorPerWorker is true (one executor per worker) and there are not enough workers, you end up with fewer executors than expected, and to still satisfy --total-executor-cores the existing executors are given extra cores, i.e. more than the configured --executor-cores. The toy simulation below illustrates both strategies.
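To make the two strategies concrete, here is a small, self-contained toy that mimics the round-robin core assignment of scheduleExecutorsOnWorkers. It ignores the memory and executor-count checks of the real canLaunchExecutor, and the worker sizes and flags are made-up numbers for illustration only.
// A self-contained toy mimicking scheduleExecutorsOnWorkers' round-robin core assignment.
// It ignores memory and executor-count limits; the numbers below are made up for illustration.
object SpreadOutDemo {
  def assign(freeCores: Array[Int], totalCores: Int, coresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var coresToAssign = math.min(totalCores, freeCores.sum)
    def canLaunch(pos: Int): Boolean =
      coresToAssign >= coresPerExecutor && freeCores(pos) - assigned(pos) >= coresPerExecutor
    var freeWorkers = freeCores.indices.filter(canLaunch)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          coresToAssign -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          if (spreadOut) keepScheduling = false   // spread out: move on to the next worker
        }
      }
      freeWorkers = freeWorkers.filter(canLaunch)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(8, 8, 8)                                     // free cores on 3 workers
    println(assign(workers, 12, 4, spreadOut = true).mkString(","))  // 4,4,4 -> one executor per worker
    println(assign(workers, 12, 4, spreadOut = false).mkString(",")) // 8,4,0 -> packed onto two workers
  }
}
With three workers offering 8 free cores each, a total of 12 cores and 4 cores per executor, spreading out yields one 4-core executor per worker (4,4,4), while packing yields two executors on the first worker and one on the second (8,4,0).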
The Executor launch information is then packaged up and sent to the workers.
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
//how many cores each Executor gets
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)
//launch the Executor on the worker
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
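launchExecutor is where the Master actually tells a Worker to start the Executor. Paraphrased below (abridged, not the exact source), it sends a LaunchExecutor message to the chosen Worker's endpoint and an ExecutorAdded message to the application's Driver.
// Paraphrased from Master.launchExecutor (Spark 2.x); abridged, not the exact source.
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // tell the Worker to start the Executor
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // tell the Driver that an Executor has been added
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}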
The Worker's receive method then matches the LaunchExecutor message:
//launch the Executor
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
//create an ExecutorRunner
val manager = new ExecutorRunner(
appId,
execId,
/**
 * appDesc contains Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ...);
 * its first argument is the class of the Executor backend to launch.
 */
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
/**
 * Start the ExecutorRunner.
 * What actually gets started is the CoarseGrainedExecutorBackend class;
 * its main method, shown below, reverse-registers with the Driver.
 */
manager.start()
coresUsed += cores_
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {...}
....
As the code above shows, the Worker creates an ExecutorRunner. The appDesc passed into the ExecutorRunner contains a Command whose main class is the fully qualified name org.apache.spark.executor.CoarseGrainedExecutorBackend.
The subsequent manager.start() spawns a thread on the Worker that asynchronously launches CoarseGrainedExecutorBackend, and its main method is then invoked.
private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
....
}
CoarseGrainedExecutorBackend's main method calls run, which initializes the SparkEnv, the RpcEnv and the other basic infrastructure.
After that comes the step that creates the execution container, the Executor: run obtains a reference to the DriverEndpoint (defined in CoarseGrainedSchedulerBackend) from the RpcEnv, reverse-registers the Executor with it, and waits for the reply.
The source looks like this:
private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
....
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
//register the Executor endpoint; this triggers CoarseGrainedExecutorBackend.onStart
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
....
}
CoarseGrainedExecutorBackend.onStart:
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
//obtain the Driver's reference from the RpcEnv and reverse-register the Executor with the Driver
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
//got the Driver's reference
driver = Some(ref)
/**
 * Reverse-register the Executor with the Driver, i.e. with the DriverEndpoint inside the
 * CoarseGrainedSchedulerBackend class we saw earlier.
 * DriverEndpoint's receiveAndReply method matches the RegisterExecutor message.
 */
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
DriverEndpoint handles the RegisterExecutor request: it wraps the executor information, stores it in the relevant data structures, and sends a message back through executorRef telling the Executor it has been registered.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
....
//an Executor reverse-registering itself
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
/**
 * Use the Executor's endpoint reference to send RegisteredExecutor back, telling the Executor it has been registered.
 * CoarseGrainedExecutorBackend's receive method is waiting for this message; on a match it creates the Executor.
 */
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
....
}
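The makeOffers() call at the end is what later drives task scheduling. Paraphrased below (abridged, not the exact source), it turns the registered executors' free cores into WorkerOffers, asks TaskSchedulerImpl for tasks via resourceOffers, and launches whatever comes back.
// Paraphrased from CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers (Spark 2.x); abridged, not the exact source.
private def makeOffers() {
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // build one WorkerOffer per alive executor from its free cores
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // let TaskSchedulerImpl decide which tasks to run on which offers
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}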
In the RegisterExecutor handler, executorRef is the reference to the CoarseGrainedExecutorBackend. Its receive method then matches the message sent back by the Driver, confirming that the Executor registration has been accepted, and at that point the actual Executor is created. The Executor maintains a thread pool in which tasks run.
override def receive: PartialFunction[Any, Unit] = {
//the Driver has accepted the Executor registration; now create the Executor
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
//create the real Executor; it holds the thread pool that runs tasks (see Executor, line 89)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
}
At this point Executor resource allocation is complete. The Executor then sends heartbeat messages to the Driver at a fixed interval and waits for the Driver to dispatch tasks.
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
Later, the Executor receives LaunchTask messages from the DriverEndpoint; task execution is implemented in the Executor's launchTask method. Execution creates a TaskRunner (a Runnable that runs on the Executor's thread pool) responsible for running the task; when it finishes, a StatusUpdate message is sent back to the DriverEndpoint. On receiving it, the DriverEndpoint calls TaskSchedulerImpl's statusUpdate method, handles the different task outcomes accordingly, and then offers this Executor further tasks to execute.
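For reference, Executor.launchTask is very small; paraphrased below (abridged, not the exact source, and its signature varies slightly across 2.x versions), it wraps the task description in a TaskRunner and hands it to the Executor's thread pool.
// Paraphrased from Executor.launchTask (Spark 2.x); abridged, not the exact source.
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  // TaskRunner is a Runnable; the thread pool executes it, and it reports
  // its status back to the Driver via statusUpdate when it finishes
  threadPool.execute(tr)
}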
The DriverEndpoint code that handles the status change is shown below.
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
....
}
III. Summary
The scheduling flow described above can be summarized in a single diagram:
[Figure: Executor scheduling flow]
This concludes the source-level analysis of the entire Executor resource allocation flow.
The next article will analyze how concrete tasks are scheduled. Stay tuned.