Spark架构师3-spark初始化和任务启动源码
1、上次总结 spark初始化环境资源 0:18:00~ 0:41:00
1、Spark RPC(Endpoint:DriverEndpoint ClientEndpoint)
2、利用 akka(endpoint类似于actor) 模拟实现 YARN(Flink 就是基于 akka实现的 RPC)
3、Spark Standalone 集群启动脚本start-all.sh 分析
4、Master 启动分析
5、Worker 启动分析
6、Spark APP 提交脚本 spark-submit 脚本分析
7、SparkSubmit 分析(重点是进入main方法,通过反射的方式运行用户编写的application的主类main方法)
8、SparkContext 初始化
1.1 SparkContext 初始化 分析 0:10~ 0:41
Spark任务执行流程分析.jpg示例程序demo sparkPI
val spark: SparkSession = SparkSession.builder.appName("Spark Pi").getOrCreate()
注: 上图粉红色半边 ,7件事
0:18:16 代码开始
——》SparkSession#Builder#getOrCreate()
val sparkContext: SparkContext = userSuppliedContext.getOrElse{
SparkContext.getOrCreate(sparkConf)
——》 SparkContext#getOrCreate()
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
——》SparkContext类块#try
//以下为类级代码块
try {
_conf = config.clone()
_conf.validateSettings()
############# 注释:第一步 创建Spark Env############
*
* 除了创建 sparkEnv之外,还创建了各种 manager 对象。
*/ Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
——》SparkEnv#createDriverEnv
——》SparkEnv#create
//注释: 初始化 SecurityManager
val securityManager = new SecurityManager(conf, ioEncryptionKey)
//注释: 初始化 NettyRpcEnv
val systemName = if (isDriver) driverSystemName
val rpcEnv = RpcEnv.create(systemName,
bindAddress, advertiseAddress,
port.getOrElse(-1), conf, securityManager,
numUsableCores, !isDriver)
//注释: 初始化 SerializerManager
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
//注释: 初始化 BroadcastManager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
//注释: 初始化 MapOutputTracker
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
//注释: 初始化 SortShuffleManager
val shortShuffleMgrNames = Map("sort" ->
// 注释: 初始化 UnifiedMemoryManager 统一内存管理模型
// StaticMemoryManaager 静态内存管理模型
val memoryManager: MemoryManager = if (useLegacyMemoryManager)
//注释: 初始化 BlockManagerMaster
val blockManagerMaster = new BlockManagerMaster(
//注释: 初始化 BlockManager
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
//注释: 初始化 OutputCommitCoordinator
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse
//注释: 正式初始化SparkEnv
val envInstance = new SparkEnv(executorId, rpcEnv, serializer, closureSerializer,
return envInstance
############# 注释:第二步 创建SparkUI############
/** 注释:第二步
* 创建并初始化Spark UI
*/
_ui = if (conf.getBoolean("spark.ui.enabled", true)) {
// TODO_MA 注释:_jobProgressListener跟踪要在UI中显示的任务级别信息,startTime就是SparkContext的初始时的系统时间
// TODO_MA 注释:返回SparkUI,它的父类是WebUI,和MasterWebUI是一个级别的
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))
} else {
// For tests, do not enable the UI
None
}
############# 注释:第三步 hadoop相关配置以及Executor环境变量############
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
############# 注释: 第四步:创建心跳接收器 ############
* 1、我们需要在“createTaskScheduler”之前注册“HeartbeatReceiver”,因为Executor将在构造函数中检索“HeartbeatReceiver”
* 2、创建一个HeartbeatReceiver 的RpcEndpoint注册到RpcEnv中,每分钟给自己发送ExpireDeadHosts,去检测Executor是否存在心跳,
* 3、如果当前时间减去最一次心跳时间,大于1分钟,就会用CoarseGrainedSchedulerBackend将Executor杀死
*/
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
############# 注释:第五步:创建任务调度TaskScheduler############
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
############# 注释:第六步:创建和启动DAGScheduler############
/* * 1、内部初始化了一个:DAGSchedulerEventProcessLoop 用来处理各种任务
* 2、在 DAGSchedulerEventProcessLoop 创建的时候,构造函数的内部的最后一句代码执行了 DAGSchedulerEventProcessLoop的启动。
* 将来任务的提交,取消等,都会发送一个事件给 DAGSchedulerEventProcessLoop
* 从而触发 dagScheduler.onRecevie() 的运行。
*/
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
############# 注释:第七步:TaskScheduler的启动,backend.start()############
/** :第七步:TaskScheduler的启动,主要任务:backend.start()
*/
_taskScheduler.start()
↓
TaskSchedulerImpl#start()
.......
backend.start()
↓
StandaloneSchedulerBackend.start()
▼
/** :调用父类方法 start() 方法启动一个 DriverEndPoint
* super 粗粒度的 CoarseGrainedSchedulerBackend
*/
super.start()
↓
CoarseGrainedSchedulerBackend.start()
▼
/** 创建一个 DriverEndPoint 负责跟 master 打交道的
driverEndpoint = createDriverEndpointRef(properties)
/** 注释:里面维护了一个 DriverEndPoint 主要用来向 Executor分发任务
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
/* 注释:client start 启动了 ClientEndpoint
client.start()
——》 StandaloneAppClient.start()
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
——》 ClientEndpoint#Onstart()方法
/** 注释:clientEndPoint 向 Master 执行注册
registerWithMaster(1)
▼
registerMasterFutures.set(tryRegisterAllMasters())
——》tryRegisterAllMasters()
▼
/** 注释:创建了一个 Master 的 RPC 代理
*/
val masterRef = rpcEnv.setupEndpointRef(masterAddress,
/**
* 注释:注册
*/
masterRef.send(RegisterApplication(appDescription, self))
——》RegisterApplication()
↓
Master#类 receive 方法
...................
case RegisterApplication(description, driver) =>
...................
case RegisterWorker
schedule()
launchDriver(worker, driver) 启动driver
——》startExecutorsOnWorkers()
▼
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
——》allocateWorkerResourceToExecutors
——》launchExecutor(worker, exec)
▼
/* 注释:发送命令让 worker 启动 executor
* worker 节点的一个 RPC 节点,负责通信的。
*/
worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
/ 注释:发消息告诉 Driver 该 worker 上的 executor 已经启动
*/
exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
↓
Worker类 receive 方法
▼
/* 注释: 接收到 Master 发送过来的启动 Executor 的命令
*/
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
/* 注释:把启动Executor必要的一些信息,封装在 ExeuctorRunner 中
*/
val manager = new ExecutorRunner(appId, execId, appDesc.copy(command =
/* : 启动好了 Executor 之后,返回给 Master一个信号。
* 信息封装在 ExecutorStateChanged 对象中。
*/
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
/* 启动 executor
* worker.send(ExecutorStateChanged)
*/
manager.start()
↓
——》ExecutorRunner#start
——》fetchAndRunExecutor()
/* :构建jvm 进程启动命令 */
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
↓ 跳转到Executor类构造方法
Executor #
// 实际上是构建了一个用于执行task的线程池
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Executor task launch worker-%d")
.setThreadFactory(new ThreadFactory {
2、 本次内容概述 spark 任务提交 分析 0:41:00 ~
1、Spark Application 提交流程分析
2、Spark Application 的 DAG 生成和 Stage 切分分析
3、Spark 的 Task 分发和执行源码分析
4、Spark 的 Shuffle 机制源码分析
2.1 Spark Application 提交流程分析 0:43 ~0:58
入口:spark application 中的 action 算子!(SparkPi 程序中的 reduce 函数)
以 SparkPi 程序举例:reduce() 算子就是提交 job 的入口
reduce()
▼
sc.runJob
——》SparkContext#runJob
——》DAGScheduler#runJob
▼
/*
* 1、应用程序调用action算子
* 2、sparkcontext。runjob
* 3、dagscheduler。runjob
* 4、taskscheduler。submittasks
* 5、schedulerbackend。driverEndpoint 提交任务
/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
——》DAGScheduler#runJob
▼
/* 注释: 提交任务
* 参数解析:
* 1、rdd:要在其上运行任务的参数RDD目标RDD
* 2、func:在RDD的每个分区上运行的函数
* 3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
* 4、callSite:在用户程序中调用此作业的位置
* 5、resultHandler:回调函数,以将每个分区结果传递给Xxx
* 6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
——》DAGScheduler#runJob
▼
/**
* 第一步:封装一个JobWaiter对象;
* 第二步:将JobWaiter对象赋值给JobSubmitted的listener属性,
* 并将JobSubmitted(DAGSchedulerEvent事件)对象传递给eventProcessLoop事件循环处理器。eventProcessLoop
* 内部事件消息处理线程将会接收JobSubmitted事件,并调用dagScheduler.handleJobSubmitted(...)方法来处理事件;
* 第三步:返回JobWaiter对象。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
/**
* 注释:这是提交任务运行
* eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
* 这个组件主要负责:任务的提交执行
*/
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
——》 eventProcessLoop 构造函数 DAGSchedulerEventProcessLoop
——》EventLoop#post
——》DAGSchedulerEventProcessLoop#onRecive
——》DAGSchedulerEventProcessLoop#doOnRecive
——》DAGSchedulerEventProcessLoop#handleJobSubmitted
从此,任务的提交就交给了 dagScheduler#handleJobSubmitted 方法
▼
/* 注释: RDD DAG划分Stages:Stage的划分是从最后一个Stage开始逆推的,
-
每遇到一个宽依赖处,就分裂成另外一个Stage
- 依此类推直到Stage划分完毕为止。并且,只有最后一个Stage的类型是ResultStage类型。
- 注意Dataset、DataFrame、sparkSession.sql("select ...")
- 经过catalyst代码解析会将代码转化为RDD
-
做了2件最主要的事
-
1、stage切分
-
2、 stage 提交
/
/ 注释: Stage 切分
* 这个 finalRDD 就是 rdd链条中的最后一个 RDD,也就是触发 sc.runJob() 方法执行的 RDD
*/finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) /* * 注释: 提交 Stage */ submitStage(finalStage)
2.2 Spark Application 的 DAG 生成和 Stage 切分分析 0:58~ 1:46
入口:EventLoop 中的 eventQueue.take() 方法
如果任务提交,则有 JobSubmitted 事件提交到 eventQueue 中,则 eventQueue.take() 阻塞返回,此时的 event 就是 JobSubmitted。
根据事件机制,跳转到:DAGScheduler.handleJobSubmitted() 方法
根据 driver 发送过来的 事件类型,来决定到底做什么!
两个核心的方法:
// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage执行入口
submitStage(finalStage)
方法依赖关系:
1、createResultStage(传入finalRDD获得ResultStage) ->2
2、getOrCreateParentStages(传入rdd获得父stage) ->3->4
3、getShuffleDependencies(传入rdd获得宽依赖)
4、getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6
5、getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3
6、createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2
image.png
RDD任务切分中间分为:Application、Job、Stage 和 Task
1、Application:初始化一个 SparkContext 即生成一个 Application;
2、Job:一个 Action 算子就会生成一个 Job;
3、Stage:Stage 等于宽依赖的个数加 1;
4、Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系
dagScheduler#handleJobSubmitted 主方法
▼
一 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
▼
// TODO_MA 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
val parents = getOrCreateParentStages(rdd, jobId)
image.png
▼
1 getShuffleDependencies(rdd).map { shuffleDep => 2 getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList
1 ——》DAGScheduler#getShuffleDependencies
▼
/** 重点方法 找RDD依赖链
* Returns shuffle dependencies that are immediate parents of the given RDD.
* This function will not return more distant ancestors.
* For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
* A <-- B <-- C
* calling this function with rdd C will only return the B <-- C dependency.
* This function is scheduler-visible for the purpose of unit testing.
* TODO_ 采用的是深度优先遍历找到Action算子的父依赖中的宽依赖
*/
2 ——》 getOrCreateShuffleMapStage
/* TODO_MA 如果shuffleIdToMapStage中存在shuffle,则获取shuffle map stage。
* 否则,如果shuffle map stage不存在,该方法将创建shuffle map stage
* 以及任何丢失的parent shuffle map stage。
***************************************
二 、 /* 注释: 递归提交 Stage /
submitStage(finalStage)
——>submitMissingTasks()
▼
/ 注释: 把 stage 变成 Tasks
* Step3: 为每个需要计算的partiton生成一个task
*/
val tasks: Seq[Task[_]] = try {
//如果是 ShuffleMapStage 阶段的 Task,则构建 ShuffleMapTask
case stage: ShuffleMapStage => stage.pendingPartitions.clear()
.............
//如果是 ResultStage 阶段的 Task,则构建 ResultTask
case stage: ResultStage => partitionsToCompute.map
/* 注释: 如果该阶段有 Task 需要执行
* Step4: 提交tasks
*/
if (tasks.size > 0) {
//注释: taskScheduler 的具体类型是:TaskSchedulerImpl
taskScheduler.submitTasks(new TaskSet(tasks.toArray,
2.3 Spark Task 分发和执行分析 2:13 ~ 2:45
入口接上面的: taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id,
taskScheduler 的具体类型是:TaskSchedulerImpl
TaskSchedulerImpl#submitTasks 方法
▼
/** 注释:TaskScheduler提交job,最后交由 SchedulerBackEnd 进行提交
*/
backend.reviveOffers()
↓
CoarseGrainedSchedulerBackend.reviveOffers()
▼
// 给 DriverEndpoint (自己 )发送 ReviveOffers 消息
driverEndpoint.send(ReviveOffers)
↓
CoarseGrainedSchedulerBackend#DriverEndpoint#receive
▼
case ReviveOffers => makeOffers()
▼
scheduler.resourceOffers(workOffers) //申请计算资源
.........
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
▼
// 注释: 发送 LaunchTask 消息给:CoarseGrainedExecutorBackend 类
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
↓
CoarseGrainedExecutorBackend# receive()
▼ //data 是任务的反序列化
case LaunchTask(data) => if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else
{ ....... executor.launchTask(this, taskDesc)
}
↓
Executor#launchTask
▼
// TODO_MA 注释: 封装一个 TaskRunner 线程对象,来运行一个 Task
val tr = new TaskRunner(context, taskDescription)
- 注释: 提交到线程池运行,那么就转到 TaskRunner 的 run() 方法
*/
threadPool.execute(tr)
↓ 具体运行一个 Task 的地方
Executor#TaskRunner#run()
▼
val value = Utils.tryWithSafeFinally {
val res = task.run(taskAttemptId = taskId,
↓
Task#run()//
▼
runTask(context)// 两种task , 一种是shuffer ,一种是result
↓shuffer
1. ShuffleMapTask#runTask()
▼
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
2.3 Spark Suffle 源码分析 2:45~ 3:05
HashShuffleManager的运行原理 2:45~ 2:52
SortShuffleManager运行原理 2:52 ~3:13
↓ 有4个writer ; 以SortShuffleWriter为例
SortShuffleWriter#write
▼
* 注释: 先排序
sorter = if (dep.mapSideCombine) {
插入数据到排序区
sorter.insertAll(records)
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
↓ Result
2. ResultMapTask#runTask()
▼
image.png
image.png