Spark源码[7]-Task结构

2020-05-24  本文已影响0人  蠟筆小噺没有烦恼

1 Task抽象类

org.apache.spark.scheduler.Task
提供的抽象方法有:

def runTask(context: TaskContext): T
def preferredLocations: Seq[TaskLocation] = Nil

Task中有一个重要的已实现方法Run,会先调用BlockManager.registerTask进行Task注册;并创建Task上下文TaskContext,将TaskContext注册到ThreadLocal中,并获取当前Task的运行线程;
会调用子类的RunTask,进行具体的Task计算。如果有任何错误,则标记为失败。在finally块中会调用MemoryStore的releaseUnrollMemoryForThisTask方法释放堆内和堆外内存,并执行memoryManager.notifyAll()唤醒等待申请内存的线程,进行新一轮的内存申请操作。
Task实现类很多,但主要需要掌握的实现类有ShuffleMapTask和ResultTask。

2 ShuffleMapTask

相当于MapReduce的Map,负责计算,并将数据根据partition函数分配到不同分区,等待下游拉取。

2.1 RunTask方法

首先通过反序列化依赖分析阶段得到的rdd和当前函数,注意最后一行调用的shuffleWriterProcessor.write,实际的调用链总结为:

ShuffleWriteProcessor.write->ShuffleManager特质.getWriter->SortShuffleManager.getWriter

当前版本的实现ShuffleManager抽象类的Writer部分只有一个子类,就是SortShuffleManager,内部的调用getWriter会根据partition数量等判断使用哪一种shuffle方式:[UnsafeShuffleWriter,BypassMergeSortShuffleWriter,SortShuffleWriter]

3 ResultTask

类似MapReduce中的Reduce部分,读取上游MapTask输出的数据,并计算得到最终结果。

3.1 RunTask方法

首先通过反序列化依赖分析阶段得到的rdd和当前函数,通过rdd.iterator方法,得到当前数据的迭代器。每个不同的rdd都会有不同的迭代器方法,通过下一节的迭代计算进行介绍。

4 迭代计算

位于org.apache.spark.rdd.RDD.scala


如果存储级别不为None,说明之前一定进行了存储,可能是磁盘/内存/堆外内存三种情况,则调用getOrCompute从存储尝试恢复。
如果存储级别为None,那么说明是初次执行,且没有进行执行结果,则调用computeOrReadCheckpoint进行计算,或者从检查点恢复(可能是由于计算中断)。

4.1 getOrCompute


先调用BlockManager的getOrElseUpdate尝试从存储体系获取RDD分区的Block,否则调用computeOrRead
Chckpoint方法。
对getOrElseUpdate中获取的结果匹配,将BlockResult的data属性或者返回的Iterator封装为InterruptibleIterator。

4.2 computeOrReadCheckpoint


线尝试从检查点获取数据,否则执行compute方法进行实际的计算。对于每一种实现了RDD抽象类的实现类,都会有不同的compute方法实现,比如:

ShuffleRDD的compute方法:


会通过ShuffleManager的getReader方法,而实现了getReader方法的只有BlockStoreShuffleReader类,之后调用其read方法,通过从上游Shuffle的结果进行数据读取,生成迭代器,迭代器最终被ResultTask进行调用,通过用户传入的方法进行迭代器的数据处理。

上一篇下一篇

猜你喜欢

热点阅读