spark源码阅读之executor模块①

2019-01-21  本文已影响0人  invincine

本文基于Spark 1.6.3源码,采用一步一步深入的方式来展开阅读,本文是为了纪录自己在阅读源码时候的思路,看完一遍真的很容易忘记,写一篇文章梳理一遍可以加深印象。

SparkContext:Spark应用的入口

SparkContext是用户应用于Spark集群交互的主要接口,所以把SparkContext作为入口来展开executor的源码阅读,主要针对standaone模式下的executor模块。

SparkContext通过调用createTaskScheduler()方法来创建两个重要的类:TaskScheduler和SchedulerBackend

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    // 在DAGScheduler的构造中持有TaskScheduler的引用之后,开始TaskScheduler
    _taskScheduler.start()

两个重要的类

TaskScheduler类:低级的task调度接口,仅有一个实现类为:TaskSchedulerImpl,这个类的作用是为高级task调度接口DAGScheduler划分好的stage分配TaskSet,然后提交给Spark集群,处理Task的运行消息,并将event返回给DAGScheduler,这里可以看出DAGScheduler实例化后持有了TaskSchedulerImpl的引用,有关DAGScheduler与TaskSchedulerImpl配合的调度机制,在后面的文章中展开。

SchedulerBackend类:调度的后台接口,实现类有很多,根据传入的master url采用模式匹配的方式来确定需要什么实现类,主要的作用是当有新的task或者资源变动时找到合适的executor来分配资源,或者是处理从TaskSchedulerImpl发出杀掉Task请求。

在standalone模式中,SchedulerBackend的具体实现类为:SparkDeploySchedulerBackend,通过以下createTaskScheduler()方法中的截选代码可以了解这个过程:

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

scheduler.initialize(backend)表明了TaskSchedulerImpl持有backend的引用,且在这个方法里初始化了用于FIFO和FAIR两种调度模式的容器和池,这部分放到调度模块展开。

至此为止,两个重要的类的实例已经构造完毕:TaskSchedulerImpl和SparkDeploySchedulerBackend

driverEndpoint和appClient的初始化

紧接着,调用了TaskSchedulerImpl的start()方法,在start()方法中首先调用了backend的start()方法

override def start() {
    backend.start()   //调用SchedulerBackend的start()方法

    // 如果开启了推测执行功能的话,就开启一条speculation线程来计算,参数是通过配置文件的参数来传入,或者使用默认值
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

SparkDeploySchedulerBackend的start()方法首先调用了super的start()方法,这里需要说明的是SparkDeploySchedulerBackend并不是直接继承自SchedulerBackend,而是继承自CoarseGrainedSchedulerBackend,CoarseGrainedSchedulerBackend继承自SchedulerBackend
这样的话,最后其实调用的是CoarseGrainedSchedulerBackend的start()方法,代码如下:

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

start方法中注册了DriverEndpoint,调用createDriverEndpoint方法创建了一个DriverEndpoint的实例,至此DriverEndpoint创建完成,DriverEndpoint在实例化的过程中,会去调用生命周期中onstart方法,在onStart方法中会周期性的执行以下代码:Option(self).foreach(_.send(ReviveOffers))
即自己给自己发送ReviveOffers的消息,收到ReviveOffers消息后会调用makeOffers方法选出合适executor然后分配资源。

SparkDeploySchedulerBackend在start方法中,还创建了AppClient实例:

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

AppClient实例封装了关于application的一些信息ApplicationDescription,如appName,maxCores,executorMemory等
client.start()方法中注册了AppClient中的通信端ClientEndpoint

def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    // 注册appClient的rpcEndpoint
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

在注册ClientEndpoint的过程中,调用其生命周期中的onstart方法

override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

至此,DriverEndpoint和AppClient都已经实例化完成
DriverEndpoint已经准备好了,一旦有新的application提交或是集群的资源发生了变化,即调用makeoffers方法去分配资源;
AppClient在注册ClientEndpoint的过程中,将要调用registerWithMaster将application注册请求提交给Master。

registerWithMaster之后的剖析将会放在下一篇文章里继续深入。

上一篇 下一篇

猜你喜欢

热点阅读