Spark学习之路大数据

spark底层源码解析:

2019-01-09  本文已影响67人  Yellow_0ce3

这是本人第一次发表技术帖,借鉴了很多大神的文章和自己的一些拙见,有什么不正确的大家可以指出来,共同进步

Spark底层RPC通信:记住这里是以事件进行驱动的!!!!
三个主要的类:

RpcEndpoint:是一个通信端,例如Spark集群中的Master,或Worker,都是一个RpcEndpoint.

RpcEndpointRef:RPCEndPoint的引用,我们想要和RPCEndpoint通信的话就必须要获得它的引用.

RpcEnv:是RPC通信的框架和环境,有RPC的启动,停止,关闭等方法,它有一个setupEndPoint方法,用来注册一个RPCEndPoint,同时将RpcEndpointRef和RpcEndpoint以键值对的形式存放在线程安全的ConcurrentHashMap里面

spark的启动消息通信

(代码不会精细的分析,只会提一下比较重要的方法,和主要实现的功能)

主要的组件:

  1. master端:
def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    #val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))#
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
} 

在master方法的main方法里面首先就是去调用上面的方法,开始构建通信环境,通过RPCEnv获取的自己的RPCEndpoinRef,这是master的启动

  1. worker端:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    masterRpcAddresses.map { masterAddress =>
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            //通过master的信息生成MasterRpcEndPoint  像master发送注册信息
            val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            //这里就是发送注册信息的代码
            sendRegisterMessageToMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }

在worker端的 tryRegisterAllMasters()方法会尝试向Masters(因为可能会配置HA)进行注册
我们之前提到过要向RpcEndpoint通信必须要获取到它的RpcEndpointRef,所以
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)这个方法就是获取master的RpcEndpointRef接着通过* sendRegisterMessageToMaster(masterEndpoint)这个方法发送消息在spark底层通信大量用到的模式匹配类进行的通信*

之后要是代码里面方法有详细的注释的话就不会单独的拿出来讲了,只是会提一下应该注意的点,同时这里的1,2,3就是代码分析执行的流程,比如接下来就到master端了

3.master端

case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      //判断各种状态
      if (state == RecoveryState.STANDBY) {
        workerRef.send(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        //发送的是注册失败的消息
        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        //把这个worker对象里面的一系列信息保存到自己的内部,返回值是一个Boolean类型的 避免重复注册
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          //像worker返回一个注册成功的消息 这里的self就相当于 自身的RpcEndPoint 
          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          //打印日志信息
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          //发送失败信息
          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }

这段代码主要就是master端对woker端发送过来的注册消息进行处理的逻辑.registerWorker(worker)这个方法会将worker信息加入到自己内部的列表里面后面会用于集群的任务调度.
4.回到了woker端

 case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
        if (preferConfiguredMasterAddress) {
          logInfo("Successfully registered with master " + masterAddress.toSparkURL)
        } else {
          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        }
        //改变注册状态
        registered = true
        //同时更新自己内部维护的master的信息
        changeMaster(masterRef, masterWebUiUrl, masterAddress)
        forwordMessageScheduler.scheduleAtFixedRate(
         
          new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            //这里是自己给自己发送消息,请求向master发送心跳信息
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        //又创建了一个线程 去执行删除目录的任务
        if (CLEANUP_ENABLED) {
          logInfo(
            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(WorkDirCleanup)
            }
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
        }
       //执行关于executor的代码
        val execs = executors.values.map { e =>
          new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
        }
        //发送一些任务信息
        masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

上面一些这里暂时不关注的方法给了简单的注释,这里就不再解释了,我们这里关注的是* self.send(SendHeartbeat)*这个方法,我们再往里面走!

 case SendHeartbeat =>
      if (connected) { sendToMaster(Heartbeat(workerId, self)) }

这里就开始真正的向master发送"心跳"所谓的"心跳机制"就是每隔一段时间(这个时间间隔我们可以自己设置)向master返回自己最新的状态信息,而master端的处理我们看下面的代码
5.master端

    case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          //更新最后一次心跳时间
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          //这下面就是打印些错误信息
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

workerInfo.lastHeartbeat = System.currentTimeMillis()这段代码很重要!!它会通过发送过来wokerId去更新之前注册在里面的workInfo信息中的lastHeartbeat, 而这里它是怎么做到检测的呢,原因是master它自己有一个超时检测机制,会在我们设置的时间的间隔内去检测workInfo列表的最后一次心跳时间,要是超过我们设置的时间就代表这个worker已经"挂掉了",这里超时检测机制的代码就不去看了,有兴趣的小伙伴可以自己去看看,在此spark的启动消息通信就已经分析完了,这里只是简单的分析,并且借鉴了很多大神的资料.有问题请小伙伴及时纠正哦

Spark运行时的消息通信

牵扯到的几个主要的RPCEndPoint

  1. Worker :
    Executor :
  2. Master :
下面两者在SparkContext中创建出来的
  1. DriverEndpoint :负责和executor进行通信 ,真正创建的是CoarseGrainedScheduler的EndPointRef
    在创建 CoarseGrainedSchedulerBackend中创建
    ENDPOINT_NAME的值是"CoarseGrainedScheduler"
 rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  1. ClientEndpoint : 负责和master进行通信,真正 创建的是StandaloneAppClient的EndPointRef.但是它到Master端的变量名为driver (我也很困惑...为什么这样叫)
    在StandaloneAppClient中创建:
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

这块在spark中的命名真的有点懵逼.....

简单的流程
当用户提交程序的时候,SparkContext会向Master发送注册消息接着Master通知Woker启动Executor执行这个App,当executor启动成功就会,executor会向Sparkcontext的DriverEndPoint进行通信,向SparkContext进行注册,接着Rdd的Action算子触发,将创建RDD的DAG,通过DAGSheduler进行stage的划分,接着生成TaskSet,有TaskSheduler像Executor发送执行消息

首先明确一个点: 在我们提交APP的时候SparkContext会做哪些事情?(在这里是将他们进行了初始化,当Action算子的执行的时候他们才会真正的执行,这一点要记住哦!)

  1. 创建SparkEvn: 用来创建DriverEndpoint和ClientEndpoint.
  2. 创建TaskScheduler:
  3. 创建DAGScheduler:

1.SparkContext端:

一:创建SparkEnv:

这里就是创建RPCEnv的
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

在段代码创建了SparkEnv(在它里面有个属性private[spark] val rpcEnv: RpcEnv),有了它之后我们就可以创建我们上面提到的在SparkContext端的两个RPCEndpoint对象了.

二:创建TaskScheduler和DAGScheduler以及后端调度器

  // Create and start the scheduler
 //返回一个后端调度器和taskScheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // 开始执行
    _taskScheduler.start()

这句代码SparkContext.createTaskScheduler(this, master, deployMode)就是创建TaskSheduler的入口,它会返回一个后端调度器和taskScheduler
_dagScheduler = new DAGScheduler(this)这里是用SparkContext去创建DAGScheduler对象
我们重点关注的是通信所以我们现在要知道怎么创建DriverEndpoint和ClientEndpoint.它们是在后端调度器的启动方法里面会创建这个两个对象
SparkContext.createTaskScheduler(this, master, deployMode)这个方法往里走:

 //创建后端调度器,在new后端调度器的同时会去创造DriverEndp和ClientEndPoint
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

再往后端调度器里面看,这个时候就来到了StandaloneSchedulerBackend这个类它继承了"CoarseGrainedSchedulerBackend"在它的start()方法里面调用了父类的start()方法,在CoarseGrainedSchedulerBackend的start方法里面

  // TODO (prashant) send conf instead of properties
  //创建DriverEndpoint
    driverEndpoint = createDriverEndpointRef(properties)

接着在StandaloneSchedulerBackend这个类里面创建StandaloneAppClient的客户端

    /**
      * 创建StandaloneAppClient的客户端  同时启动
      */
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

创建StandaloneAppClient的客户端所以这里我们的ClientEndpoint是StandaloneApp模式的

三:StandaloneAppClient和Master端进行通信

  1. SparkContext端standaloneAppClient这个类里面
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        //在线程池里面启动向Master注册的消息
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

这段代码就类似与worker向master注册这里就不再赘述了

  1. Master端
    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        //把客户端的信息保存到下来
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        /**
          *  这个方法很重要!!!表示要开始资源调度了!!!!!
          */
        schedule()
      }

master端收到消息后将信息保存下来,同时执行一个很重要的方法schedule,它表示master要开始资源调度了!!!!!!!!!!!我们现在只关注它里面的一个主要的方法startExecutorsOnWorkers()这个方法用来通知Worker启动executor

  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      //获取的是任务执行的时候,分配在executor上core数
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      //找出能够执行程序的worker 接着对它们进行排序
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse

      //获得被分配的核数
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // 现在我们已经决定在每个worker上分配多少个内核,让我们来分配它们
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        //通知worker启动executor
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
这个方法里面会去调用一个launchExecutor(worker, exec)方法,同时将 manager上面app的状态改成RUNNING
launchExecutor方法

 private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    //向wroker端发送请求
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    //向diver端返回消息executor的信息
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

这里主要做了两件事

  1. 通知Worker启动executor 同时把executor的基本信息发送过去
  2. 同时将这些基本信息返回给driver端
在Worker主要做了什么事情呢
  1. 创建工作的目录
  2. 实例化一个ExecutorRunner
  3. 在ExecutorRunner中通过comm(它之前在SchedulerdBackend中构建的comm接着通过上述的StandaloneAppClient发送给Master再发送给Worker的)
    去创建CoarseGrainedExecutorBackend,它就牛逼了,是运行executor的容器,并且还是负责和driverEndpoint进行通信的.
  4. Worker发送executorStateChanged消息给Master通知创建完毕.
接下来我们来看代码
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          //创建一个工作目录
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          //这个本地的工作目录会在程序执行完了之后由worker删除
          val appLocalDirs = appDirectories.getOrElse(appId, {
            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
            val dirs = localRootDirs.flatMap { dir =>
              try {
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                Utils.chmod700(appDir)
                Some(appDir.getAbsolutePath())
              } catch {
                case e: IOException =>
                  logWarning(s"${e.getMessage}. Ignoring this directory.")
                  None
              }
            }.toSeq
            if (dirs.isEmpty) {
              throw new IOException("No subfolder can be created in " +
                s"${localRootDirs.mkString(",")}.")
            }
            dirs
          })
          appDirectories(appId) = appLocalDirs
          //实例化executorRunner对象
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager

          //开始启动 
          manager.start()

          coresUsed += cores_
          memoryUsed += memory_
          //告诉master executor创建完毕
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

manager.start()这是executorRunning里面的start()方法,它里面会去调用executorRunning的fetchAndRunExecutor()方法,所以这才是我们重点需要关注的方法!!!

 private def fetchAndRunExecutor() {
    try {
      // Launch the process
      /**每个 ProcessBuilder 实例管理一个进程属性集。
         它的start() 方法利用这些属性创建一个新的 Process 实例。
         start() 方法可以从同一实例重复调用,以利用相同的或相关的属性创建新的子进程。
      **/
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      /**
        *  这里是真正的启动,通过上面的buildProcessBuilder 方法将属性集放在里面,
        *  现在就可启动coarsegrainedExecutorBackend的main函数了
        *  启动构造器,创建CoarsegrainedExecutorBackend实例,这个是executor运行的容器
        */
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        formattedCommand, "=" * 40)

      // 输出CoarsegrainedExecutorBackend实例运行信息
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      //向wroker发型退出状态请求
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } 

不知道大家伙在这里会不会感到疑惑,我们之前不是说了在里面会创建一个CoarseGrainedExecutorBackend吗?怎么在这里还是没有见到创建它了语法
反正我当时完全懵逼,根本找不到在哪里创建,别人和我说都开始调用了,后面经过查找终于知道了原来所有的一切都在comm这个小东西上面,我们还记得在前面是不是创建StandaloneAppClient是在创建后段调度器(StandaloneSchedulerBackend)里面实现的.comm就是它里面的一个属性

    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)

这里我们可以发现它把关于CoarseGrainedExecutorBackend的一些基本信息封装进来了,接下来我们回到方法里面* val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf)拿到一个builder,它jdk里面用来创建系统进程的,我们通过把CoarseGrainedExecutorBackend*的一些基本信息封装进行,就能通过它去new CoarseGrainedExecutorBackend 了
在CoarseGrainedExecutorBackend 的启动方法里面会发生注册消息给DriverEndPoint

  override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      
      //这里就是向driver进行通信, 把executor的信息送给driver端
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))向SparkContext端发送了一个ack的响应.通过的DriverEndpointRef发送的,还记得我们之前是在什么地方创建的DriverEndpoint吗,没错!就是在CoarseGrainedSchedulerBackend这个类创建的!
回到这个方法:

  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        //各种条件判断
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else if (scheduler.nodeBlacklist != null &&
          scheduler.nodeBlacklist.contains(hostname)) {
          // If the cluster manager gives us an executor on a blacklisted node (because it
          // already started allocating those resources before we informed it of our blacklist,
          // or if it ignored our blacklist), then we reject that executor immediately.
          logInfo(s"Rejecting $executorId as it has been blacklisted.")
          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
          context.reply(true)
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val data = new ExecutorData(executorRef, executorRef.address, hostname,
            cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          //向executor端发送注册成功的消息
          executorRef.send(RegisteredExecutor)
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
          //添加一个监听事件,类型于master对于woker的计时器
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          //开始准备提交任务了!!!!!!!!
          makeOffers()
        }

这个方法会做什么事情呢?

  1. 向executor端返回一个注册成功的消息
  2. 添加一个注册成功的消息
  3. makeOffers()方法里面调用!!!!lun!!!!方法,这个方法太重要了,向Executor发送launchTasks消息执行任务

executor端收到注册成功的消息后

 case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        //实例化executor对象,在spark里面它才是真正的执行任务的人
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

 

主要做的事情:

  1. executor = new Executor(executorId, hostname, env, userClassPath, isLocalnew了一个executor 它才是真正执行任务的代码
  2. 在初始化Executor的时候执行startDriverHeartbeater()方法向Driver发送心跳信息 等待Driver端发送消息
  private def startDriverHeartbeater(): Unit = {
    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

    // Wait a random interval so the heartbeats don't end up in sync
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    //向Driver发送心跳
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }
}

我们还记得上面的makeOffers()launchTasks()方法不?哈哈,现在就在executor里面执行launchTask()方法,它里面new TaskRunner(context, taskDescription)的时候处理任务信息,处理完毕后发送StatusUpdate消息给CoarseGrainedExecutorBackend,接着CoarseGrainedExecutorBackend会向DriverEndpoint发送msg消息,代码如下

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    
    //向Driver端发送executor的消息
    val msg = StatusUpdate(executorId, taskId, state, data)
    
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

在Driver端的回收资源,同时给Executor分配新的任务,这些都是在makeOffer方法里面执行的哦!

至此,core的通信简单分析完了.
之后还会有Spark的内存等源码分析哦

上一篇下一篇

猜你喜欢

热点阅读