Spark1.3.1Shuffle源码分析

2018-06-17  本文已影响0人  LancerLin_LX
image.png
图片出处:https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md image.png

1.ShuffleMapTasks

image.png image.png image.png image.png

dirver像executor发送LaunchTask


image.png

executor接收到序列化好的task后,反序列化后执行task


image.png

CoarseGrainedExecutorBackend里的executor实际上是Executor

image.png

new TaskRunner来执行task


image.png image.png image.png

所以最后是调用ShuffleMapTask.runTask(),并且通过SparkEnv获取shuffleManager,SparkEnv初始化了很多重要的组件

image.png

shuffleManager获取writer,默认的writer是HashShuffleManager,获取HashShuffleWriter调用write方法

image.png image.png image.png
这里有个调优参数spark.shuffle.consolidateFiles
image.png

val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)

image.png

blockManager.getDiskWriter

image.png image.png

这里可以看出,使用了NIO的文件api


image.png

回到executor的run方法中


image.png image.png

这里分析
env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

image.png image.png image.png image.png

写数据逻辑


image.png

spill到磁盘的逻辑


image.png image.png image.png

复制副本操作


image.png image.png

最后写完数据后,上报给driver的MapStatusTracker

image.png image.png

driver端收到后


image.png image.png image.png image.png image.png image.png image.png image.png

2.ResultTask

image.png image.png image.png

3.ShuffledRDD

image.png image.png image.png

通过HashShuffleReader来读取数据


image.png image.png image.png image.png image.png image.png
上一篇下一篇

猜你喜欢

热点阅读