spark生态系统Spark学习之路spark

[spark] Task成功执行的结果处理

2017-10-27  本文已影响111人  BIGUFO

前言

在文章Task执行流程 中介绍了task是怎么被分配到executor上执行的,本文讲解task成功执行时将结果返回给driver的处理流程。

Driver端接收task完成事件

在executor上成功执行完task并拿到serializedResult 之后,通过CoarseGrainedExecutorBackend的statusUpdate方法来返回结果给driver,该方法会使用driverRpcEndpointRef 发送一条包含 serializedResult 的 StatusUpdate 消息给 driver。

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

而在driver端CoarseGrainedSchedulerBackend 在接收到StatusUpdate事件的处理代码如下:

case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

这里我们重点看看在TaskSchedulerImpl里面根据task的状态做了什么样的操作:

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
      try {
        // task丢失,则标记对应的executor也丢失,并涉及到一些映射跟新
        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
          // We lost this entire executor, so remember that it's gone
          val execId = taskIdToExecutorId(tid)

          if (executorIdToTaskCount.contains(execId)) {
            reason = Some(
              SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
            removeExecutor(execId, reason.get)
            failedExecutor = Some(execId)
          }
        }
        //获取task所在的taskSetManager
        taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            if (TaskState.isFinished(state)) {
              taskIdToTaskSetManager.remove(tid)
              taskIdToExecutorId.remove(tid).foreach { execId =>
                if (executorIdToTaskCount.contains(execId)) {
                  executorIdToTaskCount(execId) -= 1
                }
              }
            }
            // task成功的处理
            if (state == TaskState.FINISHED) {
              // 将当前task从taskSet中正在执行的task列表中移除
              taskSet.removeRunningTask(tid)
              //成功执行时,在线程池中处理任务的结果
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            //处理失败的情况
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates)")
                .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      assert(reason.isDefined)
      dagScheduler.executorLost(failedExecutor.get, reason.get)
      backend.reviveOffers()
    }
  }

task状态为Lost,则标记对应的executor也丢失,并涉及到一些映射跟新和意味着该executor上对应的task的重新分配;还有其他一些状态暂时不做解析。主要看task状态为FINISHED时,通过taskResultGetter的enqueueSuccessfulTask方法将task的的结果处理丢到了线程池中执行:

def enqueueSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      serializedData: ByteBuffer): Unit = {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          // 从serializedData反序列化出result和结果大小
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            // 可直接获取的结果
            case directResult: DirectTaskResult[_] =>
              // taskSet的总结果大小超过限制
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              } 
              directResult.value()
              // 直接返回结果及大小
              (directResult, serializedData.limit())
            // 可间接的获取执行结果,需借助BlockManager来获取
            case IndirectTaskResult(blockId, size) =>
              // 若大小超多了taskSetManager能抓取的最大限制,则删除远程节点上对应的blockManager 
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }
              logDebug("Fetching indirect task result for TID %s".format(tid))
              // 标记Task为需要远程抓取的Task并通知DAGScheduler              
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              // 从远程的BlockManager上获取Task计算结果 
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              // 抓取结果失败,结果丢失
              if (!serializedTaskResult.isDefined) {
               // 在Task执行结束获得结果后到driver远程去抓取结果之间,如果运行task的机器挂掉,
               // 或者该机器的BlockManager已经刷新掉了Task执行结果,都会导致远程抓取结果失败。
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              // 抓取结果成功,反序列化结果
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get.toByteBuffer)
                // 删除远程BlockManager对应的结果
               sparkEnv.blockManager.master.removeBlock(blockId)
              // 返回结果
              (deserializedResult, size)
          }
          ...
        // 通知scheduler处理成功Task
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch { 
          ...
        }
      }
    })
  }

继续跟进scheduler是如何处理成功的task:

def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

里面调用了该taskSetManager对成功task的处理方法:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    info.markSuccessful()
    // 从线程池中移除该task
    removeRunningTask(tid)
    // 通知dagScheduler
    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
    // 标记该task成功处理
    if (!successful(index)) {
      tasksSuccessful += 1
      logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
        info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
      // Mark successful and stop if all the tasks have succeeded.
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
    } else {
      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
        " because task " + index + " has already completed successfully")
    }
    // 从失败过的task->executor中移除
    failedExecutors.remove(index)
    // 若该taskSet所有task都成功执行
    maybeFinishTaskSet()
  }

逻辑很简单,标记task成功运行、跟新failedExecutors、若taskSet所有task都成功执行的一些处理,我们具体看看是怎么通知dagScheduler的,这里调用了dagScheduler的taskEnded方法:

def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Seq[AccumulatorV2[_, _]],
      taskInfo: TaskInfo): Unit = {
    eventProcessLoop.post(
      CompletionEvent(task, reason, result, accumUpdates, taskInfo))
  }

这里像DAGScheduler Post了一个CompletionEvent事件,在DAGScheduler#doOnReceive有对应的处理:

// DAGScheduler#doOnReceive
 case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

继续看看 dagScheduler#handleTaskCompletion的实现,代码太长,列出主要逻辑部分:

 private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
    ...
    val stage = stageIdToStage(task.stageId)
    event.reason match {
      case Success =>
        // 从该stage中等待处理的partition列表中移除Task对应的partition 
        stage.pendingPartitions -= task.partitionId
        task match {
          case rt: ResultTask[_, _] =>
            // Cast to ResultStage here because it's part of the ResultTask
            // TODO Refactor this out to a function that accepts a ResultStage
            val resultStage = stage.asInstanceOf[ResultStage]
            resultStage.activeJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  updateAccumulators(event)
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  // If the whole job has finished, remove it
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(resultStage)
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(
                      SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                  }

                  // taskSucceeded runs some user code that might throw an exception. Make sure
                  // we are resilient against that.
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)
                  } catch {
                    case e: Exception =>
                      // TODO: Perhaps we want to mark the resultStage as failed?
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
                  }
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }
          // 若是ShuffleMapTask
          case smt: ShuffleMapTask =>
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            updateAccumulators(event)
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            // 忽略在集群中游走的ShuffleMapTask(来自一个失效的节点的Task结果)。
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
            } else {
              // 将结果保存到对应的Stage
              shuffleStage.addOutputLoc(smt.partitionId, status)
            }
            // 若当前stage的所有task已经全部执行完毕
            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
              markStageAsFinished(shuffleStage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)

              // 将stage的结果注册到MapOutputTrackerMaster
              mapOutputTracker.registerMapOutputs(
                shuffleStage.shuffleDep.shuffleId,
                shuffleStage.outputLocInMapOutputTrackerFormat(),
                changeEpoch = true)
              // 清除本地缓存
              clearCacheLocs()
              // 若stage一些task执行失败没有结果,重新提交stage来调度执行未执行的task
              if (!shuffleStage.isAvailable) {
                // Some tasks had failed; let's resubmit this shuffleStage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                  ") because some of its tasks had failed: " +
                  shuffleStage.findMissingPartitions().mkString(", "))
                submitStage(shuffleStage)
              } else {
                // 标记所有等待这个Stage结束的Map-Stage Job为结束状态 
                if (shuffleStage.mapStageJobs.nonEmpty) {
                  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                  for (job <- shuffleStage.mapStageJobs) {
                    markMapStageJobAsFinished(job, stats)
                  }
                }
              }

              // Note: newly runnable stages will be submitted below when we submit waiting stages
            }
        }
        ...
    }
    submitWaitingStages()
  }

当task为ShuffleMapTask时,该task不是在无效节点的运行的条件下将结果保存到stage中,若当前stage的所有task都运行完毕(不一定成功),则将所有结果注册到MapOutputTrackerMaster(以便下一个stage的task就可以通过它来获取shuffle的结果的元数据信息);然后清空本地缓存;当该stage有task没有成功执行也就没有结果,需要重新提交该stage运行未完成的task;若所有task都成功完成,说明该stage已经完成,则会去标记所有等待这个Stage结束的Map-Stage Job为结束状态。

当task为ResultTask时,增加job完成的task数,若所有task全部完成即job已经完成,则标记该stage完成并从runningStages中移除,在cleanupStateForJobAndIndependentStages方法中,遍历当前job的所有stage,在对应stage没有依赖的job时则直接将此stage移除。然后将当前job从activeJob中移除。

最后调用job.listener.taskSucceeded(rt.outputId, event.result),实际调用的是JobWaiter(JobListener的具体实现)的taskSucceeded方法:

override def taskSucceeded(index: Int, result: Any): Unit = {
    // resultHandler call must be synchronized in case resultHandler itself is not thread safe.
    synchronized {
      resultHandler(index, result.asInstanceOf[T])
    }
    if (finishedTasks.incrementAndGet() == totalTasks) {
      jobPromise.success(())
    }
  }

这里的resultHandler就是在action操作触发runJob的时候规定的一种结果处理器:

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }

这里的(index, res) => results(index) = res 就是resultHandler,也就是将这里的results数组填满再返回,根据不同的action进行不同操作。
若完成的task数和totalTasks数相等,则该job成功执行,打印日志完成。

上一篇下一篇

猜你喜欢

热点阅读