Spark 1.3.1 Job Submission Flow Analysis
WordCount code
```scala
sc.textFile(args(0))
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .saveAsTextFile(args(1))
```
First of all, we know that an RDD only submits a job when it hits an action; everything else is a transformation, which is evaluated lazily.
What is an RDD? (A short sketch of how these five properties map onto the RDD class follows the list.)
- A list of partitions (the set of partitions that make up the dataset)
- A function for computing each split (a function applied to each split)
- A list of dependencies on other RDDs (RDDs depend on one another)
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) ("move the computation, not the data")
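These five properties map directly onto members of the abstract RDD class. Below is a minimal sketch of how they appear (the real `org.apache.spark.rdd.RDD` has the same members plus a lot more machinery; this is a simplification, not the actual source):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Minimal sketch of the five RDD properties, modeled on org.apache.spark.rdd.RDD.
abstract class RddSketch[T: ClassTag] {
  protected def getPartitions: Array[Partition]                      // 1. a list of partitions
  def compute(split: Partition, context: TaskContext): Iterator[T]   // 2. compute one split
  protected def getDependencies: Seq[Dependency[_]]                  // 3. dependencies on parent RDDs
  val partitioner: Option[Partitioner] = None                        // 4. optional Partitioner for (K, V) RDDs
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil // 5. data-locality hints
}
```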
Source code analysis
The parameters called in the red box will look very familiar to anyone who has written MapReduce, so we can tell that the API invoked inside textFile is Hadoop's default class for reading files, namely TextInputFormat.
![](https://img.haomeiwen.com/i2989357/c37a9ffe72416f2f.png)
![](https://img.haomeiwen.com/i2989357/0fb588165102176f.png)
![](https://img.haomeiwen.com/i2989357/83cdecb0cec7b578.png)
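What the screenshots show boils down to roughly the following (paraphrased from memory, so details may differ slightly from the 1.3.1 source): textFile delegates to hadoopFile with TextInputFormat and keeps only the value of each (offset, line) record:

```scala
// Paraphrased sketch of SparkContext.textFile (Spark 1.3.x), not verbatim.
// Uses org.apache.hadoop.mapred.TextInputFormat and org.apache.hadoop.io.{LongWritable, Text}.
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] =
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
    .map(pair => pair._2.toString)   // key = byte offset (dropped), value = the line of text
    .setName(path)
```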
textFile creates a HadoopRDD. Within an RDD, getPartitions decides how many partitions there are, and therefore how many tasks will run, so let's look at the getPartitions method of HadoopRDD.
![](https://img.haomeiwen.com/i2989357/621eef012b65f8c7.png)
![](https://img.haomeiwen.com/i2989357/6e059ae74a20bf7c.png)
![](https://img.haomeiwen.com/i2989357/2316a3156bd3b453.png)
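The key point in getPartitions (sketched below, not the verbatim source) is that it simply asks the Hadoop InputFormat for its splits and wraps each one in a HadoopPartition, so the partition count is decided entirely by Hadoop's split logic:

```scala
// Paraphrased sketch of HadoopRDD.getPartitions (Spark 1.3.x), not verbatim.
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  val inputFormat = getInputFormat(jobConf)                       // e.g. TextInputFormat
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions) // Hadoop decides the splits
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))         // one Spark partition per split
  }
  array
}
```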
TextInputFormat extends FileInputFormat, so we look into the InputFormat. From the source we can see that the number of tasks produced by the textFile operator is exactly the number of input splits that a MapReduce map phase would get in Hadoop. That number is not simply "one task per block" as commonly assumed: a split is only carved off while the remaining data is more than 1.1 times the split size, so the last split can be up to about 1.1 blocks.
![](https://img.haomeiwen.com/i2989357/e27a6510b8fed36b.png)
![](https://img.haomeiwen.com/i2989357/61c4936917ccbc3d.png)
![](https://img.haomeiwen.com/i2989357/485c26a5abd8166a.png)
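The 1.1 factor comes from FileInputFormat's SPLIT_SLOP constant: a new split is only carved off while the remaining bytes are more than 1.1 times the split size, so the final split can be up to roughly 1.1 blocks. Here is a small standalone illustration of that rule (my own reconstruction of the counting logic, not Hadoop code):

```scala
// Reconstruction of FileInputFormat's split-counting rule (SPLIT_SLOP = 1.1), for illustration only.
object SplitSlopDemo {
  val SPLIT_SLOP = 1.1

  /** Sizes of the splits a file of `fileSize` bytes would be cut into, given `splitSize`. */
  def splitSizes(fileSize: Long, splitSize: Long): Seq[Long] = {
    var remaining = fileSize
    val sizes = scala.collection.mutable.ArrayBuffer[Long]()
    while (remaining.toDouble / splitSize > SPLIT_SLOP) {
      sizes += splitSize
      remaining -= splitSize
    }
    if (remaining > 0) sizes += remaining // the tail split, at most ~1.1 * splitSize
    sizes
  }

  def main(args: Array[String]): Unit = {
    val block = 128L * 1024 * 1024
    println(splitSizes(135L * 1024 * 1024, block)) // one split (135 MB < 1.1 * 128 MB), not two
    println(splitSizes(280L * 1024 * 1024, block)) // three splits: 128 MB, 128 MB, 24 MB
  }
}
```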
From getPartitions in the RDD we know how many partitions the input is divided into at compute time; the compute method tells us how each partition actually obtains its data, so let's look at compute first.
![](https://img.haomeiwen.com/i2989357/b2ba37faa515dc15.png)
![](https://img.haomeiwen.com/i2989357/2288b1423e95cdac.png)
The getPreferredLocations method tells us how the best locations for computing each split are chosen.
![](https://img.haomeiwen.com/i2989357/390fb4e06c1200aa.png)
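Conceptually (a simplified sketch, not the 1.3.1 source, which also handles metrics, interruption and reflection-based location info): compute opens a Hadoop RecordReader for the split and exposes it as an iterator of (key, value) records, while getPreferredLocations returns the hosts reported by the InputSplit, which for HDFS are the block locations:

```scala
// Simplified sketch of HadoopRDD.compute / getPreferredLocations, not verbatim.
override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
  val split = theSplit.asInstanceOf[HadoopPartition]
  val jobConf = getJobConf()
  val reader = getInputFormat(jobConf)
    .getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Hadoop RecordReader
  val key = reader.createKey()
  val value = reader.createValue()
  // Expose the reader as an iterator: each step reads one record from the split.
  Iterator.continually((reader.next(key, value), (key, value)))
    .takeWhile(_._1)
    .map(_._2)
}

override def getPreferredLocations(split: Partition): Seq[String] =
  // For an HDFS file these are the hosts holding the split's blocks.
  split.asInstanceOf[HadoopPartition].inputSplit.value.getLocations.toSeq
```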
From the analysis above: textFile first uses Hadoop's TextInputFormat class to read data from HDFS as (key, value) pairs, and then applies a map that keeps only the value and discards the key. That map really builds MapPartitionsRDD + map function = MapRDD; the MapRDD overrides the getPartitions and compute methods, and firstParent plays an important role here.
![](https://img.haomeiwen.com/i2989357/0f7370041a00e734.png)
![](https://img.haomeiwen.com/i2989357/3430c8496b881e30.png)
![](https://img.haomeiwen.com/i2989357/dde53f30ecbe053d.png)
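The pattern described above can be summarized as "child RDD = parent RDD + a function over the parent's records". A paraphrased sketch of such a MapPartitionsRDD-style wrapper (exact class names and signatures vary slightly across Spark versions):

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Paraphrased sketch of a MapPartitionsRDD-style transformation RDD, not verbatim Spark source.
class MapPartitionsRDDSketch[U: ClassTag, T: ClassTag](
    prev: RDD[T],                                        // the parent RDD
    f: (TaskContext, Int, Iterator[T]) => Iterator[U])   // the transformation function
  extends RDD[U](prev) {                                 // one-parent constructor => OneToOneDependency

  // Reuse the parent's partitions: no shuffle, same number of partitions.
  // (The real class uses firstParent[T].partitions, which for one parent is just prev.)
  override def getPartitions: Array[Partition] = prev.partitions

  // compute = ask the parent for its iterator over this split, then apply f to it.
  // (The real class uses firstParent[T].iterator(split, context).)
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, prev.iterator(split, context))
}
```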
The HadoopRDD passes itself in as `this` when the constructor is entered, producing the MapRDD; `new OneToOneDependency(HadoopRDD)` establishes the parent-child dependency, so the MapRDD depends on the HadoopRDD.
![](https://img.haomeiwen.com/i2989357/d3410393b57ccc1e.png)
![](https://img.haomeiwen.com/i2989357/760050ff7c67cfc0.png)
![](https://img.haomeiwen.com/i2989357/2b9d820acb74c348.png)
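The dependency itself is wired up in the RDD base class: when a child RDD is constructed with a single parent, an auxiliary constructor wraps that parent in a OneToOneDependency. Paraphrased (not verbatim) from RDD.scala and Dependency.scala:

```scala
// Paraphrased from RDD.scala: constructing a child with one parent
// automatically records a OneToOneDependency on that parent.
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))

// Paraphrased from Dependency.scala: partition i of the child depends
// only on partition i of the parent (a narrow, one-to-one mapping).
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}
```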
To summarize: the textFile method produces two RDDs, a HadoopRDD and a MapRDD, and the two form a parent-child dependency.
Next flatMap is called: MapPartitionsRDD + flatMap function = FlatMapRDD. By the same reasoning, the MapRDD and the FlatMapRDD form a parent-child dependency.
![](https://img.haomeiwen.com/i2989357/1c85a340cd358c9e.png)
![](https://img.haomeiwen.com/i2989357/255659dd1c61b182.png)
Next map is called: MapPartitionsRDD + map function = MapRDD. Again, the FlatMapRDD and this MapRDD form a parent-child dependency.
So
```scala
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1))
```
produces the following dependency chain and data-flow diagram:
![](https://img.haomeiwen.com/i2989357/4a869f602b6eaacb.png)
![](https://img.haomeiwen.com/i2989357/807c08cb29be5df3.png)
Next let's analyze reduceByKey. The RDD class itself has no reduceByKey; an implicit conversion turns the RDD into PairRDDFunctions, which in turn builds a ShuffledRDD. I haven't worked through every detail, but it is clear that a shuffle happens here.
![](https://img.haomeiwen.com/i2989357/23381271991e1f0e.png)
![](https://img.haomeiwen.com/i2989357/82e46c87bd12b0a0.png)
![](https://img.haomeiwen.com/i2989357/b8d8f8b0e62c8738.png)
![](https://img.haomeiwen.com/i2989357/b4b6b4e78136df25.png)
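Paraphrasing what the screenshots walk through (not verbatim, details omitted): the implicit conversion supplies PairRDDFunctions for RDDs of pairs, reduceByKey is just combineByKey with the same function used for map-side combining and for merging across partitions, and combineByKey builds a ShuffledRDD when the data is not already partitioned by the target partitioner, which is where the shuffle comes from:

```scala
// Paraphrased sketch of the reduceByKey path (Spark 1.3.x era), not verbatim.

// 1. Implicit conversion RDD[(K, V)] => PairRDDFunctions[K, V]
//    (lives in object RDD as of 1.3; it used to live in object SparkContext).
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
  new PairRDDFunctions(rdd)

// 2. reduceByKey delegates to combineByKey, with `func` doing both the map-side
//    combine and the reduce-side merge.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
  combineByKey[V]((v: V) => v, func, func, partitioner)

// 3. Inside combineByKey: if the RDD is already partitioned by `partitioner`, a simple
//    mapPartitions pass suffices; otherwise a ShuffledRDD is created, roughly:
//      new ShuffledRDD[K, V, C](self, partitioner)
//        .setAggregator(aggregator)
//        .setMapSideCombine(mapSideCombine)
//    That ShuffledRDD is the shuffle (and stage) boundary we see in the DAG.
```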
Finally, saveAsTextFile. saveAsTextFile not only produces an RDD, it also acts as an action, and in Spark an action is what triggers a job submission.
![](https://img.haomeiwen.com/i2989357/c075a10ff50c30c0.png)
![](https://img.haomeiwen.com/i2989357/c4c0a2b79e465963.png)
![](https://img.haomeiwen.com/i2989357/e5d250990a584564.png)
![](https://img.haomeiwen.com/i2989357/452ddb5e2090d886.png)
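Paraphrasing the path the screenshots follow (not verbatim source): saveAsTextFile adds one more lazy transformation that wraps each line as a (NullWritable, Text) pair, then calls saveAsHadoopFile; deep inside, saveAsHadoopDataset ends in SparkContext.runJob, and that runJob call is the action that actually submits the job:

```scala
// Paraphrased sketch of the saveAsTextFile path (Spark 1.3.x era), not verbatim.
def saveAsTextFile(path: String): Unit =
  // Still lazy: wrap every record so Hadoop's TextOutputFormat can write it.
  this.map(x => (NullWritable.get(), new Text(x.toString)))
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

// ...and inside PairRDDFunctions.saveAsHadoopDataset the job is finally submitted:
//   self.context.runJob(self, writeToFile _)
// SparkContext.runJob then hands the RDD graph over to the DAGScheduler:
//   dagScheduler.runJob(rdd, cleanedFunc, partitions, ...)
```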
To summarize, at this point the RDD dependency chain and data-flow diagram look like this:
![](https://img.haomeiwen.com/i2989357/0309b1893ac9d7f5.png)
![](https://img.haomeiwen.com/i2989357/996b4d09abde65f7.png)
Next, let's analyze the job submission.
![](https://img.haomeiwen.com/i2989357/e384b9d9ab7001ab.png)
![](https://img.haomeiwen.com/i2989357/38bdd6f0f37c617f.png)
![](https://img.haomeiwen.com/i2989357/f24f6e8a94347f52.png)
![](https://img.haomeiwen.com/i2989357/7300292c84de31e6.png)
![](https://img.haomeiwen.com/i2989357/323839584fdba4f0.png)
![](https://img.haomeiwen.com/i2989357/35e2397c37efe76d.png)
![](https://img.haomeiwen.com/i2989357/c989c4ceb63e877c.png)
![](https://img.haomeiwen.com/i2989357/bee71e758df0969c.png)
![](https://img.haomeiwen.com/i2989357/7d252fe18259c6c1.png)
Stages are submitted recursively: the scheduler walks back to the first stage with no missing parents and submits it via `submitMissingTasks(stage, jobId.get)`.
![](https://img.haomeiwen.com/i2989357/81023ea2332531ab.png)
![](https://img.haomeiwen.com/i2989357/768e52d24c025e16.png)
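The recursion lives in DAGScheduler.submitStage. A simplified sketch from memory (not the verbatim 1.3.1 source, which also handles aborts and logging): a stage's tasks are only submitted once it has no missing parent stages; otherwise the parents are submitted first and the stage is parked in waitingStages:

```scala
// Simplified sketch of DAGScheduler.submitStage, not verbatim 1.3.1 source.
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        // No missing parents: this is the most upstream runnable stage, so run its tasks.
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit the parent stages first (recursively); this stage waits until they finish.
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  }
}
```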
References:
http://blog.csdn.net/firstblood1/article/details/53444048
http://mzorro.me/2015/08/11/spark-wordcount-analyse/
http://tieba.baidu.com/p/4491480910?see_lz=1
Tao Siyuan (陶思源)
https://spark-internals.books.yourtion.com/markdown/1-Overview.html
http://guozhongxin.com/pages/2015/01/25/spark_dagscheduler.html
http://blog.csdn.net/oopsoom/article/details/38763985