Spark(二):对RDD的一些理解
概述
基于Spark 2.4.0
RDD(Resilient Distributed Dataset):弹性分布式数据集。它描述了一个不可改变的、可分区的、可用于进行并行计算的数据集合。
RDD的五个主要属性:
①一组Partition
集合:Partition是对分区的描述。对于数据源RDD来说是对数据分片的描述。对于通过Transformation操作得到的RDD来说Partition仅仅包含分区编号。
②一个compute
函数:compute函数描述的是源数据或父RDD分区数据到当前RDD分区数据的变换逻辑。比如父RDD中编号为1的Partition代表的数据通过compute函数可以转换成当前RDD中编号为1的Partition。
③一组Dependency
集合:其它RDD的依赖,具体点就是描述当前RDD的分区依赖于父RDD的哪些分区。比如通过某个Dependency可知:当前RDD中编号为1的Partition依赖于父RDD中编号为1的Partition。
④一个Partitioner
:可选,用于键值对类型的RDD进行分区使用。
⑤一个用于计算RDD某个分区数据位置列表的preferredLocations
函数:可选,基于移动计算优于移动数据的原则,可以根据该列表将计算移动到数据所在的位置。
RDD的分类
①源数据RDD
顶级父RDD,没有Dependency。只有数据分片的描述Partition、compute函数和可选的preferredLocations函数。这类的RDD有HadoopRDD
、JdbcRDD
、ParallelCollectionRDD
等。这类RDD是对数据源的描述。
②通过Transfermation操作得到的RDD
它们追溯到最上层的父RDD必定是源数据RDD,有分区Partition、有Dependency描述父RDD的Partition到当前RDD的Partition的映射关系、有compute函数来描述父RDD的Partition通过什么样的函数计算能得到当前RDD的Partition和可选的Partitioner来进行数据分区。这类的RDD有MapPartitionsRDD
、UnionRDD
、ShuffledRDD
等。这类RDD描述的是怎么从父RDD得到自己。
RDD的分区
RDD代表一个数据集,在分而治之的计算思想下,我们很自然的会将一个大的数据集分解成很多小的数据集来并行计算,这里分解后的小数据集就是分区。分区不代表真实的数据集,只是数据集的描述。正如MapReduce中的数据分片一样。
对于源数据RDD:分区表示对一个真实存在的数据集的逻辑上的拆分。比如HDFS文件中的数据,关系型数据库表中数据,这些数据都是真实存在的。
以HadoopPartition为例:
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
extends Partition {}
从构造函数传递的参数来看:
rddId: Int
:HadoopPartition所属RDD的ID。
index: Int
:当前HadoopPartition的唯一ID。
s: InputSplit
:Hadoop对数据的逻辑分片。
对于通过Transfermation操作得到的RDD:分区也表示数据集的逻辑上的拆分,只是这个数据集暂时不存在,需要经历一系列转化才能得到。分区属性包含当前分区的ID。这时候分区要与Dependency一起才能凑齐完整的语义:Dependency会告诉我们,当前分区依赖父RDD哪些分区。
以ShuffledRDDPartition为例:
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
override val index: Int = idx
}
RDD的compute函数
每个RDD都会实现compute函数,计算是以分片为单位。compute函数会在action操作被调用时触发,在函数内部是通过迭代器进行对应的转换操作的。
def compute(split: Partition, context: TaskContext): Iterator[T]
对于源数据RDD:compute函数代表对真实存在于HDFS文件、数据库里面数据的提取逻辑。
对于通过Transfermation操作得到的RDD:当前RDD的compute函数代表的是父RDD分区数据到当前RDD分区数据的变换逻辑。
RDD的依赖
RDD每次Transfermation操作都会生成新的RDD,所以RDD会形成一系列的前后依赖关系。在Spark中RDD的依赖分为两大类。
窄依赖(NarrowDependency):父RDD中的每个partition最多被子RDD中的一个partition使用。窄依赖描述着子RDD某个partition依赖于父RDD哪些partition。
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
窄依赖又分为以下几种:
①一对一依赖(OneToOneDependency):父RDD中的每个partition跟子RDD的partition是一对一关系。即父RDD的partition数量跟子RDD的partition数量一致、子RDD编号为1的partition的数据是由父RDD编号为1的partition的数据通过compute函数转化而来。例如map、filter操作。
②范围依赖(RangeDependency):子RDD和父RDD的Partition之间的关系是一个区间内的1对1对应关系。也就是说某个父RDD的partition经过compute函数转化只能变成子RDD的部分partition。例如union操作。
③删减依赖(PruneDependency):子RDD的Partition来自父RDD的多个Partition,这样看起来就好像把父RDD的分区合并了一样。例如filterByRange操作。
宽依赖(ShuffleDependency):子RDD中的每个Partition都有可能来源于父RDD的所有Partition,即父RDD和子RDD中的partition是多对多关系。正如MapReduce的shuffle阶段一样,每个Reducer都会从所有的Mapper拉取数据。例如groupByKey操作。
RDD的分区器
每个Key-Value形式的RDD都会有一个Partitioner,它决定了某条数据在shuffle阶段归属于RDD的哪个Partition。
从Partitioner类定义的两个方法可以看出它的作用:numPartitions
方法用来获取分区数量;getPartition
方法用根据这条数据的Key来确定这条数据属于哪个分区。
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
如果整个依赖链路上有多个RDD具备Partitioner,那么在shuffle的时候该选择哪个Partitioner?分区数目怎么确定?
来看一下Partitioner类的伴生的Partitioner对象:
①如果spark.default.parallelism参数被设置,则使用该值作为分区数量,否则使用上游RDD中最大分区数作为分区数量。这样能最大避免内存溢出。
②过滤出上游RDD中包含partitioner的RDD,如果某个RDD的分区数是最多的、分区数大于默认的分区数,并且partitioner是合格的,那么将使用该RDD中的partitioner作为分区器。否则使用HashPartitioner分区器。
object Partitioner {
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
* as the default partitions number, otherwise we'll use the max number of upstream partitions.
*
* When available, we choose the partitioner from rdds with maximum number of partitions. If this
* partitioner is eligible (number of partitions within an order of maximum number of partitions
* in rdds), or has partition number higher than default partitions number - we use this
* partitioner.
*
* Otherwise, we'll use a new HashPartitioner with the default partitions number.
*
* Unless spark.default.parallelism is set, the number of partitions will be the same as the
* number of partitions in the largest upstream RDD, as this should be least likely to cause
* out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
new HashPartitioner(defaultNumPartitions)
}
}
private def isEligiblePartitioner(
hasMaxPartitioner: RDD[_],
rdds: Seq[RDD[_]]): Boolean = {
val maxPartitions = rdds.map(_.partitions.length).max
log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
}
RDD分区数据的优先位置列表
Spark在运算之前就已经知道了它运算的数据在哪个节点,基于移动计算优于移动数据的原则, Spark在进行任务调度时会尽可能地将任务分配到数据的节点上。
getPreferredLocations返回的是运行任务的优先位置,可以是host或者host:executorID。当返回的位置是host:executorID这种键值对时,Spark优先在该executorID上启动任务,其次在该host上的其他executor上启动任务。
protected def getPreferredLocations(split: Partition): Seq[String]
/**
* A location where a task should run. This can either be a host or a (host, executorID) pair.
* In the latter case, we will prefer to launch the task on that executorID, but our next level
* of preference will be executors on the same host if this is not possible.
*/
private[spark] sealed trait TaskLocation {
def host: String
}