spark

Spark架构师3-spark初始化和任务启动源码

2020-10-22  本文已影响0人  fat32jin

1、上次总结 spark初始化环境资源 0:18:00~ 0:41:00

1、Spark RPC(Endpoint:DriverEndpoint ClientEndpoint)
2、利用 akka(endpoint类似于actor) 模拟实现 YARN(Flink 就是基于 akka实现的 RPC)
3、Spark Standalone 集群启动脚本start-all.sh 分析
4、Master 启动分析
5、Worker 启动分析
6、Spark APP 提交脚本 spark-submit 脚本分析
7、SparkSubmit 分析(重点是进入main方法,通过反射的方式运行用户编写的application的主类main方法)

8、SparkContext 初始化

1.1 SparkContext 初始化 分析 0:10~ 0:41

Spark任务执行流程分析.jpg

示例程序demo sparkPI
val spark: SparkSession = SparkSession.builder.appName("Spark Pi").getOrCreate()

注: 上图粉红色半边 ,7件事

0:18:16 代码开始

 ——》SparkSession#Builder#getOrCreate()
                val sparkContext: SparkContext = userSuppliedContext.getOrElse{
                     SparkContext.getOrCreate(sparkConf)
         ——》 SparkContext#getOrCreate()
                   setActiveContext(new SparkContext(config), allowMultipleContexts = false)                
                   ——》SparkContext类块#try
                            //以下为类级代码块 
                           try {
                                       _conf = config.clone()
                                       _conf.validateSettings()
 ############# 注释:第一步   创建Spark Env############         
                                * 
                               *  除了创建 sparkEnv之外,还创建了各种 manager 对象。                                
                                */ Create the Spark execution environment (cache, map output tracker, etc)
                                   _env = createSparkEnv(_conf, isLocal, listenerBus)
                          
                                ——》SparkEnv#createDriverEnv
                                    ——》SparkEnv#create
                                               //注释: 初始化 SecurityManager
                                          val securityManager = new SecurityManager(conf, ioEncryptionKey)                                              
                                                   //注释: 初始化 NettyRpcEnv
                                           val systemName = if (isDriver) driverSystemName  
                                           val rpcEnv = RpcEnv.create(systemName, 
                                                       bindAddress, advertiseAddress, 
                                                        port.getOrElse(-1), conf, securityManager,
                                                         numUsableCores, !isDriver)
                                         //注释: 初始化 SerializerManager   
        val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)      
                                    //注释: 初始化 BroadcastManager      
        val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
                                //注释: 初始化 MapOutputTracker 
        val mapOutputTracker = if (isDriver) {
            new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
                               //注释: 初始化 SortShuffleManager    
        val shortShuffleMgrNames = Map("sort" ->  
                               // 注释: 初始化 UnifiedMemoryManager  统一内存管理模型
                                // StaticMemoryManaager  静态内存管理模型
        val memoryManager: MemoryManager = if (useLegacyMemoryManager) 
                                //注释: 初始化 BlockManagerMaster 
        val blockManagerMaster = new BlockManagerMaster(
                                //注释: 初始化 BlockManager 
        val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,  
                               //注释: 初始化 OutputCommitCoordinator 
        val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse 
                               //注释: 正式初始化SparkEnv
        val envInstance = new SparkEnv(executorId, rpcEnv, serializer, closureSerializer,
         return   envInstance

 ############# 注释:第二步   创建SparkUI############         
/**   注释:第二步
         *  创建并初始化Spark UI
         */
        _ui = if (conf.getBoolean("spark.ui.enabled", true)) {
            // TODO_MA 注释:_jobProgressListener跟踪要在UI中显示的任务级别信息,startTime就是SparkContext的初始时的系统时间
            // TODO_MA 注释:返回SparkUI,它的父类是WebUI,和MasterWebUI是一个级别的
            Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))
        } else {
            // For tests, do not enable the UI
            None
        }    
 ############# 注释:第三步   hadoop相关配置以及Executor环境变量############    
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
if (jars != null) {
            jars.foreach(addJar)
        }
        
        if (files != null) {
            files.foreach(addFile)
        }

 executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
        executorEnvs ++= _conf.getExecutorEnv
        executorEnvs("SPARK_USER") = sparkUser

 ############# 注释: 第四步:创建心跳接收器 ############
            *  1、我们需要在“createTaskScheduler”之前注册“HeartbeatReceiver”,因为Executor将在构造函数中检索“HeartbeatReceiver”
         *  2、创建一个HeartbeatReceiver 的RpcEndpoint注册到RpcEnv中,每分钟给自己发送ExpireDeadHosts,去检测Executor是否存在心跳,
         *  3、如果当前时间减去最一次心跳时间,大于1分钟,就会用CoarseGrainedSchedulerBackend将Executor杀死
         */
        
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
 

 ############# 注释:第五步:创建任务调度TaskScheduler############           

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts

 ############# 注释:第六步:创建和启动DAGScheduler############      
     
 /*  *  1、内部初始化了一个:DAGSchedulerEventProcessLoop 用来处理各种任务
         *  2、在 DAGSchedulerEventProcessLoop 创建的时候,构造函数的内部的最后一句代码执行了 DAGSchedulerEventProcessLoop的启动。
         *  将来任务的提交,取消等,都会发送一个事件给 DAGSchedulerEventProcessLoop
         *  从而触发 dagScheduler.onRecevie() 的运行。
         */
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)



 ############# 注释:第七步:TaskScheduler的启动,backend.start()############  
 /** :第七步:TaskScheduler的启动,主要任务:backend.start()
         */
        
        _taskScheduler.start()
                                       ↓
        TaskSchedulerImpl#start()
                  .......
                               backend.start()
                                                    ↓
StandaloneSchedulerBackend.start()
                                                     ▼
/** :调用父类方法 start() 方法启动一个 DriverEndPoint
         *   super 粗粒度的 CoarseGrainedSchedulerBackend
         */
                           super.start()        
                                            ↓
                CoarseGrainedSchedulerBackend.start()
                                          ▼
                              /**  创建一个 DriverEndPoint 负责跟 master 打交道的         
                             driverEndpoint = createDriverEndpointRef(properties)

                          /**  注释:里面维护了一个 DriverEndPoint 主要用来向 Executor分发任务 
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        
        /*    注释:client start  启动了 ClientEndpoint 
                        client.start()

            ——》 StandaloneAppClient.start()
                     endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
                            ——》 ClientEndpoint#Onstart()方法
                                    /** 注释:clientEndPoint 向 Master 执行注册 
                                       registerWithMaster(1)
                                                      ▼
                                       registerMasterFutures.set(tryRegisterAllMasters())
                                    ——》tryRegisterAllMasters()
                                                      ▼
                             /** 注释:创建了一个 Master 的 RPC 代理
                         */
                        val masterRef = rpcEnv.setupEndpointRef(masterAddress,  
                        
                        /**
                         *   注释:注册
                         */
                        masterRef.send(RegisterApplication(appDescription, self))
                  ——》RegisterApplication()
                                            ↓
                                Master#类 receive 方法 
                                         ...................
                                     case RegisterApplication(description, driver) => 
                                          ...................
                                     case RegisterWorker
                                                schedule()
                                                    launchDriver(worker, driver)    启动driver
                                              ——》startExecutorsOnWorkers() 
                                                                      ▼
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                    allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
                }
                                                  ——》allocateWorkerResourceToExecutors
                                                       ——》launchExecutor(worker, exec)
                                                                                  ▼
                                                           /* 注释:发送命令让 worker 启动 executor
         *   worker 节点的一个 RPC 节点,负责通信的。
         */
        worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
        
        / 注释:发消息告诉 Driver 该 worker 上的 executor 已经启动
         */
        exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))          
                                                                           ↓
                                                       Worker类  receive 方法
                                                                      ▼            
/* 注释: 接收到 Master 发送过来的启动 Executor 的命令
     */
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => 
                
/* 注释:把启动Executor必要的一些信息,封装在 ExeuctorRunner 中
             */
            val manager = new ExecutorRunner(appId, execId, appDesc.copy(command =
                 /* : 启动好了 Executor 之后,返回给 Master一个信号。
             *   信息封装在 ExecutorStateChanged 对象中。
             */
            sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))

           /* 启动 executor
             *   worker.send(ExecutorStateChanged)
             */
            manager.start()
                          ↓
   ——》ExecutorRunner#start
              ——》fetchAndRunExecutor()
                     /* :构建jvm  进程启动命令  */
            process = builder.start()
            val header = "Spark Executor Command: %s\n%s\n\n".format(
                           ↓   跳转到Executor类构造方法
            Executor # 
                        // 实际上是构建了一个用于执行task的线程池
    private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Executor task launch worker-%d")
            .setThreadFactory(new ThreadFactory {

2、 本次内容概述 spark 任务提交 分析 0:41:00 ~

1、Spark Application 提交流程分析
2、Spark Application 的 DAG 生成和 Stage 切分分析
3、Spark 的 Task 分发和执行源码分析
4、Spark 的 Shuffle 机制源码分析

2.1 Spark Application 提交流程分析 0:43 ~0:58

入口:spark application 中的 action 算子!(SparkPi 程序中的 reduce 函数)
以 SparkPi 程序举例:reduce() 算子就是提交 job 的入口

reduce()

sc.runJob
——》SparkContext#runJob
——》DAGScheduler#runJob

/*
* 1、应用程序调用action算子
* 2、sparkcontext。runjob
* 3、dagscheduler。runjob
* 4、taskscheduler。submittasks
* 5、schedulerbackend。driverEndpoint 提交任务
/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
——》DAGScheduler#runJob

/
* 注释: 提交任务
* 参数解析:
* 1、rdd:要在其上运行任务的参数RDD目标RDD
* 2、func:在RDD的每个分区上运行的函数
* 3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
* 4、callSite:在用户程序中调用此作业的位置
* 5、resultHandler:回调函数,以将每个分区结果传递给Xxx
* 6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

——》DAGScheduler#runJob

/**
* 第一步:封装一个JobWaiter对象;
* 第二步:将JobWaiter对象赋值给JobSubmitted的listener属性,
* 并将JobSubmitted(DAGSchedulerEvent事件)对象传递给eventProcessLoop事件循环处理器。eventProcessLoop
* 内部事件消息处理线程将会接收JobSubmitted事件,并调用dagScheduler.handleJobSubmitted(...)方法来处理事件;
* 第三步:返回JobWaiter对象。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

    /**         
     *   注释:这是提交任务运行
     *   eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
     *   这个组件主要负责:任务的提交执行
     */
    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))

——》 eventProcessLoop 构造函数 DAGSchedulerEventProcessLoop
——》EventLoop#post

   ——》DAGSchedulerEventProcessLoop#onRecive   
        ——》DAGSchedulerEventProcessLoop#doOnRecive
          ——》DAGSchedulerEventProcessLoop#handleJobSubmitted 

从此,任务的提交就交给了 dagScheduler#handleJobSubmitted 方法

/* 注释: RDD DAG划分Stages:Stage的划分是从最后一个Stage开始逆推的,

2.2 Spark Application 的 DAG 生成和 Stage 切分分析 0:58~ 1:46

入口:EventLoop 中的 eventQueue.take() 方法

如果任务提交,则有 JobSubmitted 事件提交到 eventQueue 中,则 eventQueue.take() 阻塞返回,此时的 event 就是 JobSubmitted。
根据事件机制,跳转到:DAGScheduler.handleJobSubmitted() 方法
根据 driver 发送过来的 事件类型,来决定到底做什么!

两个核心的方法:

// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage执行入口
submitStage(finalStage)

方法依赖关系:
1、createResultStage(传入finalRDD获得ResultStage) ->2
2、getOrCreateParentStages(传入rdd获得父stage) ->3->4
3、getShuffleDependencies(传入rdd获得宽依赖)
4、getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6
5、getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3
6、createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2

image.png
image.png

RDD任务切分中间分为:Application、Job、Stage 和 Task
1、Application:初始化一个 SparkContext 即生成一个 Application;
2、Job:一个 Action 算子就会生成一个 Job;
3、Stage:Stage 等于宽依赖的个数加 1;
4、Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系

dagScheduler#handleJobSubmitted 主方法

一 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)


// TODO_MA 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
val parents = getOrCreateParentStages(rdd, jobId)

image.png
image.png


1 getShuffleDependencies(rdd).map { shuffleDep => 2 getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList

                  1 ——》DAGScheduler#getShuffleDependencies   
                                               ▼
              /**  重点方法  找RDD依赖链 
              * Returns shuffle dependencies that are immediate parents of the given RDD.
              * This function will not return more distant ancestors.
              * For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
               * A <-- B <-- C
                 * calling this function with rdd C will only return the B <-- C dependency.
               * This function is scheduler-visible for the purpose of unit testing.
                * TODO_  采用的是深度优先遍历找到Action算子的父依赖中的宽依赖
                  */
        
                         2 ——》   getOrCreateShuffleMapStage
               /* TODO_MA 如果shuffleIdToMapStage中存在shuffle,则获取shuffle map stage。
                  *  否则,如果shuffle map stage不存在,该方法将创建shuffle map stage
                  *   以及任何丢失的parent shuffle map stage。
   ***************************************

二 、 /* 注释: 递归提交 Stage /
submitStage(finalStage)
——>submitMissingTasks()

/
注释: 把 stage 变成 Tasks
* Step3: 为每个需要计算的partiton生成一个task
*/
val tasks: Seq[Task[_]] = try {
//如果是 ShuffleMapStage 阶段的 Task,则构建 ShuffleMapTask
case stage: ShuffleMapStage => stage.pendingPartitions.clear()
.............
//如果是 ResultStage 阶段的 Task,则构建 ResultTask
case stage: ResultStage => partitionsToCompute.map

   /* 注释: 如果该阶段有 Task 需要执行
     *   Step4: 提交tasks
     */
    if (tasks.size > 0) {          
           //注释: taskScheduler 的具体类型是:TaskSchedulerImpl
          taskScheduler.submitTasks(new TaskSet(tasks.toArray, 

2.3 Spark Task 分发和执行分析 2:13 ~ 2:45

入口接上面的: taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id,
taskScheduler 的具体类型是:TaskSchedulerImpl

TaskSchedulerImpl#submitTasks 方法

/** 注释:TaskScheduler提交job,最后交由 SchedulerBackEnd 进行提交
*/
backend.reviveOffers()

CoarseGrainedSchedulerBackend.reviveOffers()

// 给 DriverEndpoint (自己 )发送 ReviveOffers 消息
driverEndpoint.send(ReviveOffers)

CoarseGrainedSchedulerBackend#DriverEndpoint#receive

case ReviveOffers => makeOffers()

scheduler.resourceOffers(workOffers) //申请计算资源
.........
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)

// 注释: 发送 LaunchTask 消息给:CoarseGrainedExecutorBackend 类
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

CoarseGrainedExecutorBackend# receive()
▼ //data 是任务的反序列化
case LaunchTask(data) => if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else
{ ....... executor.launchTask(this, taskDesc)
}

Executor#launchTask

// TODO_MA 注释: 封装一个 TaskRunner 线程对象,来运行一个 Task
val tr = new TaskRunner(context, taskDescription)


writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get


2.3 Spark Suffle 源码分析 2:45~ 3:05
HashShuffleManager的运行原理 2:45~ 2:52
SortShuffleManager运行原理 2:52 ~3:13

                                       ↓   有4个writer ; 以SortShuffleWriter为例
               SortShuffleWriter#write
                                      ▼ 
                *   注释: 先排序 
    sorter = if (dep.mapSideCombine) {
                插入数据到排序区
    sorter.insertAll(records)          

val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

     val tmp = Utils.tempFileWith(output)

val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)



                                              ↓     Result
                       2.     ResultMapTask#runTask()
                                               ▼ 
image.png image.png
上一篇下一篇

猜你喜欢

热点阅读