Spark job execution logic

2019-12-22  zachary_Luo
val rddFile = sc.textFile("...")
val rddMap = rddFile.map(_.split(","))
println(rddMap.count())
//RDD class
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
    //Passed in at construction; an auxiliary constructor wraps a single parent RDD into Seq[Dependency[_]].
  )
//The map method in RDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    // clean actually calls ClosureCleaner.clean, which strips unserializable variables out of the
    // closure so it does not fail to deserialize when shipped across the network [1]
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
//MapPartitionsRDD class
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
extends RDD[U](prev)

Note: after the first line executes, the returned object is a MapPartitionsRDD; under the hood a HadoopRDD is also built, and the MapPartitionsRDD's function simply takes value.toString on each (key, value) record read from the file.
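You can see this two-RDD lineage directly with toDebugString (a quick sketch assuming a live SparkContext named sc; the path is just a placeholder):

val rddFile = sc.textFile("/tmp/input.csv")
//Typically prints something like:
//  (2) /tmp/input.csv MapPartitionsRDD[1] at textFile ...
//   |  /tmp/input.csv HadoopRDD[0] at textFile ...
println(rddFile.toDebugString)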

//SparkContext: count() enters through this runJob overload, which runs the job on every partition
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
(figure: rdd_partitions.png)

Code:

//SparkContext class
def runJob[T, U: ClassTag](
      rdd: RDD[T],//the RDD produced by map
      func: (TaskContext, Iterator[T]) => U,//the per-partition iterator-counting function
      partitions: Seq[Int],//indices of the partitions to compute
      //resultHandler writes each partition's result into its slot of an array:
      //val results = new Array[U](partitions.size)
      //(index, res) => results(index) = res
      resultHandler: (Int, U) => Unit): Unit = {
    val callSite = getCallSite//records the user class closest to the top of the stack and the Scala/Spark core class closest to the bottom
    val cleanedFunc = clean(func)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  }
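For reference, the overload that allocates the results array mentioned in the comments above looks roughly like this in SparkContext (lightly abridged):

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  //Each partition's result is written into its slot by index, then the whole array is returned
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}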
//DAGScheduler class: submitJob posts the job and returns a JobWaiter, which runJob then blocks on
def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  }
//Build a JobSubmitted event and post it to the event processing loop
def submitJob[T, U](
      rdd: RDD[T],//the MapPartitionsRDD produced by the map(split) step
      func: (TaskContext, Iterator[T]) => U,//the per-partition iterator-counting function
      partitions: Seq[Int],//indices of the partitions to compute
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,//the result-handler callback
      properties: Properties): JobWaiter[U] = {
    val jobId = nextJobId.getAndIncrement()
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
    waiter
  }

Stage division:
1. createResultStage creates the finalStage. While doing so it calls getOrCreateParentStages to get the parent stages: it first walks the current RDD's dependencies to find all of its direct shuffle (wide) dependencies, then iterates over them.
2. For each of those shuffle dependencies, its RDD is traversed depth-first as well, so every ancestor shuffle dependency of the current RDD is found.
3. A stage is then created for each shuffle dependency found in step 2. Creating a stage passes in the previous (parent) stages, so getOrCreateParentStages is called again on that dependency's RDD; the recursion bottoms out when no parent stage exists, at which point the first stage is created and returned, then the second, and so on.
4. When the recursion finishes, the parent stages are returned and the finalStage is created from the last RDD.

Note: every stage holds its parent stages, its shuffle-dependency information, and its partition information (a concrete example follows below).
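As a concrete illustration (a minimal sketch with a made-up path), a job with a single shuffle splits into exactly two stages: everything connected by narrow dependencies up to the shuffle goes into one ShuffleMapStage, and the part after the shuffle becomes the ResultStage:

//Hypothetical word-count lineage used only to illustrate stage boundaries
val lines  = sc.textFile("hdfs://.../input")          //HadoopRDD -> MapPartitionsRDD
val pairs  = lines.flatMap(_.split(" ")).map((_, 1))  //narrow dependencies: same stage
val counts = pairs.reduceByKey(_ + _)                 //ShuffleDependency: stage boundary
counts.count()
//DAGScheduler builds ShuffleMapStage 0 (textFile/flatMap/map) and ResultStage 1 (reduceByKey/count)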

(figure: createStage.png)

Code:

//Start creating stages
 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
//Before creating a stage, first get its parent stages.
private def createResultStage(...): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stage
  }
//1. getShuffleDependencies returns only the current RDD's immediate parent shuffle dependencies; "grandparent" shuffle dependencies are not returned here.
//2. Iterate over those dependencies and get or create a stage for each via getOrCreateShuffleMapStage.
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }
//Get the stage for a shuffle dependency if it already exists, otherwise create it.
//1. When creating, first collect all missing ancestor shuffle dependencies (a depth-first walk, so every ancestor shuffle dependency is reached) and create a stage for each.
//2. Once the ancestors of this shuffle dependency's RDD have their stages, create the stage for the current shuffle dependency itself.
private def getOrCreateShuffleMapStage(...): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
      case None =>
        //Collect all missing ancestor shuffle dependencies, then iterate over them
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency itself
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }
//Create the stage:
//1. Take the RDD from the shuffle dependency, call getOrCreateParentStages to get its parent stages, then build the ShuffleMapStage.
 def createShuffleMapStage(...): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    //Get the parent stages first; if none exist, the list is empty (this is the first stage)
    val parents = getOrCreateParentStages(rdd, jobId)
 
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    stage
  }
private def submitStage(stage: Stage): Unit = {
      //Starting from this stage, find all missing parent stages and sort them by id
      val missing = getMissingParentStages(stage).sortBy(_.id)
        //If there are no missing parents, actually submit this stage's tasks to the taskScheduler.
        if (missing.isEmpty) {
          submitMissingTasks(stage, jobId.get)
        } else {
          //Otherwise recursively call submitStage on each missing parent, so it is looked up and submitted first
          for (parent <- missing) {
            submitStage(parent)
          }
        }
  }
//Compute the preferred locations of every partition in the stage
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch { ... }
//Serialize the stage's last RDD together with its dependency info (the shuffle dependency or the result function); this becomes taskBinary

//Build the tasks: one task per partition, with the task type depending on the stage type
val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>           
            new ShuffleMapTask(stage.id, 
              taskBinary, part, locs, properties,...)
          }
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            new ResultTask(stage.id,
               taskBinary, part, locs, ....)
          }
      }
    } 
//Finally wrap the tasks into a TaskSet and submit it to the taskScheduler
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

Key code:

//TaskSchedulerImpl class
override def submitTasks(taskSet: TaskSet): Unit = {
    val tasks = taskSet.tasks
    //Create a TaskSetManager for the TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    //Add the manager to the scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    //Call CoarseGrainedSchedulerBackend.reviveOffers, which messages the DriverEndpoint to obtain
    //resources and compute task placement, and finally sends LaunchTask
    backend.reviveOffers()
  }
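Which pool schedulableBuilder builds (FIFO vs. FAIR) is driven by configuration; a minimal sketch of switching it, assuming nothing beyond standard Spark configuration (FIFO is the default):

import org.apache.spark.SparkConf

//Sketch only: this conf key selects FIFOSchedulableBuilder or FairSchedulableBuilder behind schedulableBuilder.
val conf = new SparkConf()
  .setAppName("scheduler-mode-example")
  .set("spark.scheduler.mode", "FAIR") //default is FIFO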


//TaskSetManager class
//On construction, every task in the TaskSet is added to the pending-task queues according to its preferred locations
  addPendingTasks()
  private def addPendingTasks(): Unit = {
      for (i <- (0 until numTasks).reverse) {
        addPendingTask(i, resolveRacks = false)
      }
  }

  private[spark] def addPendingTask(
      index: Int,
      resolveRacks: Boolean = true,
      speculatable: Boolean = false): Unit = {
    //pendingTasks groups pending task indices into per-locality queues (forExecutor, forHost, forRack, noPrefs, all)
    val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          //The preferred location names an executor, so add the task index to that executor's forExecutor queue
          pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          //For an HDFS-cached location, look up the executors alive on that host and add the task index
          //to each of those executors' forExecutor queues
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
            case None =>
          }
        case _ =>
      }
      //After matching on the location type, every task is also added to the forHost queue keyed by the host,
      //so every task has at least host-level locality
      pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

      //resolveRacks is false here; when it is true, the rack of each preferred host is resolved and the task is also added to the rack-level queue
      if (resolveRacks) {
        sched.getRackForHost(loc.host).foreach { rack =>
          pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
        }
      }
    }
    //Tasks whose preferred locations are Nil go into the noPrefs queue
    if (tasks(index).preferredLocations == Nil) {
      pendingTaskSetToAddTo.noPrefs += index
    }
    //Every task is added to the all queue.
    pendingTaskSetToAddTo.all += index
  }
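In recent Spark versions the pendingTasks object written to above is just a holder of per-locality queues of task indices, roughly as follows (abridged from TaskSetManager.scala; treat the exact shape as approximate):

import scala.collection.mutable.{ArrayBuffer, HashMap}

private[scheduler] class PendingTasksByLocality {
  //Pending task indices keyed by executor id (PROCESS_LOCAL candidates)
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  //Pending task indices keyed by host (NODE_LOCAL candidates)
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  //Tasks with no locality preference
  val noPrefs = new ArrayBuffer[Int]
  //Pending task indices keyed by rack (RACK_LOCAL candidates)
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  //Every pending task, used as the ANY-level fallback
  val all = new ArrayBuffer[Int]
}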

3. After the manager has been added to the pool, backend.reviveOffers() invokes CoarseGrainedSchedulerBackend.reviveOffers, which sends a message to the DriverEndpoint to obtain resources. (This is an event-driven call; the ReviveOffers event is triggered in two ways: 1. periodically, once per second by default; 2. explicitly from TaskSchedulerImpl via backend.reviveOffers().) When it fires, makeOffers() runs: a. it first filters out the alive executors and wraps each one as a WorkerOffer(execId, host, cores, ...); b. it then calls resourceOffers, which matches the offered resources against task locality to find the best placement and returns a Seq[TaskDescription]. Finally the SchedulerBackend sends each task description to the executor named in it for execution.

//CoarseGrainedSchedulerBackend类
 private def makeOffers(): Unit = {
     val taskDescs = withLock {
        //Filter for alive executors
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
       //Wrap them as WorkerOffers
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort),
              executorData.resourcesInfo.map { case (rName, rInfo) =>
                (rName, rInfo.availableAddrs.toBuffer)
              })
        }.toIndexedSeq
       //Decide which tasks to launch on which workers
        scheduler.resourceOffers(workOffers)
      }
      //Send each returned TaskDescription to its executor to run the task
      if (taskDescs.nonEmpty) {
        launchTasks(taskDescs)
      }
    }

//TaskSchedulerImpl class: match the offered resources to tasks
//The real method is long and calls through many helpers; only the core logic is shown here
//Note: pseudocode
//Iterate over the sorted TaskSets (these are in fact the TaskSetManagers)
for (taskSet <- sortedTaskSets) {
  //Then iterate over the locality levels this TaskSet has, starting from the best one
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    var launchedTask = false
    do {
      //launchedTask stays false when no task can be placed on the offered resources
      //Try to place tasks
      launchedTask = resourceOfferSingleTaskSet(taskSet,
        currentMaxLocality, shuffledOffers, tasks, ...)
    } while (launchedTask)
  }
}

private def resourceOfferSingleTaskSet(....){
    //Iterate over each WorkerOffer (resource)
    for (i <- 0 until shuffledOffers.size) {
      //The offer must have at least as many free CPUs as one task needs
      if (availableCpus(i) >= CPUS_PER_TASK){
          //resourceOffer looks for the best task to run on this executor and returns Option[TaskDescription]
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
             //bookkeeping updates
          }
      }
   }
 return launchedTask
}

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality,
      availableResources: Map[String, Seq[String]] = Map.empty)
    : Option[TaskDescription] =
  {
      //If the best locality being tried is not NO_PREF, compute the lowest locality level currently allowed (based on the locality wait times)
      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }
  //dequeueTask finds the index of a task to run; it returns Option[(index, locality, speculative)]
  dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
  //Wrap the found task into a TaskDescription (executor address, allotted resources, serialized task, ...) and return it
  new TaskDescription(...)
  }
}

 private def dequeueTask(
      execId: String,
      host: String,
      maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
    //Pick the regular or the speculative pending-task queues
    val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
    //dequeue mainly calls dequeueTaskFromList to pull a task index out of a pending queue; it returns Option[Int]
    def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
      val task = dequeueTaskFromList(execId, host, list, speculative)
      if (speculative && task.isDefined) {
        speculatableTasks -= task.get
      }
      task
    }
    //By default, first look for a task in the forExecutor queue keyed by the offer's execId (PROCESS_LOCAL)
    dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
      return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
    }
    //If NODE_LOCAL is within the allowed level, look up tasks in the forHost queue by the offer's host name
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
        return Some((index, TaskLocality.NODE_LOCAL, speculative))
      }
    }

    //After NODE_LOCAL, if NO_PREF is within the allowed level, look in the noPrefs queue
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
        return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
      }
    }
    //After NO_PREF, if RACK_LOCAL is within the allowed level, resolve the offer's rack and look in the forRack queue
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, speculative))
      }
    }
    //Finally, fall back to the all queue at the ANY level
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      dequeue(pendingTaskSetToUse.all).foreach { index =>
        return Some((index, TaskLocality.ANY, speculative))
      }
    }
    None
}
(figure: task_arrange.png)
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    //First serialize each TaskDescription so it can be shipped over the network
    val serializedTask = TaskDescription.encode(task)
    //(If the serialized task exceeds maxRpcMessageSize, the task set is aborted instead)
    if (serializedTask.limit < maxRpcMessageSize) {
      //Look up the running executor by the executorId in the task description
      val executorData = executorDataMap(task.executorId)
      //Deduct the cores this task needs from the executor's free cores
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      //Send a LaunchTask message to that executor to start the task
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
//CoarseGrainedExecutorBackend: handling the LaunchTask message
case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // Deserialize the TaskDescription
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        // Call executor.launchTask to run the task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }
//Executor class
def launchTask(
     context: ExecutorBackend,
     taskId: Long,
     attemptNumber: Int,
     taskName: String,
     serializedTask: ByteBuffer): Unit = {
    // Create a TaskRunner
   val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
     serializedTask)
   runningTasks.put(taskId, tr)
    // Hand the runner to the thread pool for execution
   threadPool.execute(tr)
 }
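The TaskRunner placed on the thread pool is just a Runnable: its run() deserializes the task, calls task.run(...) (which dispatches to the runTask implementations shown next) and reports the result back through the ExecutorBackend. A purely hypothetical, minimal sketch of that pattern (none of these names are Spark's):

import java.util.concurrent.Executors

//Hypothetical sketch of "wrap the work in a Runnable and hand it to a thread pool",
//mirroring runningTasks.put + threadPool.execute(tr) above. Not Spark code.
final class FakeTaskRunner(taskId: Long, body: () => Unit) extends Runnable {
  override def run(): Unit = {
    //Spark's real TaskRunner.run() deserializes the task here, calls task.run(...)
    //(which ends up in ShuffleMapTask.runTask or ResultTask.runTask), then reports
    //the serialized result back via ExecutorBackend.statusUpdate.
    body()
  }
}

object FakeExecutor {
  def main(args: Array[String]): Unit = {
    val threadPool = Executors.newCachedThreadPool()
    threadPool.execute(new FakeTaskRunner(0L, () => println("task 0 ran")))
    threadPool.shutdown()
  }
}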
//ShuffleMapTask
override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD and its ShuffleDependency from taskBinary
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Get the shuffleManager
      val manager = SparkEnv.get.shuffleManager
      // Obtain a shuffle writer via shuffleManager.getWriter()
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      // Iterate over this partition's records via rdd.iterator and write them out through the shuffle writer
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    } 
  }
//ResultTask
override def runTask(context: TaskContext): U = {
   val deserializeStartTime = System.currentTimeMillis()
   val ser = SparkEnv.get.closureSerializer.newInstance()
    // Deserialize the RDD and the user function from taskBinary
   val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
   _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    // Apply func to the iterator over this partition of the RDD and return the result
   func(context, rdd.iterator(partition, context))
 }
//MapPartitionsRDD
override def compute(split: Partition, context: TaskContext): Iterator[U] =
   f(context, split.index, firstParent[T].iterator(split, context))

Looking at the code: compute applies f to the parent's iterator to produce a new iterator; f is the anonymous function passed to MapPartitionsRDD, which here performs the split operation, so the iterator obtained from the parent is wrapped with the split and then returned.
Next, firstParent[T].iterator calls the parent RDD's iterator method, which works the same way, and the chain keeps going up until the first HadoopRDD's compute is reached to produce the Iterator for the current partition.
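For reference, RDD.iterator itself is short: it only falls through to compute when the partition is neither cached nor checkpointed (roughly, from RDD.scala):

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    //Read from the block manager cache, computing and caching the partition if it is missing
    getOrCompute(split, context)
  } else {
    //Otherwise read checkpoint data if present, or fall through to compute()
    computeOrReadCheckpoint(split, context)
  }
}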

//HadoopRDD
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {
       //Cast compute's input theSplit to a HadoopPartition
      private val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      private val jobConf = getJobConf()
      ...
      //Create the record reader
      private var reader: RecordReader[K, V] = null
      //First obtain the InputFormat from the job conf
      private val inputFormat = getInputFormat(jobConf)
      //Then call getRecordReader on it, passing in the Hadoop input split and the conf
      reader =
        try {
          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
        } catch {...  }
      ...
      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
      //Override getNext to iterate over the records
      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {...        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        (key, value)
      }
    }
    //Finally wrap iter, whose getNext reads through the RecordReader, in an InterruptibleIterator and return it.
    new InterruptibleIterator[(K, V)](context, iter)
  }
//count submits the job, passing Utils.getIteratorSize _ as the computation over each partition.
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

//The func simply walks the iterator while counting, which is what pulls the entire data pipeline.
def getIteratorSize(iterator: Iterator[_]): Long = {
   var count = 0L
   while (iterator.hasNext) {
     count += 1L
     iterator.next()
   }
   count
 }
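Putting it together, a hand-rolled equivalent of rddMap.count() from the opening example looks like the sketch below (assuming the same sc and rddMap): one Long per partition is computed on the executors and the driver sums them.

//Sketch only: same semantics as rddMap.count(), spelled out by hand.
val perPartition: Array[Long] = sc.runJob(rddMap, (it: Iterator[Array[String]]) => {
  var n = 0L
  while (it.hasNext) { n += 1L; it.next() }
  n
})
println(perPartition.sum) //== rddMap.count()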

Overall flow, in summary:


(figure: all_operator.png)