spark任务调度详解
一、spark scheduler(spark任务调度)
-
在使用spark-summit或者spark-shell提交spark程序后,根据提交时指定(deploy-mode)的位置,创建driver进程,driver进程根据sparkconf中的配置,初始化sparkcontext。Sparkcontext的启动后,创建DAG Scheduler(将DAG图分解成stage)和Task Scheduler(提交和监控)两个调度task模块。
-
driver进程根据配置参数向resourcemanager(资源管理器)申请资源(主要是用来执行的executor),resourcemanager接到到了Application的注册请求之后,会使用自己的资源调度算法,在spark集群的nodemanager上,通知nodemanager为application启动多个Executor。(以上是以yarn为平台的描述,如果是以standalone为平台,resourcemanager一般称之为master,nodemanager称之为worker)
-
executor创建后,会向resourcemanager进行资源及状态反馈,以便resourcemanager对executor进行状态监控,如监控到有失败的executor,则会立即重新创建。
-
Executor会向taskScheduler反向注册,以便获取taskScheduler分配的task。
-
Driver完成SparkContext初始化,继续执行application程序,当执行到Action时,就会创建Job。并且由DAGScheduler将Job划分多个Stage,每个Stage 由TaskSet 组成,并将TaskSet提交给taskScheduler,taskScheduler把TaskSet中的task依次提交给Executor, Executor在接收到task之后,会使用taskRunner(封装task的线程池)来封装task,然后,从Executor的线程池中取出一个线程来执行task。
就这样Spark的每个Stage被作为TaskSet提交给Executor执行,每个Task对应一个RDD的partition,执行我们的定义的算子和函数。直到所有操作执行完为止。如下图所示:
spark任务调度.png二.、Spark作业调度中stage划分
Spark在接收到提交的作业后,DAGScheduler会根据RDD之间的依赖关系将作业划分成多个stage,DAGSchedule在将划分的stage提交给TASKSchedule,TASKSchedule将每个stage分成多个task,交给executor执行。task的个数等于stage末端的RDD的分区个数(每个stage的分区数,可能不一致)。因此对了解stage的划分尤为重要。
在spark中,RDD之间的依赖关系有两种:一种是窄依赖,一种是宽依赖。窄依赖的描述是:父RDD的分区最多只会被子RDD的一个分区使用。宽依赖是:父RDD的一个分区会被子RDD的多个分区使用。如下图所示:
stage划分.png上图中,以一竖线作为分界,左边是窄依赖,右边是宽依赖。
Stage的划分不仅根据RDD的依赖关系,还有一个原则是将依赖链断开,每个stage内部可以并行运行,整个作业按照stage顺序依次执行,最终完成整个Job。
实际划分时,DAGScheduler就是根据DAG图,从图的末端逆向遍历整个依赖链,一般是以一次shuffle为边界来划分的。一般划分stage是从程序执行流程的最后往前划分,遇到宽依赖就断开,遇到窄依赖就将将其加入当前stage中。一个典型的RDD Graph如下图所示:其中实线框是RDD,RDD内的实心矩形是各个分区,实线箭头表示父子分区间依赖关系,虚线框表示stage。针对下图流程首先根据最后一步join(宽依赖)操作来作为划分stage的边界,再往左走,A和B之间有个group by也为宽依赖,也可作为stage划分的边界,所以我们将下图划分为三个stage。
image.png三、job、stage、task的关系
Job、stage和task是spark任务执行流程中的三个基本单位。其中job是最大的单位,Job是spark应用的action算子催生的;stage是由job拆分,在单个job内是根据shuffle算子来拆分stage的,单个stage内部可根据操作数据的分区数划分成多个task。
job、stage、task的关系.png四、以算子的角度考虑RDD的创建和执行:
image.png-
首先针对一段应用代码,driver会以action算子为边界生成响应的DAG图
-
DAG Scheduler从DAG图的末端开始,以图中的shuffle算子为边界来划分stage,stage划分完成后,将每个stage划分为多个task,DAG Scheduler将taskSet传给Task Scheduler来调用
-
Task Scheduler根据一定的调度算法,将接收到的task池中的task分给work node节点中的executor执行
这里我们看到RDD的执行流程中,DAG Scheduler和Task Scheduler起到非常关键的作用,因此下面我们来看下DAG Scheduer和Task Scheduler的工作流程。
4.1、DAG Scheduler工作流程
DAG Scheduler是一个高级的scheduler 层,他实现了基于stage的调度,他为每一个job划分stage,并将单个stage分成多个task,然后他会将stage作为taskSet提交给底层的Task Scheduler,由Task Scheduler执行。
DAG的工作原理如下图:
DAG的工作原理.png针对左边的一段代码,DAG Scheduler根据collect(action算子)将其划分到一个job中,在此job内部,划分stage,如上右图所示。DAG Scheduler在DAG图中从末端开始查找shuffle算子,上图中将reduceByKey为stage的分界,shuffle算子只有一个,因此分成两个stage。前一个stage中,RDD在map完成以后执行shuffle write将结果写到内存或磁盘上,后一个stage首先执行shuffle read读取数据在执行reduceByKey,即shuffle操作。
4.2、TASK Scheduler工作流程
Task Scheduler是sparkContext中除了DAG Scheduler的另一个非常重要的调度器,task Scheduler负责将DAG Scheduer产生的task调度到executor中执行。如下图所示,Task Scheduler 负责将TaskSetPool中的task调度到executor中执行,一般的调度模式是FIFO(先进先出),也可以按照FAIR(公平调度)的调度模式,具体根据配置而定。其中FIFO:顾名思义是先进先出队列的调度模式,而FAIR则是根据权重来判断,权重可以根据资源的占用率来分,如可设占用较少资源的task的权重较高。这样就可以在资源较少时,调用后来的权重较高的task先执行了。至于每个executor中同时执行的task数则是由分配给每个executor中cpu的核数决定的。
TASK Scheduler工作流程.png