Spark源码之连接簇SparkContext

2018-10-15  本文已影响0人  机器不能学习

任务提交之后,代码会依次执行,因为懒加载的缘故,算子都不会立即执行,直到遇到action动作。我们都知道遇到动作后,DAGScheder会根据无线图分解stage,TaskScheder会申请并运行任务。但是在此之前,需要一个连接来配置启动环境,来启动这些类。这就是我要说的SparkContext。

在源码中对它有这样的描述

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark

* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

* Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before

* creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.

意思是它是spark功能的主要入口,是和spark连接的簇,可以用于创建RDD。一个虚拟机智能创建一个sparkContext。

配置环境变量:

参照http://www.cnblogs.com/chushiyaoyue/p/7472904.html

在SparkContext中就创建了SparkEnv。它需要的参数主要是SparkConf:spark的配置项,运行模式,和监听体。它会依次创建安全管理器,分布式消息系统,输出跟踪器,创建shuffler管理器和内存管理器,创建block的管理器和传输器。

更具体介绍见https://yq.aliyun.com/articles/5848


启动类:

这是创建各个类的一段代码

//通过createTaskScheduler创建_taskScheduler,_schedulerBackend

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

_schedulerBackend = sched

_taskScheduler = ts

//创建DAGScheduler

_dagScheduler =new DAGScheduler(this)

_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's

// 运行_taskScheduler

_taskScheduler.start()

//该方法是通过master的属性来创建不同的_taskScheduler和backend。TaskSchedulerImpl对象实例化时会调用其initialize函数,该函数创建资源配置池和资源调度算法,同时通过SchdulableBuilder.addTaskSetmanager:SchdulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager来确定每个Task具体运行在哪个ExecutorBackend中。

```

private def createTaskScheduler(

sc: SparkContext,

master: String,

deployMode: String): (SchedulerBackend, TaskScheduler) = {

import SparkMasterRegex._

// When running locally, don't try to re-execute tasks on failure.

  val MAX_LOCAL_TASK_FAILURES =1

  mastermatch {

case "local" =>

val scheduler =new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal =true)

val backend =new LocalSchedulerBackend(sc.getConf, scheduler,1)

scheduler.initialize(backend)

(backend, scheduler)

case LOCAL_N_REGEX(threads) =>

def localCpuCount: Int = Runtime.getRuntime.availableProcessors()

// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.

      val threadCount =if (threads =="*") localCpuCountelse threads.toInt

if (threadCount <=0) {

throw new SparkException(s"Asked to run locally with $threadCount threads")

}

val scheduler =new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal =true)

val backend =new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)

scheduler.initialize(backend)

(backend, scheduler)

case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>

def localCpuCount: Int = Runtime.getRuntime.availableProcessors()

// local[*, M] means the number of cores on the computer with M failures

// local[N, M] means exactly N threads with M failures

      val threadCount =if (threads =="*") localCpuCountelse threads.toInt

val scheduler =new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal =true)

val backend =new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)

scheduler.initialize(backend)

(backend, scheduler)

case SPARK_REGEX(sparkUrl) =>

val scheduler =new TaskSchedulerImpl(sc)

val masterUrls = sparkUrl.split(",").map("spark://" + _)

val backend =new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

scheduler.initialize(backend)

(backend, scheduler)

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>

// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.

      val memoryPerSlaveInt = memoryPerSlave.toInt

if (sc.executorMemory > memoryPerSlaveInt) {

throw new SparkException(

"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(

memoryPerSlaveInt, sc.executorMemory))

}

val scheduler =new TaskSchedulerImpl(sc)

val localCluster =new LocalSparkCluster(

numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)

val masterUrls = localCluster.start()

val backend =new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

scheduler.initialize(backend)

backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {

localCluster.stop()

}

(backend, scheduler)

case masterUrl =>

val cm =getClusterManager(masterUrl)match {

case Some(clusterMgr) => clusterMgr

case None =>throw new SparkException("Could not parse Master URL: '" + master +"'")

}

try {

val scheduler = cm.createTaskScheduler(sc, masterUrl)

val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)

cm.initialize(scheduler, backend)

(backend, scheduler)

}catch {

case se: SparkException =>throw se

case NonFatal(e) =>

throw new SparkException("External scheduler cannot be instantiated", e)

}

}

}

```

```

//随后_taskScheduler.start()启动TaskSchedulerImpl中的start()

override def start() {

//首先启动的是backend,在这个类CoarseGrainedSchedulerBackend里面

backend.start()

if (!isLocal &&conf.getBoolean("spark.speculation",false)) {

logInfo("Starting speculative execution thread")

speculationScheduler.scheduleWithFixedDelay(new Runnable {

override def run(): Unit = Utils.tryOrStopSparkContext(sc) {

checkSpeculatableTasks()

}

},SPECULATION_INTERVAL_MS,SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)

}

}```

//在CoarseGrainedSchedulerBackend中,

Spark的RPC的消息工作机制会调用生命周期方法onStart方法,在该方法执行时会执行Option(self).foreach(_.send(ReviveOffers))来周期性地发ReviveOffers消息给自己,ReviveOffers是个空的object,会触发makeOffers来‘Make fake resource offers on all executors’。

开始创建的时候是发送的空的,这是在等待执行具体的task的时候用的。

```

override def onStart() {

  val reviveIntervalMs =conf.getTimeAsMs("spark.scheduler.revive.interval","1s")

reviveThread.scheduleAtFixedRate(new Runnable {

override def run(): Unit = Utils.tryLogNonFatalError {

Option(self).foreach(_.send(ReviveOffers))

}

},0, reviveIntervalMs, TimeUnit.MILLISECONDS)

}```

上一篇下一篇

猜你喜欢

热点阅读