spark之transform和action
1. tranformation
- map
map实现如下:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
map接收一个函数f为参数,该函数接收参数类型T,然后返回类型U。当前RDD数据类型T,map使用函数f将RDD中的每一条记录转换为类型为U的数据。 比如:
// 创建一个新的RDD oddNums,包含两个partition,只有奇数组成。
val oddNums = sc.parallelize(List(1,3,5,7,9),2)
// 使用函数 x => x + 1将 oddNums中的奇数转换成偶数。
val evenNums = oddNums.map(x => x + 1)
从map的实现的可以看出,函数cleanF是通过iter.map(cleanF)
发挥作用的,这就意味着iter中有多少个值,cleanF就会调用多少次,后面还会介绍mapPartitions,作用和map一样,但是实现有所区别,将会在mapPartitions中提到。
- flatMap
flatMap的原型如下:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
flatMap将每一个元素转换成一个集合类型,然后又将这些集合的元素拿出来展开拼在一起作为下一个RDD的数据。
flatMap接收的参数f同样也是一个函数,这个函数接收T类型(当前RDD的数据类型),然后返回一个集合,集合的元素类型为U(TraversableOnce一般都是集合实现的特质)。
flatMap调用new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
, iter迭代源RDD,迭代的元素类型T,it er.flatMap使用函数f将T转换成U的集合,然后返回U的集合上的迭代器。比如T1被转换成集合[U1,U2,U3], U上的迭代器迭代返回U1,U2,U3三个元素,而不是[U1,U2,U3]这个集合,也就是说集合被展开了。
看一个例子:
//RDD someNums包含数据1,2,3,4,5。要把它转换成1,1,2,2,3,3,4,4,5,5
val someNums = sc.parallelize(List(1,2,3,4,5))
val doubleSomeNums = someNums.flatMap(x => List(x,x))
doubleSomeNums.collect
// Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5),上面的1,2,3,4,5首先被转换成[1,1],[2,2],[3,3],[4,4],[5,5],然后在被连接成1,1,2,2,3,3,4,4
- filter
原型如下:
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
filter接收断言f,对RDD中的数据,满足f的返回,不满足的丢弃。
- distinct
原型如下:
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
distinct会将RDD中重复的数据只保留一份,这是一个全局去重操作,而不是仅仅对每个分区操作去重,全局去重就意味着需要将散落在各个分区里的元素聚合到一起。
上面代码表明的它的实现原理:
- 使用map将单个value映射成(value,null)这样的键值对;
- reduceByKey将相同的聚集在一起,(x,y) => x是其聚集使用的函数 ,聚合函数是作用在key相同的value上的,由于所有value都是null,所以这其实是(null,null) => null的函数。
- map(_._1)返回key,以数据[2,2]为例:
map(x => (x, null)) reduceByKey map(_._1)
| | |
输入2,2 -> (2,null),(2,null) -> (2, null) -> 2
- coalesce
coalesce用来改变RDD的分区个数,重新分区。方法原型如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
参数shuffle=true且是在扩大分区数(即目标rdd分区数numPartitions大于当前分区)则会导致shuffle过程。
- union
union用来将多个RDD做并集,合并后的数据不会进行去重。
其方法原型:
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
/*获得所有参与union的rdd的分区方法partitioner,转换成set
这就意味着如果所有的rdd使用相同的分区方法,比如都是HashPartitioner,
而且并且各自的partitioner相等(即equals返回true,对于HashPartitioner来说,
equals为 true的条件是分区的个数一样,RangePartitioner要复杂一点),那么返回的set即partitioners的size为1.
*/
val partitioners = rdds.flatMap(_.partitioner).toSet
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
new PartitionerAwareUnionRDD(this, rdds)
} else {
new UnionRDD(this, rdds)
}
}
上面if分支中,如果参与union的rdd都定义了partitioner(rdds.forall(_.partitioner.isDefined)
返回true,一般只有ShuffledRDD有partitioner)且它们的partitioner一样,这就表示参与union的rdd都产生相同个数的分区(假设个数为p),这就好办了,union生成新的RDD:PartitionerAwareUnionRDD,新的RDD的拥有p个分区,第i个分区就有上游参与union的rdd里的第i个分区组成。所以总结一下,这种情况所有父rdd都有p个分区,那生成的新的rdd也有p个分区。
else分支中,创建UnionRDD。假设参与合并的rdd1,rdd2的分区分别是(R1P1,R1P2)和(R2P1,R2P2),一共4个分区,新的UnionRDD也将有四个分区,也就是(R1P1,R1P2,R2P1,R2P2)。
- sortBy、sortByKey
对RDD中的数据进行全局排序,下面是sortBy的原型:
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
上面sortBy实际上上调用了OrderedRDDFunctions#sortByKey(注: OrderedRDDFunctions经常出现,这里使用到了scala的隐式转换RDD隐式转换成OrderedRDDFunctions)方法。
sortByKey的是机遇reduce会对key进行排序这一原理实现的,利用每一个reducer会对自己分区内的key进行排序的原理,但是由于reducer只会保证自己分区内的数据按key排序,分区之间的有序则需要另外的机制来保证(参考hadoop terasort的排序原理)。
这里简单说一下原理:假设有10个分区,那我门就从数据中采样9个数,这9个数就决定了10个区间,然后shuffle时,就将每一个上游rdd中的数据都落到10个里的其中一个,这样partition之间也就有序了。
即然有shuffle这个过程,也就需要一个paritioner来决定数据流向下游那一个reducer,这里使用到的partitioner是RangePartitioner,而这里RangePartitoner的range的划分也就是上一段里那个简单原理介绍中所说。
注:关于shuffle的过程有兴趣的话可以参考Spark shuffle 原理
注:关于隐式转换可以参考scala 隐式转换
- intersection
求两个rdd的交集,交集的结果会去重,方法原型如下:
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
使用到了cogroup(PairRDDFunction提供的方法,PairRDDFunction中的方法只能作用于数据(key,value)形式的RDD,这里同样使用了RDD到PairRDDFunction的隐式转换)。两个rdd,分别是r1、r2,做cogroup操作,依然是按照两个rdd中相同的key做group,cogroup生成一个CoGroupedRDD类型的RDD,生成新的RDD的数据中key即源r1,r2相同的key,value是一个tuple,tuple的第一个元素是r1中key对应所有value上的iterator,第二个元素是r2中该key的所有value的iterator。
回到intersection方法,由于cogroup只能作用于数据(key,value)这种二元组形式的RDD,所以先将RDD的value map成(value, null); 接着做cogroup,做完cogroup之后,对于相交的数据,必然二元组中两个部分都不空(也就满足filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty
条件)。
下面是一个例子展示数据变换过程
//()表示tuple,[]表示列表, E表示空
输入: rdd1: 1, 2 ,3;rdd2: 2,3,4
map: rdd1 -> rdd3: (1,null),(2,null),(3,null)
map: rdd2 -> rdd4: (2,null),(3,null),(4,null)
cogroup:rdd3,rdd4 -> rdd5: (1, ([null], E)), (2, ([null], [null])), (3, ([null], [null])), (4,(E, [null]))
//value中有E表示这个key只存在于一个rdd中,去掉
filter: (2, ([null], [null])), (3, ([null], [null]))
keys: 2,3
其他的3个或者更多个rdd参与cogroup原理是一样的。
由于cogroup是一个比较复杂的过程,可以参考附录cogroup。
- glom
方法如下:
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
glom将分区里的所有数据合成到一个数组。
比如:
// rdd r1 包含1 to 5, 分成两个分区.分区1包含1,2;分区2包含3,4,5
scala> val r1 = sc.parallelize(1 to 5,2)
scala> r1.collect
res20: Array[Int] = Array(1, 2, 3, 4, 5)
// glom并调用collect查看结果. 依然包含两个分区,但是分区的元素被合成数组,也就是说原来分区1包含两个数据记录,现在只有一个类型为Array的数据记录了。
scala> r1.glom.collect
res21: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5))
从glom的实现来看,使用了iter.toArray将源rdd的一个分区里的数据放到一个数据里,是一个很消耗内存的方法,分区数据很多时还是要注意使用。
- cartesian
对两个rdd做笛卡尔积,方法原型如下:
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
假设参与笛卡尔积的两个rdd分别是r1,r2拥有分区[r1p1,r1p2]和[r2p1,r2p2],r1.cartesian(r2)
生成类型为CartesianRDD
的新rdd,假设是r3,r3拥有分区就是r1和r2分区的笛卡尔积, 即:[(r1p1, r2p1), (r1p1, r2p2), (r1p2, r2p1), (r1p2, r2p2)], 那么在r3上任意一个分区上计算时,假设是(r1p1, r2p1)上,只需要迭代r1p1, r2p1里的数据然后做笛卡尔积就行了。
下面是CartesianRDD的getPartitions方法:
override def getPartitions: Array[Partition] = {
// array保存分区,个数就是rdd1和rdd2分区个数相乘
val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
//CartesianRDD拥有的分区也是rdd1和rdd2分区的笛卡尔积
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
下面是CartesianRDD的compute方法:
override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
val currSplit = split.asInstanceOf[CartesianPartition]
// CartesianRDD每一个分区都是上游rdd1和rdd2各一个分区组成,也就是下面的s1,s2. 此处两重循环的形式完成元素的笛卡尔积计算
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
- groupBy、 PairRDDFunction#groupByKey
方法如下:
//由于源rdd可以的数据t不是(key,value)这种二元组,因此
它需要一个f能够把源rdd里的数据类型T转换成key的类型K。最终生成的目标rdd的数据形式是(f(t), t)这种。
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
//group按key聚合涉及到shuffle,使用defaultPartitioner获得默认的partitioner是HashPartitoner
groupBy[K](f, defaultPartitioner(this))
}
groupBy把相同的key对应的value组合在一起,可以放到一个列表中,此外它不保证value的顺序,也不保证每次调用value都按相同方式排列。 下面是一个groupBy的例子:
val r1 = sc.parallelize(List(1,2,3,4,3,2),2)
r1.groupBy(x=>x).collect
//groupBy的结果,key相同的value都被放到CompactBuffer里,value仅仅是被简单的拼接。因此这是一种十分耗时且消耗存储的操作。
// grouyBy和reduceBy底层都使用PairRDDFunctions#combineByKeyWithClassTag,只不过使用的用来聚合value的aggregator不同,groupBy的aggregator就是将value加到CompactBuffer里。
res39: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1)), (3,CompactBuffer(3, 3)))
这里一路跟到PairRDDFunctions#groupByKey的实现看看:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
//CompactBuffer可以暂时理解做高效的ArrayBuffer
val createCombiner = (v: V) => CompactBuffer(v)
// mergeValue函数把key相同的value聚合到一起,这里的实现是直接到v添加到数组末尾
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
从上面聚合的方式来看,就是把value都放到数组里,这在数据很多时,是一种很好内存的操作,有可能会OOM,所以要注意,能用reduceBy的就不要用groupBy。
- mapPartitions
mapPartitions功能和map类似,但还是实现上还是有区别的,下面是mapPartitions的原型:
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
// 作为比较还有map的原型:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
比较一下map和mapPartitions的不同,用户自定的函数f,在map中是通过iter.map(cleanF)
调用的,这意味着每一次iter上的value迭代都会调用一次f; 而 mapPartitions中f是通过cleanedF(iter)
调用的,直接作用在iter上,然后返回一个新的iter,f实际上只被调用了一次。当有些资源需要在f中创建时(比如jdbc连接),使用map会导致频繁创建,可以考虑使用mapPartitions.
- zip
作用和集合上的zip一样,集合上zip会将两个集合相同index上的value合成tuple,这就要求两个集合大小一样。rdd上的zip要求两个rdd拥有相同个数的partition,每个partition又拥有相同个数的数据。
如下例子:
// RDD r1包含两个分区
scala> val r1 = sc.parallelize(1 to 10,2)
scala> r1.collect
res53: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
//RDD r2同样两个分区,且分区里数据个数和r1一样。
scala> val r2 = sc.parallelize(11 to 20,2)
scala> r2.collect
res54: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> r1.zip(r2).collect
// r1和r2相同下标的数据组合成一个元组(tuple)
res56: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))
- subtract
求两个rdd的差,调用rdd1.substract(rdd2)会返回rdd1中去掉和rdd2相同数据的剩下部分, 但是不会对剩下的部分的数据去重。subtract都会最终调用下面的subtract方法:
def subtract(
other: RDD[T],
p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
//判断源rdd(即rdd1)的partitioner是否为空,不空的话往往意味着源rdd到目地RDD会产生shuffle操作生成的。
if (partitioner == Some(p)) {
//源rdd的partitoner不空,那源RDD的数据类型T一定是(key,value)形式的,这里之所以包装成新的partitioner,跟下面的map调用有关。下面的map会把源rdd中(key,value)数据作为新生成的rdd中的key,这里新的p2需要从新生成的rdd的key中(此时key类型(key,value))提取出源rdd的key。
val p2 = new Partitioner() {
override def numPartitions: Int = p.numPartitions
override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
}
// 再回顾上面的p2对partitioner的包装,源rdd有partitioner,则源rdd的类型是范型T实际是(key,value),此处map又把它转换成((key,value), null),所以需要包装成p2去key从(key,value)里取出来
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
} else {
this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
}
}
下图是subtract产生的rdd依赖:
subtract产生的rdd依赖subtractByKey生成新的rdd为SubtractedRDD,下面是它的getDependencies
方法:
override def getDependencies: Seq[Dependency[_]] = {
def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
: Dependency[_] = {
/*由于这个方法传入的参数是上图的rdd3和rdd4都是map产生的,因此rdd.partitioner是空的,所以会走向else分支,
else分支产生了ShuffleDependency,所以无论如何都会产生shuffle。 强迫他产生一次上图中的shuffle也是可以理解的,
因为shuffle会使得上游rdd3,rdd4中key相同的进入到下游SubtractedRDD的同一分区上,那样做subtract就简单多了。
*/
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[T1, T2, Any](rdd, part)
}
}
// 这里的rdd1, rdd2对应SubtractedRDD上游依赖也就是上图的rdd3和rdd4
Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}
下面是完成subtract的计算在SubtractedRDD#compute方法:
override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
// map是key到由key相同的value组成的Array的映射,这里所有的value都是null。
val map = new JHashMap[K, ArrayBuffer[V]]
// 对于key,map中有就返回对应的ArrayBuffer,没有就新建立一个
def getSeq(k: K): ArrayBuffer[V] = {
val seq = map.get(k)
if (seq != null) {
seq
} else {
val seq = new ArrayBuffer[V]()
map.put(k, seq)
seq
}
}
// 由于只有ShuffleDependency,所以只会走到shuffleDepency的case上。
// 这个函数根据depNum取到上游依赖的rdd(rdd3或则rdd4,然后对每一个值作为op的参数调用)
def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
dependencies(depNum) match {
case oneToOneDependency: OneToOneDependency[_] =>
val dependencyPartition = partition.narrowDeps(depNum).get.split
oneToOneDependency.rdd.iterator(dependencyPartition, context)
.asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
case shuffleDependency: ShuffleDependency[_, _, _] =>
//shuffleManager.getReader返回的迭代器迭代的一定是按key排好序的
val iter = SparkEnv.get.shuffleManager
.getReader(
shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
.read()
iter.foreach(op)
}
}
// depNum = 0,先跌打rdd3 shuffle之后的数据,按照key在map中拿到ArrayBuffer,再把value都放到ArrayBuffer中。
integrate(0, t => getSeq(t._1) += t._2)
// 即然是做subtract,在迭代rdd4中的数据,对于每一个key,从map中去掉就行了。
integrate(1, t => map.remove(t._1))
map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
}
其原理是迭代rdd1中的key,放到map中,然后迭代rdd2中的key,在将key从之前的map中删除,得到的就是求差的结果。
- zipWithIndex
对于rdd中的每一个数据,返回的数据以及数据在rdd中的索引组成的tuple, 如下例:
val r1 = sc.parallelize(List('a','b','c','d'),2)
// 返回tuple包括a,b,c,d在rdd中索引,而且是全局索引。
scala> r1.zipWithIndex.collect
res70: Array[(Char, Long)] = Array((a,0), (b,1), (c,2), (d,3))
方法实现如下:
def zipWithIndex(): RDD[(T, Long)] = withScope {
new ZippedWithIndexRDD(this)
}
zipWithIndex的基本原理:由于需要知道每一个partition里面的每一个元素的全局索引,首先需要计算出每一个partition的元素的个数,这样就能计算出第i个partition的第一个元素在所有全部数据里面的偏移值,接下来就简单了,由于任务是基于parition上的数据迭代的,那么parition里的数据的全局偏移就是该partition的第一个元素的偏移加上当前迭代到的元素在parition里的偏移值。
下面是ZippedWithIndexRDD中定义的一些方法:
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
//获得依赖的父rdd的partition的个数
val n = prev.partitions.length
if (n == 0) {
Array.empty
} else if (n == 1) {
Array(0L)
} else {
/*这里提交了一个spark job运行来统计每一个partition的元素个数。
1. 参数Utils.getIteratorSize是一个函数,task运行在分区上时调用,它返回分区大元素个数.
2. 参数0 until n-1指定了运行task的分区是[0, n-1),不需要计算最后一个分区大小,
因为最后一个分区的偏移是前面所有分区的元素个数之和。
3. scanLeft(0L)(_ + _),runJob返回[0,n-1)的partition大小的列表,scanLeft计算出偏移。
*/
prev.context.runJob(
prev,
Utils.getIteratorSize _,
0 until n - 1
).scanLeft(0L)(_ + _)
}
}
override def getPartitions: Array[Partition] = {
//根据上有partition包装新的分区ZippedWithIndexRDDPartition,新的分区携带了自己的偏移。这是一个窄依赖
firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
}
override def getPreferredLocations(split: Partition): Seq[String] =
firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)
override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
val parentIter = firstParent[T].iterator(split.prev, context)
// 重要的是这个方法,迭代上游分区的数据,返回(data, data_index)
Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
}
}
---------------------------
贴一下Utils.getIteratorZipWithIndex的实现:
1. 参数iterator是上游partition的迭代器
2. startIndex是上游partition的第一个元素的全局偏移
def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
new Iterator[(T, Long)] {
require(startIndex >= 0, "startIndex should be >= 0.")
var index: Long = startIndex - 1L
def hasNext: Boolean = iterator.hasNext
// next返回数据和其index的元组。
def next(): (T, Long) = {
index += 1L
(iterator.next(), index)
}
}
}
2. action
这小节里列出action,和transform不同,action会触发job的提交运行。
- reduce
原型如下:
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
/* 这个函数作为runJob的第二个参数,作用于一个job里的最后一个阶段(ResultStage)每一个分区。
这个函数干了什么: 接受一个上游parition上的迭代器,然后调用迭代器的reduceLeft, reduceLeft使用函数f来对数据做reduce。
所以这个函数完成了ResultStage的每一个分区的reduce,不是全局的reduce
*/
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
/* 这个函数作为sc.runJob的第三个参数,当reducePartition完成每一个分区的reduce之后,
用来对每一个分区的reduce结果合并,index是分区索引,taskResult即分区计算结果。
它干了什么:同样适用函数f来对结果做规约,完成全局的reduce。
*/
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
/* 提交job,runJob需要两个参数,reducePartition作用于每个分区之上,也就是在executor上运行;
mergeResult运行于driver端,收集每一个分区的结果到driver端,然后对这些结果运行mergeResult,如果每一个分区产生的结果很大的话,显然reduce可能会在driver端出现OOM
*/
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
这个和reduceByKey不一样,reduceByKey是一个transform操作,会产生一个新的RDD(与上游RDD形成ShuffleDependency),这里的reduce是一个action,会触发job的提交(上面代码中sc.runJob
);此外reduceByKey要求输入数据必须是(key,value)的二元tuple,而此处的reduce则不需要。
- aggregate
aggregate也是对值做聚合操作的,但是和reduce还是不同的,下面是其方法原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
//作用于每个分区中的数据,对每个分区中的数据聚合。运行于executor上
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
//运行在driver端,对收集回来的每个分区的聚合结果再一次聚合。
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
// 和reduce一样,先用aggregatePartition在每一个分区上运行聚合分区的数据,然后获取所有分区的数据,使用mergeResult在Driver端聚合,同样从在Driver端OOM的可能。
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
reduce和aggregate聚合值的区别从方法签名就可以看出,reduce聚合前后的值的类型是一样的,比如说你不能用reduce把一个int值拼成string返回。aggregate则可以把一种类型(T)的值聚合成另一种类型(U)返回。
上面aggregate方法,U是聚合后类型,T是聚合前类型; 参数zeroValue提供一个初始值,seqOp定义怎么把T聚合到U上,combOp定义怎么把多个分区聚合后的值拼起来。
下面是一个例子,把Int拼接成字符串
// RDD r1 包含1 to 10的整型
scala> r1.collect
res63: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// seq, 拼接字符串和整型
scala> def seq(s:String, i:Int):String = { s + i}
// comb,拼接分区聚合后的字符串
scala> def comb(s1:String, s2:String):String = { s1 + s2}
// 初始值zeroValue为空字符串
scala> r1.aggregate("")(seq,comb)
res64: String = 12345678910
- treeAggregate
treeAggregate和aggregate功能上是一样的,但是实现细节不一样,下面treeAggregate的实现:
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.length == 0) {
Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
} else {
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
// Don't trigger TreeAggregation when it doesn't save wall-clock time
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
numPartitions /= scale
val curNumPartitions = numPartitions
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
(i, iter) => iter.map((i % curNumPartitions, _))
}.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)
}
}
前文说到aggregate的过程是:先使用seqOp在各个分区上聚合,然后将分区结果全部拿到Driver端,然后使用combOp聚合,过多的分区的数据被移到driver端可能会导致driver上OOM;treeAggregate不同之处在于,分区聚合之后不马上把结果传回driver端聚合,而是调用reduceByKey再在远端按key聚合到更小的分区,如有必要还会经过多轮的reduceByKey,不停的把值聚合到更小的分区上,最终传回driver端做最终聚合。下图可以反应出aggregate和treeAggregate的过程上的区别:
aggregate和treeAggregate-
fold
和scala集合上的fold功能一样,实现原理和reduce一样,现在每一个分区上fold,然后结果传回driver在merge. -
take
方法原型如下:def take(num: Int): Array[T]
take接收一个整型参数num,返回rdd中前num个数(从第1个partition的第1个数开始的num个数).
take的思路大概是这样的:- 使用一个ArrayBuffer buf保存返回结果,buf.size就表示已经取到的结果,一开始时显然为0.
- 开始时将运行task的分区设成一个(也就是第一个partition0),因为不知道前num个元素会横跨多少parition,先尝试1个
- 运行job在分区上取前num - buf.size(也就是还需要取的个数),放到buf中。
- 判断buf.size有没有达到num,没有进入4. 达到就可以返回了。
- 按照某种比例扩大下一轮运行任务的分区个数,下一次job运行的的分区的索引为成区间[上一次任务运行最大分区索引 +1 , 上一次任务运行最大分区索引 +下一轮分区个数], 回到2继续运行。 (比如在partition0上数据不够num个,只有num1个,那么假设下一次扩大到在两个分区上运行,那么下一轮就在[partition-1,partition-2] 上取num - num1个数据)。
-
top, takeOrdered
这是两个方法,放在一起是因为top是基于调用takeOrdered实现的,它们的方法原型如下:
// top返回最大的前num个数,元素排序由ord定义,ord比较x,y, 返回负数表示x<y, 0表示x==y。
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
//takeOrdered默认是从小到大返回的,所以此处使用ord.reverse颠倒排序
takeOrdered(num)(ord.reverse)
}
//takeOrdered返回最小的的num个数,排序由ord定义
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0) {
Array.empty
} else {
/* mapPartitions表示在每一个分区上运行,queue相当于一个大小为num的大根堆,维持当前已经迭代(items迭代器)的最小的num个值.
mapParititons生成的新rdd mapRDDs的每一个parititon拥有之前上游rdd
每个parititon的最小的num个元素
*/
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
// 可以回顾下reduce action,获取到每一个分区返回的最小的num个元素值。然后合并这些值就能得到rdd中最小的num个元素。
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}
}
上面takeOrdered使用了RDD#reduce这个方法将每个分区的最小的num个数传会driver,在driver比较获得全局最小的num个数,如果num值很大的话会造成driver OOM
- max,min
获取rdd中最大和最小值
def max()(implicit ord: Ordering[T]): T = withScope {
// 回顾reduce,接收(T,T) => T的函数,ord.max方法比较两个值,返回大的。
// reduce现在每个partition上运行ord.max取得partition最大的值,然后将这些值返回给driver端,得到最大的值。
this.reduce(ord.max)
}
def min()(implicit ord: Ordering[T]): T = withScope {
this.reduce(ord.min)
}
3 附录
3.1 cogroup
3.1.1 cogroup的作用
首先cogroup是PairRDDFunctions中定义的方法,它只能作用于元素类型是(key,value)二元组型这样的rdd, cogroup可以接收多个rdd作为参数进行操作,但是为了方便,这里只假设有两个rdd: r1, r2.
r1,r2 cogroup产生新的rdd r3: r3的key包含了r1,r2的所有的key,对于key的value是一个数组,数组组的元素依次是key在r1和r2中所有的value的数组。
下面是一个例子:
//r1, r2是国家到城市的二元组
val r1 = sc.parallelize(List(("china","hefei"),("USA","chicago"),("japan","tokyo")))
val r2 = sc.parallelize(List(("china","beijing"),("USA","new york"),("china","shanghai")))
r1.cogroup(r2).collect
//输出,CompactBuffer可以理解成数组,可以看到key包含了r1,r2的所有的key,
Array((japan,(CompactBuffer(tokyo),CompactBuffer())), (USA,(CompactBuffer(chicago),CompactBuffer(new york))), (china,(CompactBuffer(hefei),CompactBuffer(beijing, shanghai))))
3.1.2 cogroup原理
cogroup会产生CoGroupedRDD,直接看他的实现吧:
//rdd即参与cogroup的所有rdd,是一个数组,所以可以有多个rdd。
//类型化参数'_ <: Product2[K, _]'表明rdd的元素必须是二元组,而且所有的rdd的key类型得是一样的.
//part默认是HashPartitioner
class CoGroupedRDD[K: ClassTag](
@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
part: Partitioner)
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
/**
1. 这里定义了新类型,CoGroup即来源于某一个上游rdd的key的value组成的数组.
2. CoGroupValue,二元组,第一个元素Any类型是上游rdd中value,注意上游rdd的类型是(key,value),这里是提取value出来的,
第二个元素Int是上游rdd在在dependencies列表中的index,也就是第一个元素来源于的那个rdd。
3. CoGroupCombiner, 数组,每个元素是一个CoGroup,也就是说第i元素
就是key在第I个rdd中所有value组成的数组。
*/
private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Array[CoGroup]
private var serializer: Serializer = SparkEnv.get.serializer
/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
this.serializer = serializer
this
}
override def getDependencies: Seq[Dependency[_]] = {
//获取依赖时,遍历上游所有rdd
rdds.map { rdd: RDD[_] =>
//上游用的partitioner和当前CoGroupedRDD一样,默认的HashPartitioner相同的判断标准时产生一样的分区个数,RangeParitioner复杂一点。
// 不管如果,相同就意味着上游rdd是通过shuffle产生的,所有的元素已经按照key聚合到对应的partiton了,
// 当前RDD和上游rdd的分区直接可以一对一依赖,不同再shuffle一次聚合key了。
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
// 否则的话只好shuffle一次,按key聚合好
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.length) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
dependencies(j) match {
case s: ShuffleDependency[_, _, _] =>
None
case _ =>
Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
}
}.toArray)
}
array
}
override val partitioner: Some[Partitioner] = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = dependencies.length
/* 看看做了什么: 首先返回的是迭代器数组,包含对每一个上游rdd的迭代.
其次迭代的元素类型是一个二元组,第一个元素类型‘Product2[K, Any]’表明它是上游rdd里的数据, 第二个元素Int则表明第一个元素所属的rdd
*/
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
// 跟上有一对一依赖就简单很多了,直接取到依赖的上游parition,返回数据和上有rdd的索引就行了。
case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
val dependencyPartition = split.narrowDeps(depNum).get.split
val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
rddIterators += ((it, depNum))
case shuffleDependency: ShuffleDependency[_, _, _] =>
// 跟上游shuffle依赖,那么就需要有shuffle read的过程,不提细节,总之shuffle read完成之后,
//会从上游所有rdd中收集了属于当前CoGroupedRDD的当前分区的所有元素,
val it = SparkEnv.get.shuffleManager
.getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
.read()
rddIterators += ((it, depNum))
}
/* 返回了一个类型是‘ ExternalAppendOnlyMap’的东西,它是干什么的呢,简单说, 这个map有按key聚合的作用,就像reduceBy一样。
当你往里面插一个元素时,它会按照你定义的combine和merger函数,把相同的key的元素聚合起来
*/
val map = createExternalMap(numRdds)
//迭代上游数据,根据前面rddIterator的定义,此处it是上游rdd中的数据,类型应该是(key,value)的,depNum是rdd索引
for ((it, depNum) <- rddIterators) {
// map要求插入的元素必须是(K,V)型的,这里的pair._1就是rdd中的key,value是CoGroupValue介绍过,所以map会按照key来聚合。
//所以关键是map的combiner和merger的实现
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
//map聚合好之后,返回新的迭代器,返回InterruptibleIterator表示它可以被中途取消
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
private def createExternalMap(numRdds: Int)
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
// createCombiner用来在出现第一个元素时,将该元素转换成聚合后的元素,可能是列表之类的,什么都可以
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
//value._2是rdd索引,value._1是rdd数据(key,value)中的value,这句表示value加到数组中。
newCombiner(value._2) += value._1
newCombiner
}
//将元素合并到聚合后的新类型元素上,还是往数组里加
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
(combiner, value) => {
combiner(value._2) += value._1
combiner
}
// 将两个聚合后的新类型合并,合并两个数组
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
(combiner1, combiner2) => {
var depNum = 0
while (depNum < numRdds) {
combiner1(depNum) ++= combiner2(depNum)
depNum += 1
}
combiner1
}
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
createCombiner, mergeValue, mergeCombiners)
}
override def clearDependencies() {
super.clearDependencies()
rdds = null
}
}
注:
- 关于shuffle read可以参考shuffle read第三节
- 关于ExternalAppendOnlyMap可以参考ExternalAppendOnlyMap4.3节