Spark源码解读

Spark中Client源码分析(一)

2016-03-23  本文已影响168人  lehi

在Spark Standalone中我们所谓的Client,它的任务其实是由AppClient和DriverClient共同完成的。AppClient是一个允许app(Client)和Spark集群通信的中间人,接受master URL、app的信息、一个集群事件的监听器以及事件监听的回调函数,主要和Master交互App相关的信息,DriverClient主要用于和Master交互Driver相关的信息,比如启动、停止及运行状况等,本篇先介绍AppClient。

1.AppClient类主要字段、方法如下:

AppClient类成员
由上图我们可以知道,ClientEndpoint是作为AppClient的一个私有类存在的。
(1)stop方法如下所示,主要用于向master发送消息,停止并注销app。
<code>
def stop() {
if (endpoint != null) {
try {
//返回Rpc ask的超时时间120s
val timeout = RpcUtils.askRpcTimeout(conf)
//client向master发送注销app的信息,在120s内如果不响应,那么将抛RpcTimeoutException
timeout.awaitResult(endpoint.askBoolean)
} catch {
case e: TimeoutException =>
logInfo("Stop request to Master timed out; it may already be shut down.")
}
endpoint = null
}
}</code>
下面我们重点看ClientEndpoint,它是线程安全的。

2.ClientEndpoint

2.1属性

(1)//设置一个boolean标识,用于避免多次调用listener.disconnected()
private var alreadyDisconnected = false
(2)//app向master申请注册的线程池,因为被maser注册是一个阻塞操作,所以线程池的个数是"masterRpcAddresses.size",这样app就能同时被所有的master注册
private val registerMasterThreadPool = new ThreadPoolExecutor(
0,
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
60L, TimeUnit.SECONDS,
new SynchronousQueueRunnable,
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
(3)一个守护单线程用于申请注册操作
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")

2.2方法

(1)构造函数为ClientEndpoint的主构造器。
(2)onStart方法,用于将App注册到所有的Master上
<code>
override def onStart(): Unit = {
try {
//“1”表示第几次注册,最大次数不超过3次,第n次申请注册到master上
registerWithMaster(1)详见下①
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
//监听器停止并将boolen状态标识设置为true
markDisconnected()
//停止rpcendpoint
stop()
}
}
</code>
①registerWithMaster方法如下,用于异步注册到所有的master上,如果没有超过再次注册的次数(3次),那么每20s将会重新调用该方法申请注册,如果注册成功,所有的调用work和futures将会被取消。
<code>
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures = tryRegisterAllMasters()
registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
Utils.tryOrExit {
if (registered) {
registerMasterFutures.foreach(.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.foreach(
.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}
}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
</code>
(3)onstop方法如下,释放资源。
<code>
override def onStop(): Unit = {
if (registrationRetryTimer != null) {
registrationRetryTimer.cancel(true)
}
registrationRetryThread.shutdownNow()
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
}
</code>
(4)receive方法,receive接受到的消息分为5种,分别为

上一篇 下一篇

猜你喜欢

热点阅读