【Spark】Master原理剖析-3个机制
从主备切换机制原理、注册机制原理、资源调度机制三个方面进行分析。
1.主备切换机制原理
前话:Master实际上可以配置2个,Spark原生的standalone模式是支持Master主备切换的,也就是说, 当Active Master 节点挂掉的时候,我们可以将Standby Master切换为Active Master.
Spark Master主备切换可以基于两种机制,一种是基于文件系统,一种是基于zookeeper,基于文件系统的主备切换机制,需要在Active Master挂掉之后,由我们手动去切换到Standy Master上,而基于zookeeper的主备切换机制,可以实现自动切换Master。
具体切换如图所示:
主备切换机制原理1)切换到Standby Master状态后,使用持久化引擎去读取持久化的storedApps、storedDrivers、storedWorkers。
2)读取出来以后,如果storedApps、storedDrivers、storedWorkers。有任何一个是非空的,就会将持久化的Application、Driver、Worker的信息重新进行注册、注册到Master内部的内存缓存结构中。
3)注册完成后,将Application、Worker的状态都修改为UNKNOW,然后向Application所对应的Driver,以及Worker发送Standby Master的地址。
4)Driver和Worker在接收到Master发送来的地址之后,返回响应消息给新的Master。并使用completeRecovery()方法对没有发送响应消息的Driver和Worker进行处理,过滤掉它们的消息。
5)最后,调用Master自己的schedule()方法,对正在等待资源调度的Driver和Application进行调度,比如在某个worker节点上启动Driver,或者为Application在Worker上启动它需要的Executor。
总结:1.从内存缓存结构中移除2.从相关的组件的内存缓存中移除3.从持久化存储中移除
2.注册机制原理
对于Worker而言:
1)Worker在启动之后,就会向Master注册,Master做的第一件事是过滤,将状态为DEAD的Worker过滤掉,对于状态为UNKNOW的Worker,清理掉旧的Worker信息,替换为新的Worker信息。
2)过滤完成后,把Worker加入内存缓存中(HashMap),然后用持久换引擎persistenceEngine将Worker信息进行持久化(文件系统、zookeeper)
3)调用Schedule方法。
对于Driver而言:
1)用spark-submit提交spark Application的时候,首先就会注册Driver,将Driver信息放入内存缓存中(HashMap)。
2)加入等待调度队列(ArrayBuffer),用持久化引擎persistenceEngine将Driver信息持久化。
3)调用Schedule方法。
对于Application而言:
Driver启动好了,执行我们编写的Application代码,执行SparkContext初始化,底层的SparkDeploySchedulerBackend通过AppClient内部的线程ClientActor,发送RegisterApplication到Master,进行Application的注册。
1)Master将Application信息放入内存缓存(HashMap),加入等待调度的Application队列(ArrayBuffer)中。
2)用持久化引擎persistenceEngine将Driver信息持久化,调用Schedule方法。
3.资源调度机制
Diver的调度机制:
直接看schedule方法,首先判断master状态不是alive的话,直接返回。接着取出workers中所有注册的worker,进行过滤,必须是状态为alive的worker。对它调用Random的shuffle方法,进行随机的打乱。再调度Driver,这里说一下Driver的调度机制,首先遍历waitingDrivers ArrayBuffer,直到所有的worker全部遍历到。里面的launchDriver方法很重要。在launchDriver方法中,首先将driver加入worker内存的缓存结构,并将worker内使用的内存和cpu数量都加上driver需要的内存和cpu数量,同时把worker也加入到driver内部的缓存结构中,然后调用worker的actor,给它发送launchDriver消息,让worker来启动driver,并把driver的状态设置为RUNNING。
Application的调度机制:
首先application的调度方法有两种,一种是spreadOutApps,另一种是非spreadOutApps。
1)对于第一种而言,首先遍历waitingApps中的ApplicationInfo,并且过滤出还有需要调度的core的Application。其次,从workers中,过滤出状态为Alive的,再次过滤出可以被application使用的worker,然后按照剩余内存CPU数量倒序排序,通过数组来存储每个worker的CPU数量,并给每个worker分配cpu core,分配完成后,遍历worker,只要此worker上分配了core以后,就在worker上启动executor,并将application状态设置为RUNNING。
2)对于非spreadOutApps方法,主要区别是将每一个Application,尽可能少的分配到worker上面去。
举个例子来说:比如总共有10个worker,每个有10个core,app总共有20个core,那么其实只会分配到两个worker上,每个worker都占满10个core,总共只会启动2个executor,每个对应10个core。而在第一种而言,会启动20个executor,每个对应1个core。
接下来就看worker原理了。。。。