Spark Source Code Analysis (2)

2018-11-14  mainroot

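II. Structure of a Spark program

When a user submits a Spark program and the program creates its SparkContext instance, each cluster deployment mode eventually starts a Java virtual machine on the cluster nodes and launches an executor by calling the main method of the CoarseGrainedExecutorBackend class. The executor starts a thread pool, listens on a network port, receives serialized code, and runs that code on a pool thread.

So a Spark program consists of two parts, covered in turn below:

1. Startup of the CoarseGrainedExecutorBackend class

We start with the CoarseGrainedExecutorBackend class. It is fairly simple, so tracing the code directly is enough. A brief walkthrough follows.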

1. main parses the arguments and calls run
2. run builds the executor environment with SparkEnv.createExecutorEnv
3. rpcEnv.setupEndpoint registers the endpoint
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(...))
4. env.rpcEnv.awaitTermination() is called to block the process
5. createExecutorEnv calls RpcEnv.create to build the rpcEnv
6. RpcEnv supports two RPC implementations, akka and netty, configured as follows:
   "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory"
   "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory"  (default)
7. NettyRpcEnvFactory creates the NettyRpcEnv that serves as the rpcEnv
8. The endpoint registered through setupEndpoint overrides receive
9. receive handles the messages that start tasks
    Create the Executor: case RegisteredExecutor(hostname) => executor = new Executor(...)
    Receive and launch a task: case LaunchTask(data) => executor.launchTask(...)
10. Executor.launchTask
  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }                             
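
The launch path above comes down to three moves: wrap the task in a Runnable, record it in a map of running tasks, and hand it to a thread pool. Here is a minimal sketch of that pattern. It is not Spark's code: `MiniExecutor` and `MiniTaskRunner` are hypothetical names, the task body is a plain function rather than a deserialized task, and the cached thread pool is an assumption made for the sketch.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical stand-in for TaskRunner: runs the task body and
// removes itself from the running-task map when it finishes.
class MiniTaskRunner(
    val taskId: Long,
    body: () => Unit,
    running: ConcurrentHashMap[Long, MiniTaskRunner]) extends Runnable {
  override def run(): Unit =
    try body() finally running.remove(taskId)
}

// Hypothetical stand-in for Executor: a thread pool plus a map of running tasks.
class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()
  val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new MiniTaskRunner(taskId, body, runningTasks)
    runningTasks.put(taskId, tr) // register before the task starts
    threadPool.execute(tr)       // run on a pool thread
  }

  // Stop accepting tasks and wait (up to 10 seconds) for running ones.
  def shutdownAndWait(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Registering the runner before calling `execute` means the map never misses a task that is already running, which is the same order the excerpt above uses.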

2. Task scheduling in the user application

This part looks at the source code for YARN cluster mode.

Task scheduling is mainly handled by the CoarseGrainedSchedulerBackend and YarnClusterScheduler classes.

1. After calling createSparkEnv, the SparkContext constructor creates the task scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
2. In createTaskScheduler, we look at the branch for the yarn modes
  case "yarn-standalone" | "yarn-cluster" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {...}
        val backend = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {...}
        scheduler.initialize(backend)
        (backend, scheduler)

Both instances are created through reflection, and finally the scheduler's initialize method is called with the backend.
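
The reflection pattern used above (look a class up by name, pick a constructor by parameter types, instantiate it) can be tried on its own. The sketch below applies it to a JDK class so that it runs without Spark. `ReflectionDemo` and `newListByName` are hypothetical names, and `java.util.ArrayList` stands in for the scheduler class.

```scala
object ReflectionDemo {
  // Load a class by its fully qualified name, select the constructor that
  // takes a single int, and create an instance with the given capacity.
  def newListByName(className: String, initialCapacity: Int): java.util.List[String] = {
    val clazz = Class.forName(className)
    val cons = clazz.getConstructor(classOf[Int]) // classOf[Int] is the primitive int class
    cons.newInstance(Int.box(initialCapacity)).asInstanceOf[java.util.List[String]]
  }
}

// Usage: the caller only needs the class name at run time.
// val list = ReflectionDemo.newListByName("java.util.ArrayList", 16)
```

Because the class is resolved at run time, the calling code compiles without a direct dependency on the class it instantiates.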

3. YarnClusterScheduler inherits the initialize method from TaskSchedulerImpl; the method stores the backend and sets up the root task pool
 def initialize(backend: SchedulerBackend) {
    this.backend = backend                                                                                                                                                   
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
4. SparkContext then calls the start method of the YarnClusterScheduler instance
  _taskScheduler.start()
5. YarnClusterScheduler inherits its start method from TaskSchedulerImpl
  override def start() {
    // start the YarnClusterSchedulerBackend
    backend.start()
  }
6. YarnClusterSchedulerBackend extends YarnSchedulerBackend
  override def start() {
    super.start()
    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
  }
7. YarnSchedulerBackend extends CoarseGrainedSchedulerBackend
    val ENDPOINT_NAME = "CoarseGrainedScheduler"
    override def start() {
        val properties = new ArrayBuffer[(String, String)]
        for ((key, value) <- scheduler.sc.conf.getAll) {
            if (key.startsWith("spark.")) {                                                                                                                                        
                properties += ((key, value))
            }
        }
        // create the driverEndpoint used to send and receive messages
        driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
                       createDriverEndpoint(properties))
    }
    protected def createDriverEndpoint(
                properties: Seq[(String, String)]
        ) : DriverEndpoint = {
        new DriverEndpoint(rpcEnv, properties)                                                                                                                                   
    }
8. The DriverEndpoint class contains the message-receiving function
    override def receive: PartialFunction[Any, Unit] = {

        case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)            
            }
        }
      case ReviveOffers =>
        makeOffers()
      case KillTask(taskId, executorId, interruptThread) =>
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.executorEndpoint.send(
                    KillTask(taskId, executorId, interruptThread))
        }
    }

Here executorDataMap holds all the available executors.
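
The core of the receive function above is a pattern match on message types plus per-executor bookkeeping of free cores. The sketch below reduces it to that. All names in it (`MiniDriver`, `TaskFinished`, `Revive`, `offersMade`) are hypothetical, and recording offers in a buffer stands in for the real makeOffers calls. The excerpt above shows no branch for an unknown executor id, so the `case None` branch is an addition of this sketch.

```scala
import scala.collection.mutable

// Hypothetical messages, reduced to what the sketch needs.
case class TaskFinished(executorId: String, taskId: Long)
case object Revive

// Hypothetical, simplified driver-side endpoint.
class MiniDriver(cpusPerTask: Int) {
  val freeCores = mutable.HashMap[String, Int]()     // executor id -> free cores
  val offersMade = mutable.ArrayBuffer[String]()     // records which offers were triggered

  private def makeOffers(): Unit = { offersMade += "all"; () }
  private def makeOffers(executorId: String): Unit = { offersMade += executorId; () }

  val receive: PartialFunction[Any, Unit] = {
    case TaskFinished(executorId, _) =>
      freeCores.get(executorId) match {
        case Some(cores) =>
          freeCores(executorId) = cores + cpusPerTask // the finished task frees its cores
          makeOffers(executorId)                      // offer them again right away
        case None => // unknown executor: ignore the update
      }
    case Revive =>
      makeOffers()
  }
}
```

A finished task immediately triggers a new offer for the same executor, so freed cores do not wait for the next periodic revive.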

9. makeOffers 
    private def makeOffers() {
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }   
    private def makeOffers(executorId: String) {
      if (executorIsAlive(executorId)) {
        val executorData = executorDataMap(executorId)
        val workOffers = Seq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
        launchTasks(scheduler.resourceOffers(workOffers))
      }
    }

The rest of this section focuses on the following functions:

10. WorkerOffer, a simple wrapper
  case class WorkerOffer(executorId: String, host: String, cores: Int)
11. scheduler.resourceOffers(workOffers)
 // defined in core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
      for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]())

    }
    // shuffle the executor order so that tasks are spread randomly
    val shuffledOffers = Random.shuffle(offers)
    // one task buffer per executor, with initial capacity equal to its cores;
    // this only reserves capacity, no TaskDescription has been added yet
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    // free cores of each executor
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue // task sets in scheduling order
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }                             
    return tasks
  }
12. resourceOfferSingleTaskSet: binding tasks to executors
 private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      // check that the executor still has enough free cores for one task
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task  // append the task to this executor's buffer
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK  // fewer free cores remain
            launchedTask = true
          }
        } catch {... }
      }
    }
    return launchedTask
  }

This function shows that, when CPUS_PER_TASK is 1, the --executor-cores setting is the number of tasks (threads) an executor can run in parallel.
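
The slot arithmetic can be checked with a small simulation. This is a simplified sketch, not Spark's scheduler: it keeps only the repeated passes over the executors and the core accounting, and leaves out shuffling, locality, and task sets. `SlotDemo`, `slots`, and `assign` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

object SlotDemo {
  // Number of tasks one executor can run at the same time.
  def slots(executorCores: Int, cpusPerTask: Int): Int = executorCores / cpusPerTask

  // Distribute task ids 0 until pending over executors with the given free cores.
  // Each pass gives at most one task to every executor that still has enough
  // cores; passes repeat until a pass launches nothing.
  def assign(cores: Seq[Int], cpusPerTask: Int, pending: Int): Seq[List[Int]] = {
    val available = cores.toArray
    val tasks = Vector.fill(cores.size)(ArrayBuffer.empty[Int])
    var nextTask = 0
    var launched = true
    while (launched) {
      launched = false
      for (i <- available.indices) {
        if (available(i) >= cpusPerTask && nextTask < pending) {
          tasks(i) += nextTask
          nextTask += 1
          available(i) -= cpusPerTask // the task occupies its cores
          launched = true
        }
      }
    }
    tasks.map(_.toList)
  }
}

// Two executors with 2 and 4 cores, one core per task, ten pending tasks:
// SlotDemo.assign(Seq(2, 4), 1, 10) == Seq(List(0, 2), List(1, 3, 4, 5))
```

With one core per task the two executors take 2 and 4 tasks. With two cores per task, a 4-core executor takes only 2, which is `slots(4, 2)`.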

13. TaskSetManager.resourceOffer takes a task from the task queue and builds its task description
 def resourceOffer(  execId: String,   host: String,    maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (!isZombie) {
      val curTime = clock.getTimeMillis()
      var allowedLocality = maxLocality
      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          allowedLocality = maxLocality
        }
      }
      dequeueTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality, speculative)) => {
          val task = tasks(index)
          val taskId = sched.newTaskId()
          copiesRunning(index) += 1
          val attemptNum = taskAttempts(index).size
          val info = new TaskInfo(taskId, index, attemptNum, curTime,
            execId, host, taskLocality, speculative)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          if (maxLocality != TaskLocality.NO_PREF) {
            currentLocalityIndex = getLocalityIndex(taskLocality)
            lastLaunchTime = curTime
          }
          val startTime = clock.getTimeMillis()
          val serializedTask: ByteBuffer = try {
            Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          } catch {... }
          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
            emittedTaskSizeWarning = true
          }
          addRunningTask(taskId)
          sched.dagScheduler.taskStarted(task, info)
          return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
            taskName, index, serializedTask))
        }
        case _ =>
      }
    }
    None
  }
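
The first few lines of resourceOffer cap the locality level: the level allowed for this offer is never looser than maxLocality, and NO_PREF is passed through unchanged. The sketch below isolates that rule. The enumeration repeats the level names and order of Spark's TaskLocality, where a lower id means a more local level. `LocalityDemo`, `effectiveLocality`, and the `delayAllowed` parameter (standing in for the result of getAllowedLocalityLevel) are hypothetical.

```scala
object LocalityDemo {
  // Levels from most local to least local; ids follow declaration order.
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }
  import Locality._

  // delayAllowed: the level the delay-scheduling timers currently permit.
  def effectiveLocality(maxLocality: Locality.Value, delayAllowed: Locality.Value): Locality.Value =
    if (maxLocality == NO_PREF) maxLocality            // NO_PREF is used as given
    else if (delayAllowed > maxLocality) maxLocality   // never looser than the offer's cap
    else delayAllowed
}

// LocalityDemo.effectiveLocality(Locality.NODE_LOCAL, Locality.ANY) yields NODE_LOCAL
```
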
14. launchTasks: sending the tasks
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        val executorData = executorDataMap(task.executorId)
        executorData.freeCores -= scheduler.CPUS_PER_TASK
        executorData.executorEndpoint.send(
          LaunchTask(new SerializableBuffer(serializedTask)))
      }
    }

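The code above uses executorData.executorEndpoint.send to deliver each task to the receiving side, where the LaunchTask case of CoarseGrainedExecutorBackend.receive (part 1) picks it up.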
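
What travels over the wire is a byte buffer that the executor turns back into a task description. The sketch below shows such a round trip under simplifying assumptions: it hand-encodes two fields into a `ByteBuffer` instead of using Spark's serializer, and `MiniTask`, `WireDemo`, `encode`, and `decode` are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical, reduced task description.
case class MiniTask(taskId: Long, executorId: String)

object WireDemo {
  // Layout: 8-byte task id, 4-byte length of the executor id, then its UTF-8 bytes.
  def encode(task: MiniTask): ByteBuffer = {
    val id = task.executorId.getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.allocate(8 + 4 + id.length)
    buf.putLong(task.taskId).putInt(id.length).put(id)
    buf.flip() // switch the buffer from writing to reading
    buf
  }

  // Read from a duplicate so the caller's buffer position is left untouched.
  def decode(buffer: ByteBuffer): MiniTask = {
    val in = buffer.duplicate()
    val taskId = in.getLong()
    val id = new Array[Byte](in.getInt())
    in.get(id)
    MiniTask(taskId, new String(id, StandardCharsets.UTF_8))
  }
}
```

Decoding from a duplicate keeps the original buffer readable, which matters if the same buffer is inspected more than once, for example to check its size before sending.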
