Spark Core源码精读计划#18:与RDD的重逢
目录
前言
在前面的17篇文章中,我们对以SparkContext和SparkEnv为中心展开的Spark Core底层支撑组件有了比较深入的理解,当然有一些重要的组件,会随着整个系列的进行详细讲解到。按照计划,我们本应开始看Spark的存储系统结构,但是不着急,我们先花2~3篇文章的时间来重新认识一下我们的老朋友——RDD。它不仅与存储息息相关,也是Spark任务调度和计算的主要对象,现在打好基础是非常有益的。
RDD的正式名称为弹性分布式数据集(Resilient Distributed Dataset),Spark官方文档中对它的定义是:可以并行操作的、容错的元素集合。实际上,除了可并行操作、容错两点之外,RDD还具有一些其他相关的特点,如:
- 不可变性(只能生成或转换,不能直接修改,容错时可以重算);
- 分区性(内部数据会划分为Partition,是分布式并行的基础);
- 名称中的“弹性”(可以灵活利用内存和外存,Spark设计思想的体现)。
RDD在Spark Core源码中的基础是o.a.s.rdd.RDD这个抽象类,本文就来对它做一些基础的了解。
RDD抽象类概述
构造方法与成员属性
代码#18.1 - o.a.s.rdd.RDD类的构造方法与成员属性
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
logWarning("Spark does not support nested RDDs (see SPARK-5063)")
}
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
@transient val partitioner: Option[Partitioner] = None
val id: Int = sc.newRddId()
@transient var name: String = _
private var storageLevel: StorageLevel = StorageLevel.NONE
private var dependencies_ : Seq[Dependency[_]] = _
@transient private var partitions_ : Array[Partition] = _
@transient private[spark] val creationSite = sc.getCallSite()
@transient private[spark] val scope: Option[RDDOperationScope] = {
Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
private val checkpointAllMarkedAncestors =
Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)
@transient private var doCheckpointCalled = false
// ...
}
RDD类接收两个主构造方法参数:
- _sc:即SparkContext实例,它不会被序列化。
- deps:Dependency的序列,它也不会被序列化。所谓Dependency,就是指当前RDD对其他RDD的依赖关系,后面会讲到Dependency相关的知识。
在构造方法中会检查RDD是否被嵌套了,Spark不支持RDD嵌套,会打印警告信息。另外,还有一个辅助构造方法,它只接收一个RDD oneParent作为参数,此时会使用oneParent对应的SparkContext和一对一依赖OneToOneDependency来构造RDD。
RDD类的主要成员属性如下。
- partitioner:键值型RDD(即RDD[(K,V)])的分区逻辑,是Partitioner的子类,后面也会讲到与Partitioner相关的细节。
- id:该RDD的ID,可以调用SparkContext.newRddId()方法产生。
- name:RDD的可读名称。
- storageLevel:RDD的持久化等级,一共有12个等级。它由StorageLevel类及其伴生对象定义。
- dependencies_:RDD的依赖,与构造参数deps相同,但是可以序列化,并且会考虑当前RDD是否被Checkpoint。
- partitions_:包含RDD的所有分区的数组。
- creationSite:创建这个RDD的调用代码位置,通过SparkContext.getCallSite()方法获得。关于CallSite的简介可以参见文章#3。
- scope:RDD的操作域,由RDDOperationScope结构来描述。所谓操作域,其实就是一个确定的产生RDD的代码块,该代码块中的所有RDD就是在相同的操作域中。
- checkpointData:保存的RDD检查点数据,方便出错时重算。
- checkpointAllMarkedAncestors:布尔值,表示是否要对当前RDD的所有标记需要Checkpoint的父RDD保存检查点。
- doCheckpointCalled:布尔值,表示是否已经保存过该RDD的检查点,防止重复保存。
需要RDD子类实现的方法
RDD类中醒目地标出了4个抽象方法,它们都很重要,RDD的子类必须要提供具体实现,如下所示。
代码#18.2 - o.a.s.rdd.RDD类中的抽象方法
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- compute():计算RDD的一个分区split内的数据,返回对应数据类型的迭代器。
- getPartitions():取得RDD所有分区的数组。
- getDependencies():取得RDD的所有依赖,默认返回的就是deps。
- getPreferredLocations():取得计算分区split的偏好位置(如HDFS上块的位置)数组,这个是可选的。
RDD类中对Partition、Dependency和Preferred Location都提供了简单的Getter方法,它们都会先检查当前RDD的检查点,然后调用上面的三个抽象方法,其代码如下所示。
代码#18.3 - o.a.s.rdd.RDD.partitions()/dependencies()/preferredLocations()方法
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
dependencies_
}
}
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
partitions_ = getPartitions
partitions_.zipWithIndex.foreach { case (partition, index) =>
require(partition.index == index,
s"partitions($index).partition == ${partition.index}, but it should equal $index")
}
}
partitions_
}
}
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
RDD的五要素
通过以上的介绍,我们就可以归纳出RDD的五个组成要素了。这些内容在RDD类的ScalaDoc中其实已经有所体现:
- 分区列表 [A list of partitions];
- 计算每个分区的函数 [A function for computing each split];
- 对其他RDD的依赖的列表 [A list of dependencies on other RDDs];
- 可选的对键值型RDD的分区逻辑 [Optionally, a Partitioner for key-value RDDs];
- 可选的计算分区的位置偏好列表 [Optionally, a list of preferred locations to compute each split on]。
RDD继承体系与算子概述
RDD的子类
RDD拥有众多的子类,这些子类都实现了上面的4个方法。大多数对RDD的操作方法(也就是算子)返回的结果都是RDD子类的实例。主要的RDD子类如下图所示,没有箭头,看官将就一下吧。
图#18.1 - RDD继承体系由于我们之后还有很多事情要做,不可能将RDD的所有细节都分析一遍,这里暂时就不展开讲每个RDD子类的实现了。
我们已经知道,RDD的算子有两类,即转换(Transformation)算子与动作(Action)算子,这是老生常谈了。
转换算子
转换算子用于对一个RDD施加一系列逻辑,使之变成另一个RDD。在文章#0的WordCount程序中出现的flatMap()、map()、reduceByKey()都是转换算子。作为示例,我们来看看日常工作中极其常见、并且效率较高的mapPartitions()算子。
代码#18.4 - o.a.s.rdd.RDD.mapPartitions()方法
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
这个算子对RDD[T]每个分区的迭代器施加函数f的转换逻辑,返回一个MapPartitionsRDD[U],参数preservesPartitioning表示是否保留父RDD的分区。MapPartitionsRDD的具体实现如下。可以发现,getPartitions()和partitioner都直接复用了父RDD的,而compute()方法则是直接应用函数f的逻辑。
代码#18.5 - o.a.s.rdd.MapPartitionsRDD类
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
override def clearDependencies() {
super.clearDependencies()
prev = null
}
// ...
}
再举个例子,coalesce()算子可以将一个RDD重新分区,也是常用的转换算子之一。它最终会产生CoalescedRDD,如果中途发生Shuffle的话,也有可能会产生ShuffledRDD。关于它的实现,之前在一篇小文《解决Spark Streaming写入HDFS的小文件问题》中已经讲过,不再赘述。
动作算子
动作算子用于触发Job的提交,真正执行RDD转换逻辑的计算,并返回其处理结果。以代码#0.1中用到的collect()以及常用的foreach()为例。
代码#18.6 - o.a.s.rdd.RDD.collect()/foreach()方法
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
代码很简单,需要注意,它们都调用了SparkContext.runJob()方法来提交一个Job。这个方法比较重要,待到之后研究Spark Core调度逻辑时,它可以称得上是一切的起点。
总结
本文通过阅读与RDD类相关的一些基础源码,复习了RDD的基本知识,另外又对RDD的子类与算子有了大致的了解。下一篇文章会专注于两个要点:Dependency与Partitioner,即RDD的依赖与分区逻辑。