1.3 Spark Architecture Executor

2018-11-13  GongMeng

1. Overall Structure

The Executor is the execution unit that runs on a Worker, and each Executor is its own independent JVM!
At startup, the Executor also maintains a local SparkEnv. Its structure is the same as the one on the Driver side, so it is not repeated here.


[Figure: SparkEnv structure]

Executors come into play when a job is executed: the Driver asks the cluster resource manager (Master) for resources, the resource manager allocates resources on the Workers (physical machines), and the Driver then tells the corresponding nodes to start Executors.

The Executor is created by the ExecutorBackend process of the corresponding cluster manager.

Heartbeat logic

The Executor sends heartbeats to the Driver so that it gets registered, and stays, in the Driver's list of managed executors.
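
The mechanism is easy to picture. Below is a minimal, standalone sketch of the idea (my own simplified model, not the Spark source): a single daemon scheduler thread periodically sends an "I am alive" report, at an interval mirroring spark.executor.heartbeatInterval.

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    object HeartbeatSketch {
      def main(args: Array[String]): Unit = {
        // One daemon thread, so a single Executor never sends overlapping heartbeats
        val heartbeater = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "driver-heartbeater")
            t.setDaemon(true)
            t
          }
        })

        // In real Spark this would package task metrics and send them to the Driver
        def reportHeartBeat(): Unit = println("heartbeat: executor is alive")

        val intervalMs = 1000L // demo value; spark.executor.heartbeatInterval defaults to 10s
        heartbeater.scheduleAtFixedRate(
          new Runnable { override def run(): Unit = reportHeartBeat() },
          intervalMs, intervalMs, TimeUnit.MILLISECONDS)

        Thread.sleep(3 * intervalMs) // keep the JVM alive long enough to observe a few heartbeats
        heartbeater.shutdown()
      }
    }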


Task execution logic

When the Driver receives a heartbeat, the SparkListener machinery tells the TaskScheduler that another worker has reported for duty. The TaskScheduler then assigns concrete tasks to the Executor; the Executor runs each task through a local TaskRunner (a Runnable executed on a thread from the Executor's pool, not a separate process) and reports the result back up the chain.

2. Creating an Executor Instance

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
  // Each map holds the master's timestamp for the version of that file or JAR we got.
  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()

  // Start worker thread pool
  // Each Executor can run multiple Tasks concurrently, so a thread pool manages them; TaskRunner, an inner class that implements Runnable, does the actual work
  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
  private val executorSource = new ExecutorSource(threadPool, executorId)

  // Akka's message frame size. If task result is bigger than this, we use the block manager
  // to send the result back.
  // The default frame size is 128MB
  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

  // Limit of bytes for total size of results (default is 1GB)
  private val maxResultSize = Utils.getMaxResultSize(conf)

  // Maintains the list of running tasks.
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  // Executor for the heartbeat task.
  // Note that the heartbeat sender is a single-thread scheduled executor, which keeps one Executor from sending duplicate heartbeat packets
  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
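
Both size limits above are configurable. As an illustration (Spark 1.x configuration keys; the values shown are the defaults mentioned in the comments):

    import org.apache.spark.SparkConf

    // Results larger than the Akka frame size travel through the BlockManager instead of Akka,
    // and spark.driver.maxResultSize caps the total size of results sent back to the driver.
    val conf = new SparkConf()
      .set("spark.akka.frameSize", "128")        // Akka message frame size, in MB
      .set("spark.driver.maxResultSize", "1g")   // upper bound on total task result size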

When the Executor is created successfully, you can see the following log line:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

3. Launching Tasks

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

The Executor hands each serialized task it receives over to a TaskRunner, an inner class that does the actual work. An Executor can have multiple TaskRunners working at the same time, so they are managed with a thread pool.
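
To make the pattern concrete, here is a minimal standalone sketch (my own model, not the Spark source) of what launchTask does: wrap the work in a Runnable, remember it in a map keyed by taskId, and hand it to a thread pool so many tasks can run concurrently.

    import java.util.concurrent.{ConcurrentHashMap, Executors}

    object LaunchTaskSketch {
      private val runningTasks = new ConcurrentHashMap[Long, Runnable]
      private val threadPool = Executors.newCachedThreadPool()

      def launchTask(taskId: Long, body: () => Unit): Unit = {
        val runner: Runnable = new Runnable {
          override def run(): Unit =
            try body()
            finally runningTasks.remove(taskId) // TaskRunner.run() does the same in its finally block
        }
        runningTasks.put(taskId, runner)
        threadPool.execute(runner)
      }

      def main(args: Array[String]): Unit = {
        (1L to 3L).foreach(id => launchTask(id, () => println(s"task $id done")))
        threadPool.shutdown()
      }
    }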

4. TaskRunner


4.1 TaskRunner Overview

TaskRunner implements the Runnable interface and runs on a thread from the Executor's pool (it is not a separate process), so it can be started, killed, interrupted, and submitted to all kinds of pools; it throws the errors threads throw, takes the locks threads take, and contains the bugs multithreaded code tends to contain.
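
A minimal standalone model of that lifecycle (an illustration, not the real TaskRunner): a @volatile flag plus optional thread interruption lets a running task be cancelled cooperatively, which is essentially what the killed field shown in the next section is for.

    object KillableRunnerSketch {
      class Runner extends Runnable {
        @volatile private var killed = false
        @volatile private var runningThread: Thread = _

        def kill(interruptThread: Boolean): Unit = {
          killed = true
          if (interruptThread && runningThread != null) runningThread.interrupt()
        }

        override def run(): Unit = {
          runningThread = Thread.currentThread()
          var i = 0
          while (!killed && i < 100) { // check the flag between units of work
            Thread.sleep(10)
            i += 1
          }
          println(if (killed) "task was killed" else "task finished")
        }
      }

      def main(args: Array[String]): Unit = {
        val runner = new Runner
        val t = new Thread(runner)
        t.start()
        Thread.sleep(50)
        runner.kill(interruptThread = false)
        t.join()
      }
    }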

4.2 Key Internals of TaskRunner

   class TaskRunner(
      execBackend: ExecutorBackend,
      val taskId: Long,
      val attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer)
    extends Runnable {

    /** Whether this task has been killed. */
    @volatile private var killed = false

    /** How much the JVM process has spent in GC when the task starts to run. */
    @volatile var startGCTime: Long = _

    /**
     * The task to run. This will be set in run() by deserializing the task binary coming
     * from the driver. Once it is set, it will never be changed.
     */
    @volatile var task: Task[Any] = _

4.3 What TaskRunner Does While Running

Below is an annotated excerpt of the source:

override def run(): Unit = {
      // Create a per-task memory manager
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
 
      // Set the thread's context ClassLoader (the replClassLoader, so user/REPL classes resolve)
      Thread.currentThread.setContextClassLoader(replClassLoader)
      //  At this point the start of the task shows up in the log
      logInfo(s"Running $taskName (TID $taskId)")
      //  Report the current status (RUNNING) to the ExecutorBackend
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

      // Record how much time the JVM has already spent in GC, so GC during the task can be measured
      startGCTime = computeTotalGcTime()
      // Measure the deserialization time
      val deserializeStartTime = System.currentTimeMillis()
      try {
        // Turn the compressed "blueprint" sent over by the Driver into something readable in memory, then get to work
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)

        // Calls an Executor method to download third-party files/JARs from the Driver-side HTTP file server
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
        task.setTaskMemoryManager(taskMemoryManager)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        if (killed) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException
        }

        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        // Run the actual task and measure its runtime.
        taskStart = System.currentTimeMillis()

        var threwException = true
        val (value, accumUpdates) = try {

          // Actually run the task; while it executes it acquires memory and block resources
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        } finally {
          //  When execution finishes, release the locks acquired during the run and clean up allocated memory
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

          // If memory was not fully cleaned up, a memory-leak message is reported; a useful signal when debugging
          if (freedMemory > 0) {
            val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
              throw new SparkException(errMsg)
            } else {
              logError(errMsg)
            }
          }
          //  Likewise, block locks that were not released are reported
          if (releasedLocks.nonEmpty) {
            val errMsg =
              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
              releasedLocks.mkString("[", ", ", "]")
            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
              throw new SparkException(errMsg)
            } else {
              logError(errMsg)
            }
          }
        }
        val taskFinish = System.currentTimeMillis()

        // If the task has been killed, let's fail it.
        if (task.killed) {
          throw new TaskKilledException
        }

        // Serialize the execution result
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()

        for (m <- task.metrics) {
          /** Collect the various metrics.
              These metrics are sent to the Driver along with heartbeats and shown in the WebUI.
            */
          // Note: the global Accumulator variables are updated here
          m.updateAccumulators()
        }

      

        // directSend = sending directly back to the driver
        val serializedResult: ByteBuffer = {
           /** Decide how to send the result back:
              results below the Akka frame size (128MB by default) are sent directly over Akka;
              anything larger goes through the BlockManager.
              Several log lines here report how large the result is and how it was sent to the Driver.
          */
        }

        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

      } catch {
          /** Various kinds of error handling */
      } finally {
        runningTasks.remove(taskId)
      }
    }
  }
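
The elided serializedResult block above boils down to a size check. Here is a standalone, simplified sketch of that decision (the names below are mine, not Spark's): results over the total result-size limit are discarded, results too big for one Akka frame are stored in the BlockManager and only a reference is sent back, and everything else is sent back directly.

    sealed trait ResultRoute
    case object DirectSend      extends ResultRoute // small enough to go straight back over Akka
    case object ViaBlockManager extends ResultRoute // too large for an Akka frame; ship as a block
    case object Dropped         extends ResultRoute // exceeds the total result-size limit

    object ResultRouteSketch {
      def chooseRoute(resultSize: Long, akkaFrameSize: Long, maxResultSize: Long): ResultRoute =
        if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
        else if (resultSize >= akkaFrameSize) ViaBlockManager
        else DirectSend

      def main(args: Array[String]): Unit = {
        val akkaFrame = 128L * 1024 * 1024   // the 128MB frame size mentioned above
        val maxResult = 1024L * 1024 * 1024  // the 1GB default result-size limit
        println(chooseRoute(10L * 1024 * 1024, akkaFrame, maxResult))    // DirectSend
        println(chooseRoute(300L * 1024 * 1024, akkaFrame, maxResult))   // ViaBlockManager
        println(chooseRoute(2048L * 1024 * 1024, akkaFrame, maxResult))  // Dropped
      }
    }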

4.4 What the Statistics Include

ExecutorSource

Another important concept is how much muscle this "brick carrier" has; those numbers are all collected in this object.


[Figure: ExecutorSource metrics viewed in JConsole]
Gauge                            Description
threadpool.activeTasks           Number of tasks currently being executed by the thread pool
threadpool.completeTasks         Number of tasks the thread pool has completed
threadpool.currentPool_size      Current size of the thread pool
threadpool.maxPool_size          Maximum size of the thread pool
filesystem.hdfs.read_bytes       Bytes read from HDFS: FileSystem.getAllStatistics().getBytesRead()
filesystem.hdfs.write_bytes      Bytes written to HDFS: FileSystem.getAllStatistics().getBytesWritten()
filesystem.hdfs.read_ops         HDFS read operations: FileSystem.getAllStatistics().getReadOps()
filesystem.hdfs.largeRead_ops    HDFS large read operations: FileSystem.getAllStatistics().getLargeReadOps()
filesystem.hdfs.write_ops        HDFS write operations: FileSystem.getAllStatistics().getWriteOps()
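
ExecutorSource is built on the Dropwizard (codahale) Metrics library that Spark's metrics system uses, so each row above is ultimately just a registered Gauge. A hedged sketch of how such gauges could be wired up (illustrative names, not the exact ExecutorSource code; requires the metrics-core dependency):

    import java.util.concurrent.ThreadPoolExecutor
    import com.codahale.metrics.{Gauge, MetricRegistry}

    class ThreadPoolMetricsSketch(registry: MetricRegistry, pool: ThreadPoolExecutor) {
      registry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
        override def getValue: Int = pool.getActiveCount // tasks currently executing
      })
      registry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
        override def getValue: Long = pool.getCompletedTaskCount // tasks finished so far
      })
      registry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
        override def getValue: Int = pool.getPoolSize // threads currently in the pool
      })
      registry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
        override def getValue: Int = pool.getMaximumPoolSize // upper bound on the pool size
      })
    }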