Spark1.3.1Shuffle源码分析
2018-06-17 本文已影响0人
LancerLin_LX

图片出处:https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md

1.ShuffleMapTasks




dirver像executor发送LaunchTask

executor接收到序列化好的task后,反序列化后执行task

CoarseGrainedExecutorBackend
里的executor实际上是Executor
类

new TaskRunner来执行task



所以最后是调用ShuffleMapTask.runTask()
,并且通过SparkEnv
获取shuffleManager,SparkEnv初始化了很多重要的组件

shuffleManager获取writer,默认的writer是HashShuffleManager
,获取HashShuffleWriter
调用write方法



这里有个调优参数
spark.shuffle.consolidateFiles

val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

blockManager.getDiskWriter


这里可以看出,使用了NIO的文件api

回到executor的run方法中


这里分析
env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)




写数据逻辑

spill到磁盘的逻辑



复制副本操作


最后写完数据后,上报给driver的MapStatusTracker


driver端收到后








2.ResultTask



3.ShuffledRDD



通过HashShuffleReader来读取数据





