Spark Source Code: Starting the TaskScheduler
When a SparkContext is initialized, a TaskScheduler is created. This article walks through how the TaskScheduler is started.
1 Starting the TaskScheduler
The TaskScheduler is started by calling _taskScheduler.start().
- Enter org.apache.spark.scheduler.TaskSchedulerImpl.scala
private val speculationScheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
- backend.start() starts the SchedulerBackend;
- if the application is not running in local mode and spark.speculation = true, i.e. speculative execution is enabled, a task is scheduled on a separate thread to run checkSpeculatableTasks periodically and look for tasks that can be speculated (see the configuration sketch after this list).
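Speculative execution is driven purely by configuration. Below is a minimal, self-contained sketch of enabling it when building a SparkConf; it is not part of the Spark source quoted above, and the app name and master URL are placeholders. spark.speculation.interval controls how often checkSpeculatableTasks runs.
import org.apache.spark.{SparkConf, SparkContext}

object SpeculationConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("speculation-demo")              // hypothetical app name
      .setMaster("spark://master-host:7077")       // hypothetical Standalone master URL
      .set("spark.speculation", "true")            // enable the speculative-execution check
      .set("spark.speculation.interval", "100ms")  // how often speculatable tasks are checked
    val sc = new SparkContext(conf)
    // ... run jobs here; slow task attempts may now be re-launched speculatively ...
    sc.stop()
  }
}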
2 Starting the SchedulerBackend
- Enter org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.scala
private[spark] class StandaloneSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with StandaloneAppClientListener
  with Logging {

  override def start() {
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    // The endpoint for executors to talk to us
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val webUrl = sc.ui.map(_.webUrl).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
}
StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend.
- super.start() calls the start method of the parent class CoarseGrainedSchedulerBackend;
- various parameters are assembled (driverUrl, args, extraJavaOpts, classPathEntries, libraryPathEntries, javaOpts, etc.) to build Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......);
- an ApplicationDescription is created, with the Command embedded in it for later use;
- a StandaloneAppClient is created and started;
- the app state is updated to SUBMITTED;
- the backend waits for the app registration to complete;
- the app state is updated to RUNNING.
Note: steps 2 and 3 above are very similar to how the Driver is launched when an Application is submitted:
- when launching the Driver, the parameters are assembled to build Command("org.apache.spark.deploy.worker.DriverWrapper", ......), and then a DriverDescription is created;
- here, the parameters are assembled to build Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......), and then an ApplicationDescription is created.
See "Spark Source Code: Submitting an Application to the Spark Cluster" for details.
2.1 Starting the CoarseGrainedSchedulerBackend
- Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.scala
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}

protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
A DriverEndpoint is created and registered under the name "CoarseGrainedScheduler" on SparkContext.DriverSparkEnv.RpcEnv (see "Spark Source Code: Initializing the SparkContext" for details).
Note: every time an RpcEndpoint is registered with an RpcEnv, an OnStart message is added to the endpoint's Inbox queue, so RpcEndpoint.onStart() is guaranteed to be executed (a toy sketch of this contract follows below).
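To make the register-then-onStart contract concrete, here is a toy sketch using purely illustrative classes, not Spark's RPC implementation: registering an endpoint queues an OnStart marker into its inbox first, so onStart() runs before any other message is handled.
import scala.collection.mutable

trait ToyEndpoint {
  def onStart(): Unit
  def receive(msg: Any): Unit
}

// "Registering" the endpoint creates its inbox with "OnStart" already queued,
// so onStart() is processed before any other message.
class ToyInbox(endpoint: ToyEndpoint) {
  private val messages = mutable.Queue[Any]("OnStart")

  def post(msg: Any): Unit = messages.enqueue(msg)

  def process(): Unit = while (messages.nonEmpty) {
    messages.dequeue() match {
      case "OnStart" => endpoint.onStart()
      case other     => endpoint.receive(other)
    }
  }
}

object ToyInboxDemo {
  def main(args: Array[String]): Unit = {
    val inbox = new ToyInbox(new ToyEndpoint {
      def onStart(): Unit = println("onStart runs first")
      def receive(msg: Any): Unit = println(s"received: $msg")
    })
    inbox.post("SomeMessage")
    inbox.process()   // prints the onStart line first, then the posted message
  }
}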
Now let's look at the DriverEndpoint.onStart method.
- Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
  extends ThreadSafeRpcEndpoint with Logging {

  override def onStart() {
    // Periodically revive offers to allow delay scheduling to work
    val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        Option(self).foreach(_.send(ReviveOffers))
      }
    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
  }
}
In this method, a task is scheduled on a separate thread to periodically send a ReviveOffers message to the endpoint itself (a minimal sketch of this pattern follows below).
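The following is a small, self-contained sketch of that pattern using a plain Java scheduled executor rather than Spark's RPC classes; the names are illustrative, and printing stands in for self.send(ReviveOffers).
import java.util.concurrent.{Executors, TimeUnit}

object ReviveLoopSketch {
  private val reviveThread = Executors.newSingleThreadScheduledExecutor()

  // Stand-in for self.send(ReviveOffers); in Spark this eventually triggers makeOffers().
  private def reviveOffers(): Unit = println("revive offers")

  def main(args: Array[String]): Unit = {
    val reviveIntervalMs = 1000L  // mirrors the default spark.scheduler.revive.interval of 1s
    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = reviveOffers()
    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    Thread.sleep(3500)        // let a few ticks fire
    reviveThread.shutdownNow()
  }
}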
- Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    makeOffers()
}

// Make fake resource offers on all executors
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}
This method iterates over CoarseGrainedSchedulerBackend.executorDataMap, but at this point executorDataMap is still empty, so the call effectively does nothing; it will be revisited later. The sketch below shows the pattern it applies once executors have registered.
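Here is a self-contained sketch of the filter-and-offer logic in makeOffers, using illustrative case classes rather than Spark's ExecutorData and WorkerOffer: executors that are being killed are filtered out, and each remaining executor becomes an offer of its free cores.
// Illustrative stand-ins for Spark's ExecutorData and WorkerOffer.
case class ExecutorInfo(host: String, freeCores: Int, alive: Boolean)
case class Offer(executorId: String, host: String, cores: Int)

object MakeOffersSketch {
  def main(args: Array[String]): Unit = {
    val executorDataMap = Map(
      "0" -> ExecutorInfo("worker-a", freeCores = 4, alive = true),
      "1" -> ExecutorInfo("worker-b", freeCores = 2, alive = false))  // "under killing", filtered out

    val workOffers = executorDataMap
      .filter { case (_, data) => data.alive }                          // keep only alive executors
      .map { case (id, data) => Offer(id, data.host, data.freeCores) }  // one offer per executor
      .toIndexedSeq

    println(workOffers)  // only executor "0" is offered to the scheduler
  }
}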
2.2 Creating and Starting the StandaloneAppClient
- Enter org.apache.spark.deploy.client.StandaloneAppClient.scala
private[spark] class StandaloneAppClient(
    rpcEnv: RpcEnv,
    masterUrls: Array[String],
    appDescription: ApplicationDescription,
    listener: StandaloneAppClientListener,
    conf: SparkConf)
  extends Logging {

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

  // (parts omitted)
}
A ClientEndpoint is created and registered under the name "AppClient" on SparkContext.DriverSparkEnv.RpcEnv (see "Spark Source Code: Initializing the SparkContext" for details).
- Enter org.apache.spark.deploy.client.StandaloneAppClient.ClientEndpoint
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}

private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
- iterate over all masterRpcAddresses;
- for each masterAddress and the master endpoint name, obtain a masterRpcEndpointRef;
- use the masterRpcEndpointRef to send a RegisterApplication(ApplicationDescription, DriverRpcEndpointRef) message.
In other words, creating and starting the StandaloneAppClient is really just a way to send a message to the Master and get the Application registered.
2.3 Registering the Application
- Enter org.apache.spark.deploy.master.Master.scala
override def receive: PartialFunction[Any, Unit] = {
  case RegisterApplication(description, driver) =>
    // TODO Prevent repeated registrations from some driver
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      val app = createApplication(description, driver)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
}
- if this is a STANDBY Master, no response is sent;
- createApplication(appDescription, driverRpcEndpointRef) is called to create an ApplicationInfo;
- registerApplication is called to register the app, i.e. the ApplicationInfo created above is added to Master.waitingApps;
- the driverRpcEndpointRef is used to send a RegisteredApplication message back to the Driver, telling it that the application has been registered;
- schedule() is called.
2.4 Starting the Application
- Enter org.apache.spark.deploy.master.Master.scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
In "Spark Source Code: Submitting an Application to the Spark Cluster", this same method is called to launch the Driver after the Driver has been registered.
There, no app had been added to Master.waitingApps yet, so the call to startExecutorsOnWorkers did nothing. Here, however, an app has already been added to Master.waitingApps, so startExecutorsOnWorkers will launch Executors for the app.
A few points worth noting:
- when the Application is registered here, the ApplicationInfo that is created is added to Master.waitingApps; in "Spark Source Code: Submitting an Application to the Spark Cluster", the DriverInfo created when the Driver is registered is added to Master.waitingDrivers;
- schedule() does two things:
1) iterate over Master.waitingDrivers and launch each Driver;
2) iterate over Master.waitingApps and launch Executors for each App.
The details of how startExecutorsOnWorkers launches Executors for the app will be covered in a later article. The sketch below illustrates the round-robin placement loop schedule() uses for waiting drivers.
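The following self-contained sketch mimics the driver-placement half of schedule() with illustrative case classes (not Spark's WorkerInfo or DriverDescription): walk the shuffled alive workers, launch each waiting driver on the first worker with enough free memory and cores, and carry the cursor position over to the next driver.
// Illustrative stand-ins for Spark's WorkerInfo and DriverDescription.
case class WorkerSlot(id: String, memoryFree: Int, coresFree: Int)
case class DriverReq(mem: Int, cores: Int)

object RoundRobinSketch {
  def main(args: Array[String]): Unit = {
    val aliveWorkers = Vector(                       // pretend these are already shuffled
      WorkerSlot("worker-1", memoryFree = 512, coresFree = 1),
      WorkerSlot("worker-2", memoryFree = 4096, coresFree = 4),
      WorkerSlot("worker-3", memoryFree = 2048, coresFree = 2))
    val waitingDrivers = List(DriverReq(mem = 1024, cores = 1), DriverReq(mem = 1024, cores = 1))

    var curPos = 0                                   // carried across drivers, as in schedule()
    for (driver <- waitingDrivers) {
      var launched = false
      var visited = 0
      while (visited < aliveWorkers.size && !launched) {
        val worker = aliveWorkers(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          println(s"launch $driver on ${worker.id}") // stands in for launchDriver(worker, driver)
          launched = true
        }
        curPos = (curPos + 1) % aliveWorkers.size
      }
    }
  }
}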
3 Summary
- Calling TaskSchedulerImpl.start to start the TaskScheduler calls SchedulerBackend.start to start the SchedulerBackend;
- the SchedulerBackend is the TaskScheduler's backend component, which receives and handles messages sent to the TaskScheduler;
- when StandaloneSchedulerBackend starts, it calls the start method of its parent class CoarseGrainedSchedulerBackend to start the CoarseGrainedSchedulerBackend;
- starting the CoarseGrainedSchedulerBackend creates a DriverEndpoint and registers it on SparkContext.DriverSparkEnv.RpcEnv (see "Spark Source Code: Initializing the SparkContext" for details);
- DriverEndpoint.onStart is then invoked; it schedules a task on a new thread that periodically sends ReviveOffers messages to the endpoint itself, and handling a ReviveOffers message calls makeOffers, which is essentially the periodic loop that submits Tasks;
- a StandaloneAppClient is created and its start method is called, sending a RegisterApplication message to every Master to register the Application;
- upon receiving RegisterApplication, the Master creates an ApplicationInfo and adds it to Master.waitingApps, marking the Application as registered, and sends a response back to the Driver;
- the Master then calls schedule to start the Application; schedule does two things:
1) iterate over Master.waitingDrivers and launch each Driver;
2) iterate over Master.waitingApps and launch Executors for each App;
- the process of launching Executors for the App will be analyzed in a later article.