Spark Core Source Code Close Reading Plan #16: Using ExecutorAl
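Preface
Following the order of SparkContext initialization, the next components up are the three pillars of the scheduling system: SchedulerBackend, TaskScheduler, and DAGScheduler. They involve far too many details to cover in one or two articles, so we will naturally come back to them in detail when we reach Spark job execution. This article covers the second-to-last component initialized by SparkContext: the Executor allocation manager, ExecutorAllocationManager. As mentioned earlier, ExecutorAllocationManager talks to the cluster manager and dynamically adds or removes Executors according to the current load, which makes it a fairly smart mechanism.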
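Initialization
The details of how SparkContext initializes ExecutorAllocationManager were shown in Code #2.13, so here we only walk through the flow:
- Decide whether to enable dynamic Executor allocation. It is enabled if spark.dynamicAllocation.enabled is true and at least one of the following holds: spark.dynamicAllocation.testing is true, or the application is not running in local mode. (To save space, s.d below stands for the spark.dynamicAllocation prefix.)
- Check whether the SchedulerBackend implementation mixes in the ExecutorAllocationClient trait; at present only CoarseGrainedSchedulerBackend does. If so, construct an ExecutorAllocationManager from the SchedulerBackend, ListenerBus, SparkConf, and BlockManagerMaster instances.
- Call ExecutorAllocationManager.start() to start it.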
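The enabling condition in the first step can be sketched as a small predicate. This is only an illustration: the object name DynamicAllocationCheck and the use of a plain Map in place of SparkConf are assumptions made for the example, not Spark's actual API.

```scala
object DynamicAllocationCheck {
  // Hypothetical stand-in for SparkConf: a plain Map of string settings.
  // Missing keys are treated as "false" in this sketch.
  def isEnabled(conf: Map[String, String], isLocal: Boolean): Boolean = {
    val enabled = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
    val testing = conf.getOrElse("spark.dynamicAllocation.testing", "false").toBoolean
    // Enabled only if the main switch is on AND (testing mode OR not local mode)
    enabled && (testing || !isLocal)
  }
}
```

In other words, turning on s.d.enabled in local mode has no effect unless the testing flag is set as well.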
Member fields of the ExecutorAllocationManager class
ExecutorAllocationManager has a large number of member fields; the listing below shows most of them.
Code #16.1 - Member fields of the o.a.s.ExecutorAllocationManager class
private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
  "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
  "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
  "spark.dynamicAllocation.executorIdleTimeout", "60s")
private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
  "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")
private val tasksPerExecutor =
  conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
private var numExecutorsToAdd = 1
private var numExecutorsTarget = initialNumExecutors
private val executorsPendingToRemove = new mutable.HashSet[String]
private val executorIds = new mutable.HashSet[String]
private var addTime: Long = NOT_SET
private val removeTimes = new mutable.HashMap[String, Long]
val listener = new ExecutorAllocationListener
private val executor =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
private var localityAwareTasks = 0
private var hostToLocalTaskCount: Map[String, Int] = Map.empty
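Below we describe these fields by category.
Executor-related configuration
- minNumExecutors/maxNumExecutors: correspond to s.d.minExecutors and s.d.maxExecutors, the lower and upper bounds on the number of Executors during dynamic allocation. The defaults are 0 and infinity.
- initialNumExecutors: the initial number of Executors, determined by Utils.getDynamicAllocationInitialExecutors() as the largest of the three settings s.d.minExecutors, s.d.initialExecutors, and spark.executor.instances.
- tasksPerExecutor: the approximate number of Tasks an Executor can run at once, computed as spark.executor.cores divided by spark.task.cpus (integer division).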
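The two derived values above can be sketched as follows. The object ExecutorSizing, the Map[String, Int] standing in for SparkConf, and the fallback of 0 for missing keys are all assumptions for illustration; the real defaults are resolved inside Spark's configuration code.

```scala
object ExecutorSizing {
  // Largest of the three settings. Missing keys fall back to 0 in this sketch.
  def initialExecutors(conf: Map[String, Int]): Int =
    Seq(
      "spark.dynamicAllocation.minExecutors",
      "spark.dynamicAllocation.initialExecutors",
      "spark.executor.instances"
    ).map(key => conf.getOrElse(key, 0)).max

  // Integer division: leftover cores that cannot fit a whole Task are ignored.
  def tasksPerExecutor(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus
}
```

For instance, with 5 cores per Executor and 2 CPUs per Task, the integer division yields 2 Task slots, which is why the value is only an approximation.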
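Timeout configuration
- schedulerBacklogTimeoutS: set by s.d.schedulerBacklogTimeout. Once Tasks have been waiting longer than this, dynamic allocation of resources begins. The default is 1s.
- sustainedSchedulerBacklogTimeoutS: set by s.d.sustainedSchedulerBacklogTimeout. While the backlog persists after the first allocation, this is the interval between subsequent allocation rounds. It defaults to the value of schedulerBacklogTimeoutS.
- executorIdleTimeoutS: set by s.d.executorIdleTimeout. The timeout for an Executor that is idle (running no Tasks); once it expires, the Executor is removed. The default is 60s.
- cachedExecutorIdleTimeoutS: set by s.d.cachedExecutorIdleTimeout. The idle timeout for an Executor that holds cached blocks. Because cached data should not be discarded casually, the default is effectively infinite (Integer.MAX_VALUE seconds).
Counters, caches, and others
- numExecutorsToAdd: the number of Executors to add in the next allocation round.
- numExecutorsTarget: the target number of Executors at the current moment. It is tracked mainly so that, if a large number of Executors are suddenly lost, the required number can be requested again right away.
- executorsPendingToRemove: a cache of IDs of Executors that are about to be removed but have not been killed yet.
- executorIds: a cache of all currently known Executor IDs.
- addTime: the timestamp at which the next Executor addition should be triggered, or NOT_SET if no addition is scheduled.
- removeTimes: a map from Executor ID to the timestamp at which that Executor should be removed.
- listener: a listener of type ExecutorAllocationListener that watches Executor-related events, including Stage and Task submission and completion, Executor addition and removal, and so on.
- executor: a single-threaded scheduled thread pool that runs the periodic check which dynamically allocates Executors.
- localityAwareTasks: the number of Tasks, across all currently active Stages, that have a locality preference (that is, they would like their data to be on the local node).
- hostToLocalTaskCount: for each physical node, the approximate number of Tasks that would like to run on it; it is passed to the cluster manager together with localityAwareTasks.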
Implementation of ExecutorAllocationManager
Startup
During initialization, ExecutorAllocationManager.start() has already been called. Let's look at the code of that method.
Code #16.2 - The o.a.s.ExecutorAllocationManager.start() method
def start(): Unit = {
  // Register the listener so that Stage/Task/Executor events reach this manager
  listenerBus.addToManagementQueue(listener)
  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          // Log and swallow, so that one failed run does not end the periodic task
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  // Ask the cluster manager for the initial target number of Executors
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
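As shown, on startup ExecutorAllocationManager first registers the ExecutorAllocationListener with LiveListenerBus. It then creates a task that runs the schedule() method and has the scheduling thread pool executor run it repeatedly, with a delay of intervalMillis (100 ms) between runs. Finally, it calls requestTotalExecutors() on the ExecutorAllocationClient (in practice, CoarseGrainedSchedulerBackend) to request Executors; that method will come up again when we cover SchedulerBackend.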
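The periodic-task pattern used by start() can be tried out in isolation with the plain JDK scheduler. The sketch below is not Spark code: PeriodicScheduleSketch, runTimes, and the latch-based stop condition are hypothetical helpers added for the demonstration. It relies on the documented behaviour of scheduleWithFixedDelay that an exception escaping a run suppresses all later runs, which is why the Runnable catches Throwable.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import scala.util.control.ControlThrowable

object PeriodicScheduleSketch {
  // Runs `body` with a fixed delay on one daemon thread until it has been
  // invoked `times` times; returns true if that happened within 5 seconds.
  def runTimes(times: Int, intervalMillis: Long)(body: () => Unit): Boolean = {
    val latch = new CountDownLatch(times)
    val pool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "sketch-dynamic-allocation")
        t.setDaemon(true)
        t
      }
    })
    val task = new Runnable {
      override def run(): Unit = {
        try {
          body()
        } catch {
          case ct: ControlThrowable => throw ct
          // Swallow everything else so the next run is still scheduled
          case t: Throwable => println(s"Uncaught exception: $t")
        } finally {
          latch.countDown()
        }
      }
    }
    pool.scheduleWithFixedDelay(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
    try latch.await(5, TimeUnit.SECONDS) finally pool.shutdownNow()
  }
}
```

If the catch-all case were removed, a body that throws on its first invocation would never be invoked again.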
Let's look at the schedule() method first.
Scheduling the dynamic adjustment logic
Code #16.3 - The o.a.s.ExecutorAllocationManager.schedule() method
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis
  updateAndSyncNumExecutorsTarget(now)
  val executorIdsToBeRemoved = ArrayBuffer[String]()
  // Keep only the timers that have not expired; collect the expired Executor IDs
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      executorIdsToBeRemoved += executorId
    }
    !expired
  }
  if (executorIdsToBeRemoved.nonEmpty) {
    removeExecutors(executorIdsToBeRemoved)
  }
}
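As shown, schedule() does two things: it calls updateAndSyncNumExecutorsTarget() to recompute and sync the number of Executors currently needed, and it calls removeExecutors() to remove the Executors whose idle timers have expired.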
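The expiry sweep inside schedule() boils down to splitting the timer map by deadline. Here is a minimal immutable sketch of that step; IdleSweepSketch and sweep are names made up for the example, and the real code mutates removeTimes in place instead of returning a new map.

```scala
object IdleSweepSketch {
  // Returns (IDs whose deadline has passed, timers that are still pending).
  // The expired IDs are sorted only to make the result deterministic.
  def sweep(removeTimes: Map[String, Long], now: Long): (Seq[String], Map[String, Long]) = {
    val (expired, pending) = removeTimes.partition { case (_, expireTime) => now >= expireTime }
    (expired.keys.toSeq.sorted, pending)
  }
}
```

Note that a deadline equal to the current time already counts as expired, matching the now >= expireTime comparison in Code #16.3.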
Recomputing the number of Executors
Below is the source of updateAndSyncNumExecutorsTarget() and of maxNumExecutorsNeeded(), which it calls.
Code #16.4 - The o.a.s.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget() method
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
  val maxNeeded = maxNumExecutorsNeeded
  if (initializing) {
    // Still initializing: leave the target unchanged
    0
  } else if (maxNeeded < numExecutorsTarget) {
    // The target exceeds what is needed: lower it, but not below the minimum
    val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
    numExecutorsToAdd = 1
    if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
        s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
    numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
    // The add timer has expired: request more Executors and re-arm the timer
    val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
      s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
    addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
    delta
  } else {
    0
  }
}
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  // Ceiling division: round up to a whole number of Executors
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
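The method proceeds as follows:
- Call maxNumExecutorsNeeded() to compute maxNeeded, the maximum number of Executors currently needed. It takes the pending Task count and the running Task count from the listener, adds them, and divides the sum by the estimated number of Tasks per Executor, rounding up. (Adding tasksPerExecutor - 1 before the integer division is what performs the rounding up.)
- If ExecutorAllocationManager is still initializing, return 0 immediately. Note that the return value of this method is the change in the number of Executors, not the total.
- Compare maxNeeded with numExecutorsTarget. If the target exceeds the maximum needed, set numExecutorsTarget to the larger of maxNeeded and minNumExecutors; if the target actually dropped, call ExecutorAllocationClient.requestTotalExecutors(). This tells the cluster manager to cancel Executor requests that have not been fulfilled yet and to stop adding new Executors. The method returns the change in the target, which is zero or negative here.
- Otherwise, if an add time has been set (addTime is not NOT_SET) and the current timestamp has reached it, call addExecutors(). This asks the cluster manager for additional Executors, moves addTime forward by sustainedSchedulerBacklogTimeoutS, and returns the number of Executors added. In all remaining cases the method returns 0.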
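The branching described in the steps above can be condensed into a pure function, which makes the ceiling division and the three outcomes easy to check by hand. This is a sketch under assumptions: TargetUpdateSketch, the Decision type, and the use of Option[Long] in place of the NOT_SET sentinel are all hypothetical simplifications, and the side effects (the call to the cluster manager, re-arming the timer) are left out.

```scala
object TargetUpdateSketch {
  // Ceiling division: ceil((pending + running) / tasksPerExecutor)
  def maxNeeded(pending: Int, running: Int, tasksPerExecutor: Int): Int =
    (pending + running + tasksPerExecutor - 1) / tasksPerExecutor

  sealed trait Decision
  case object Unchanged extends Decision
  case class LowerTo(target: Int) extends Decision
  case object TryAdd extends Decision

  // addTime = None plays the role of NOT_SET in this sketch.
  def decide(initializing: Boolean, maxNeeded: Int, target: Int,
             minExecutors: Int, addTime: Option[Long], now: Long): Decision =
    if (initializing) Unchanged
    else if (maxNeeded < target) LowerTo(math.max(maxNeeded, minExecutors))
    else if (addTime.exists(t => now >= t)) TryAdd
    else Unchanged
}
```

For example, 7 pending plus 3 running Tasks with 4 Task slots per Executor gives (10 + 3) / 4 = 3 Executors, whereas plain integer division would give only 2.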
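Next we look at the logic for removing and for adding Executors in turn, starting with removal.
Removing Executors
Below is the callback invoked when an Executor becomes idle; it is called by the listener. For reasons of space this article does not go into the details of ExecutorAllocationListener. They are not difficult, and readers can study them on their own.
Code #16.5 - The o.a.s.ExecutorAllocationManager.onExecutorIdle() method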
private def onExecutorIdle(executorId: String): Unit = synchronized {
  if (executorIds.contains(executorId)) {
    // Start a timer only if none is running and the Executor is not already being removed
    if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
      val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
      val now = clock.getTimeMillis()
      val timeout = {
        if (hasCachedBlocks) {
          now + cachedExecutorIdleTimeoutS * 1000
        } else {
          now + executorIdleTimeoutS * 1000
        }
      }
      val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
      removeTimes(executorId) = realTimeout
      logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
        s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
    }
  } else {
    logWarning(s"Attempted to mark unknown executor $executorId idle")
  }
}
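The method first makes sure that the Executor ID is in neither the removeTimes nor the executorsPendingToRemove cache, then checks whether the Executor holds cached blocks. If it does, the deadline is computed from cachedExecutorIdleTimeoutS (effectively infinite by default); otherwise the normal idle timeout executorIdleTimeoutS is used. If the computed deadline overflows to a non-positive value, it is replaced with Long.MaxValue. Finally, the Executor ID and its planned removal timestamp are stored in the removeTimes map. Next comes the removeExecutors() method called in Code #16.3.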
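The deadline computation, including the overflow guard, is easy to isolate. The following is a sketch only; IdleDeadlineSketch and deadline are hypothetical names, and the parameters mirror the fields from Code #16.1. It follows the listing in catching only the case where the sum wraps around to a non-positive value.

```scala
object IdleDeadlineSketch {
  // Removal deadline in milliseconds for an Executor that just became idle.
  def deadline(nowMs: Long, hasCachedBlocks: Boolean,
               idleTimeoutS: Long, cachedIdleTimeoutS: Long): Long = {
    val timeoutS = if (hasCachedBlocks) cachedIdleTimeoutS else idleTimeoutS
    val raw = nowMs + timeoutS * 1000
    // A very large timeout can wrap around to a negative Long; treat that as "never"
    if (raw <= 0) Long.MaxValue else raw
  }
}
```

With the default of Integer.MAX_VALUE seconds the arithmetic does not actually overflow a Long (the deadline is roughly 68 years away); the guard matters only when a much larger timeout is configured.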
Code #16.6 - The o.a.s.ExecutorAllocationManager.removeExecutors() method
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
  val executorIdsToBeRemoved = new ArrayBuffer[String]
  logInfo("Request to remove executorIds: " + executors.mkString(", "))
  val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
  var newExecutorTotal = numExistingExecutors
  // Select the candidates whose removal keeps the total at or above both lower bounds
  executors.foreach { executorIdToBeRemoved =>
    if (newExecutorTotal - 1 < minNumExecutors) {
      logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
        s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
    } else if (newExecutorTotal - 1 < numExecutorsTarget) {
      logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
        s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
    } else if (canBeKilled(executorIdToBeRemoved)) {
      executorIdsToBeRemoved += executorIdToBeRemoved
      newExecutorTotal -= 1
    }
  }
  if (executorIdsToBeRemoved.isEmpty) {
    return Seq.empty[String]
  }
  // In testing mode, pretend that every kill request succeeded
  val executorsRemoved = if (testing) {
    executorIdsToBeRemoved
  } else {
    client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
      countFailures = false, force = false)
  }
  // Re-sync the target, since killExecutors was told not to adjust it
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
  newExecutorTotal = numExistingExecutors
  if (testing || executorsRemoved.nonEmpty) {
    executorsRemoved.foreach { removedExecutorId =>
      newExecutorTotal -= 1
      logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
        s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
      executorsPendingToRemove.add(removedExecutorId)
    }
    executorsRemoved
  } else {
    logWarning(s"Unable to reach the cluster manager to kill executor/s " +
      s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
    Seq.empty[String]
  }
}
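The method proceeds as follows:
- Compute the number of Executors remaining: the known Executors minus those already pending removal.
- Iterate over the list of Executor IDs to remove. If removing one would leave fewer Executors than either the minimum allowed number or the target number, that Executor cannot be removed. Otherwise, if canBeKilled() confirms that the Executor is in the executorIds cache and has not yet entered executorsPendingToRemove, it is marked as removable.
- Call ExecutorAllocationClient.killExecutors() to actually kill the Executors (this step is skipped in testing mode), then call requestTotalExecutors() to re-sync the target number of Executors with the cluster manager.
- Add the Executors whose kill request was accepted to the executorsPendingToRemove cache, where they wait for the removal to be confirmed. If no request was accepted, log a warning and return an empty list.
Finally, the listener invokes onExecutorRemoved(), the callback for an Executor that has been removed. It mainly cleans up the various caches, and its logic is simple, so we will not go through it.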
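The selection step of removeExecutors() is the part most worth experimenting with, because each accepted candidate lowers the running total that the next candidate is checked against. A minimal sketch, assuming a caller-supplied canBeKilled predicate; RemovalFilterSketch and selectRemovable are names made up for this example.

```scala
object RemovalFilterSketch {
  // Picks the candidates that can be removed without dropping the Executor
  // count below either the configured minimum or the current target.
  def selectRemovable(candidates: Seq[String], existing: Int, minExecutors: Int,
                      target: Int, canBeKilled: String => Boolean): Seq[String] = {
    var remaining = existing
    val selected = Seq.newBuilder[String]
    candidates.foreach { id =>
      val aboveBounds = remaining - 1 >= minExecutors && remaining - 1 >= target
      if (aboveBounds && canBeKilled(id)) {
        selected += id
        remaining -= 1 // later candidates are checked against the reduced total
      }
    }
    selected.result()
  }
}
```

With 4 existing Executors, a minimum of 1, and a target of 2, at most two of three idle candidates are selected; the third would push the total below the target.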
Adding Executors
Last, let's look at the addExecutors() method called in Code #16.4.
Code #16.7 - The o.a.s.ExecutorAllocationManager.addExecutors() method
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
  // Already at the upper bound: do nothing
  if (numExecutorsTarget >= maxNumExecutors) {
    logDebug(s"Not adding executors because our current target total " +
      s"is already $numExecutorsTarget (limit $maxNumExecutors)")
    numExecutorsToAdd = 1
    return 0
  }
  val oldNumExecutorsTarget = numExecutorsTarget
  // Start from at least the number of known Executors, add the increment,
  // then clamp to the number needed and to [minNumExecutors, maxNumExecutors]
  numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
  numExecutorsTarget += numExecutorsToAdd
  numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
  numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
  val delta = numExecutorsTarget - oldNumExecutorsTarget
  if (delta == 0) {
    if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
      numExecutorsTarget =
        math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
    } else {
      numExecutorsToAdd = 1
      return 0
    }
  }
  val addRequestAcknowledged = try {
    testing ||
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
  } catch {
    case NonFatal(e) =>
      logInfo("Error reaching cluster manager.", e)
      false
  }
  if (addRequestAcknowledged) {
    val executorsString = "executor" + { if (delta > 1) "s" else "" }
    logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
      s" (new desired total will be $numExecutorsTarget)")
    // Double the increment if it was applied in full; otherwise reset it to 1
    numExecutorsToAdd = if (delta == numExecutorsToAdd) {
      numExecutorsToAdd * 2
    } else {
      1
    }
    delta
  } else {
    logWarning(
      s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
    // Roll back the target, since the request did not go through
    numExecutorsTarget = oldNumExecutorsTarget
    0
  }
}
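The logic that computes the target numExecutorsTarget is tedious to describe in words, so we will not elaborate; reading through the code once gives a good idea of it. The one thing that deserves special attention is the expression numExecutorsToAdd * 2. It shows that when Executors are being added, the number of new Executors requested per round grows exponentially, as long as the previous increment was applied in full (otherwise it resets to 1). Why this strategy? Empirically, most applications need only a few Executors at startup, but once resources become tight, exponential growth reduces the number of requests it takes to reach the required capacity.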
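To see the doubling in action, here is a small simulation of repeated addExecutors() rounds. It is a deliberate simplification and not the real method: RampUpSketch and ramp are hypothetical names, every request is assumed to be acknowledged, the number of known Executors is assumed never to exceed the target, and the speculative-task branch is left out.

```scala
import scala.collection.mutable.ListBuffer

object RampUpSketch {
  // Returns the sequence of targets reached round by round, starting from
  // initialTarget, until the target stops changing.
  def ramp(initialTarget: Int, maxNeeded: Int, minExecutors: Int, maxExecutors: Int): List[Int] = {
    var target = initialTarget
    var toAdd = 1
    val history = ListBuffer(target)
    var done = false
    while (!done) {
      val old = target
      // Add the increment, then clamp to the need and to [minExecutors, maxExecutors]
      val next = math.max(math.min(math.min(old + toAdd, maxNeeded), maxExecutors), minExecutors)
      val delta = next - old
      if (old >= maxExecutors || delta == 0) {
        done = true
      } else {
        target = next
        history += target
        // Double only when the whole increment was applied
        toAdd = if (delta == toAdd) toAdd * 2 else 1
      }
    }
    history.toList
  }
}
```

Starting from 0 with 20 Executors needed and a cap of 100, the targets go 0, 1, 3, 7, 15, 20: the need is reached in five rounds instead of twenty.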
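Summary
Before I sat down to write this article, I had assumed that the logic of ExecutorAllocationManager was nowhere near this complicated. Clearly I was being naive.
I'm really tired now and am going to get some rest; I have to get up at 3 a.m. to watch the Champions League final.
YNWA!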