Spark1.3.1Shuffle源码分析
2018-06-17 本文已影响0人
LancerLin_LX
image.png
图片出处:https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
image.png
1.ShuffleMapTasks
image.png
image.png
image.png
image.png
dirver像executor发送LaunchTask
image.png
executor接收到序列化好的task后,反序列化后执行task
image.png
CoarseGrainedExecutorBackend里的executor实际上是Executor类
image.png
new TaskRunner来执行task
image.png
image.png
image.png
所以最后是调用ShuffleMapTask.runTask(),并且通过SparkEnv获取shuffleManager,SparkEnv初始化了很多重要的组件
image.png
shuffleManager获取writer,默认的writer是HashShuffleManager,获取HashShuffleWriter调用write方法
image.png
image.png
image.png
这里有个调优参数
spark.shuffle.consolidateFiles
image.png
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
image.png
blockManager.getDiskWriter
image.png
image.png
这里可以看出,使用了NIO的文件api
image.png
回到executor的run方法中
image.png
image.png
这里分析
env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
image.png
image.png
image.png
image.png
写数据逻辑
image.png
spill到磁盘的逻辑
image.png
image.png
image.png
复制副本操作
image.png
image.png
最后写完数据后,上报给driver的MapStatusTracker
image.png
image.png
driver端收到后
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
2.ResultTask
image.png
image.png
image.png
3.ShuffledRDD
image.png
image.png
image.png
通过HashShuffleReader来读取数据
image.png
image.png
image.png
image.png
image.png
image.png