Spark内核分析之DAGScheduler划分算法实现原理讲解
接着上一篇,我们接着来分析下一个非常重要的组建DAGScheduler的运行原理是怎么实现的;通过之前对Spark的分析讲解,我们的Spark作业是在遇到一个action算子以后并以此为界限,划分出一个Job出来,也就是在这个时候,Spark作业向集群提交一个Job任务;下面我们看看源码是如何实现的;
spark Job提交源码图通过在任何一个action操作的算子中追踪发现,最终提交一个Job是调用了SparkContext的runJob方法实现的,在该方法中通过dagSchedualer.runJob()正式向集群提交一个Job任务,接下来重点来了,我们来看看DAGScheduler是如何对一个Job进行stage划分的;
提交任务这里通过eventProcessLoop对象将Job进行提交,下面我们看看在eventProcessLoop中具体发生了什么;
1.首先,创建出与partition数量相等的task;
2.由触发Job提交的那个RDD算子作为作为起点,创建第一个stage并命名为finalStage;
3.对于if条件成立的内容,是针对于本地模式运行的,我们主要来分析一下集群模式下的工作模式,在else逻辑中,我们可以看到调用了submitStage的方法,该方法就是实现stage划分的重要实现;
stage划分算法实现1.在该方法中我们调用了getMissingParentStages()方法,并将其RDD压入一个栈中;
2.在这个方法中,首先弹栈获得栈顶的RDD,并使用循环反复调用当前RDD所依赖的父RDD,并判断其父RDD是宽依赖还是窄依赖;
3.如果是宽依赖,则创建一个新的stage,并将其加入到missingStage缓存中;如果是窄依赖的话,则将当前的RDD在压入栈中;
4.如此往复,直到一个stage遍历完成;
5.运行完以上动作之后,接着使用递归操作,重复调用submitStage()方法,直到没有父Stage的时候,即方法返回结果为Nil的时候,开始调用submitMissingTask将一个stage(即一个Taskset)提交给TaskScheduler去;
总结:至此,我们的DAGScheduler的stage划分算法基本上就介绍完了,下篇文章我们来接着介绍当一个Taskset被提交给TaskScheduler后,TaskScheduler是如何对一个Taskset集合中的每个Task进行合理分配的,即我们的Task分配算法是如何实现的,欢迎关注。
如需转载,请注明: