原创-Spark源码分析二:Standalone模式下Maste
2018-11-01 本文已影响0人
无色的叶
接着上篇分析《https://www.jianshu.com/p/c9aa62460e43》
在Master选举为leader后发送ElectedLeader消息,匹配recive方法中的ElectedLeader消息处理
case ElectedLeader =>
//获取持久化的app、driver、worker信息
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
//如都为空,则master状态为ALIVE,否则为RECOVERING
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
//恢复master状态
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
如master状态为RECOVERING则进一步调用beginRecovery()方法恢复master状态
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]) {
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
try {
//重新注册APP
registerApplication(app)
app.state = ApplicationState.UNKNOWN
//通知driver master节点改变了
app.driver.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
}
}
for (driver <- storedDrivers) {
// Here we just read in the list of drivers. Any drivers associated with now-lost workers
// will be re-launched when we detect that the worker is missing.
drivers += driver
}
for (worker <- storedWorkers) {
logInfo("Trying to recover worker: " + worker.id)
try {
//重新注册work
registerWorker(worker)
worker.state = WorkerState.UNKNOWN
//通知work更新master
worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
}
}
}
在master节点主备节点切换时,会触发该方法,且在RECOVERING状态的master节点不能处理接受任何新提交的任务,再回到ElectedLeader 消息的处理在执行完beginRecovery方法后,紧接着会向自身发送CompleteRecovery消息
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
接着查看CompleteRecovery消息处理,调用completeRecovery()方法
case CompleteRecovery => completeRecovery()
private def completeRecovery() {
// Ensure "only-once" recovery semantics using a short synchronization period.
//确保只recovery一次
if (state != RecoveryState.RECOVERING) {
return
}
state = RecoveryState.COMPLETING_RECOVERY
// Kill off any workers and apps that didn't respond to us.
//移除未知状态的work和app
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// Update the state of recovered apps to RUNNING
apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
state = RecoveryState.ALIVE
//开始调度执行未执行的任务
schedule()
logInfo("Recovery complete - resuming operations!")
}
后续接着分析,笔者水平有限,如有误欢迎指正