spark之transform和action

2017-08-06  本文已影响0人  aaron1993

1. tranformation

  1. map
    map实现如下:
 def map[U: ClassTag](f: T => U): RDD[U] = withScope {
   val cleanF = sc.clean(f)
   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
 }

map接收一个函数f为参数,该函数接收参数类型T,然后返回类型U。当前RDD数据类型T,map使用函数f将RDD中的每一条记录转换为类型为U的数据。 比如:

// 创建一个新的RDD oddNums,包含两个partition,只有奇数组成。
val oddNums = sc.parallelize(List(1,3,5,7,9),2)
// 使用函数 x => x + 1将 oddNums中的奇数转换成偶数。
val evenNums = oddNums.map(x => x + 1)

从map的实现的可以看出,函数cleanF是通过iter.map(cleanF)发挥作用的,这就意味着iter中有多少个值,cleanF就会调用多少次,后面还会介绍mapPartitions,作用和map一样,但是实现有所区别,将会在mapPartitions中提到。

  1. flatMap
    flatMap的原型如下:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

flatMap将每一个元素转换成一个集合类型,然后又将这些集合的元素拿出来展开拼在一起作为下一个RDD的数据。

flatMap接收的参数f同样也是一个函数,这个函数接收T类型(当前RDD的数据类型),然后返回一个集合,集合的元素类型为U(TraversableOnce一般都是集合实现的特质)。

flatMap调用new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)), iter迭代源RDD,迭代的元素类型T,it er.flatMap使用函数f将T转换成U的集合,然后返回U的集合上的迭代器。比如T1被转换成集合[U1,U2,U3], U上的迭代器迭代返回U1,U2,U3三个元素,而不是[U1,U2,U3]这个集合,也就是说集合被展开了。

看一个例子:

//RDD someNums包含数据1,2,3,4,5。要把它转换成1,1,2,2,3,3,4,4,5,5
val someNums = sc.parallelize(List(1,2,3,4,5))
val doubleSomeNums = someNums.flatMap(x => List(x,x))
doubleSomeNums.collect
// Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5),上面的1,2,3,4,5首先被转换成[1,1],[2,2],[3,3],[4,4],[5,5],然后在被连接成1,1,2,2,3,3,4,4
  1. filter
    原型如下:
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

filter接收断言f,对RDD中的数据,满足f的返回,不满足的丢弃。

  1. distinct
    原型如下:
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
   map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 }

distinct会将RDD中重复的数据只保留一份,这是一个全局去重操作,而不是仅仅对每个分区操作去重,全局去重就意味着需要将散落在各个分区里的元素聚合到一起。

上面代码表明的它的实现原理:

        map(x => (x, null))   reduceByKey   map(_._1)
                 |                 |             |
输入2,2 -> (2,null),(2,null) ->  (2, null)   ->      2
  1. coalesce
    coalesce用来改变RDD的分区个数,重新分区。方法原型如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
            partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
           (implicit ord: Ordering[T] = null)
   : RDD[T] 

参数shuffle=true且是在扩大分区数(即目标rdd分区数numPartitions大于当前分区)则会导致shuffle过程。

  1. union
    union用来将多个RDD做并集,合并后的数据不会进行去重。
    其方法原型:
  def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
  }
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
  /*获得所有参与union的rdd的分区方法partitioner,转换成set
   这就意味着如果所有的rdd使用相同的分区方法,比如都是HashPartitioner,
  而且并且各自的partitioner相等(即equals返回true,对于HashPartitioner来说,
  equals为 true的条件是分区的个数一样,RangePartitioner要复杂一点),那么返回的set即partitioners的size为1.
   */
   val partitioners = rdds.flatMap(_.partitioner).toSet
  if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
    new PartitionerAwareUnionRDD(this, rdds)
  } else {
    new UnionRDD(this, rdds)
  }
  }

上面if分支中,如果参与union的rdd都定义了partitioner(rdds.forall(_.partitioner.isDefined)返回true,一般只有ShuffledRDD有partitioner)且它们的partitioner一样,这就表示参与union的rdd都产生相同个数的分区(假设个数为p),这就好办了,union生成新的RDD:PartitionerAwareUnionRDD,新的RDD的拥有p个分区,第i个分区就有上游参与union的rdd里的第i个分区组成。所以总结一下,这种情况所有父rdd都有p个分区,那生成的新的rdd也有p个分区。

else分支中,创建UnionRDD。假设参与合并的rdd1,rdd2的分区分别是(R1P1,R1P2)和(R2P1,R2P2),一共4个分区,新的UnionRDD也将有四个分区,也就是(R1P1,R1P2,R2P1,R2P2)。

  1. sortBy、sortByKey
    对RDD中的数据进行全局排序,下面是sortBy的原型:
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

上面sortBy实际上上调用了OrderedRDDFunctions#sortByKey(注: OrderedRDDFunctions经常出现,这里使用到了scala的隐式转换RDD隐式转换成OrderedRDDFunctions)方法。

sortByKey的是机遇reduce会对key进行排序这一原理实现的,利用每一个reducer会对自己分区内的key进行排序的原理,但是由于reducer只会保证自己分区内的数据按key排序,分区之间的有序则需要另外的机制来保证(参考hadoop terasort的排序原理)。

这里简单说一下原理:假设有10个分区,那我门就从数据中采样9个数,这9个数就决定了10个区间,然后shuffle时,就将每一个上游rdd中的数据都落到10个里的其中一个,这样partition之间也就有序了。

即然有shuffle这个过程,也就需要一个paritioner来决定数据流向下游那一个reducer,这里使用到的partitioner是RangePartitioner,而这里RangePartitoner的range的划分也就是上一段里那个简单原理介绍中所说。

注:关于shuffle的过程有兴趣的话可以参考Spark shuffle 原理
注:关于隐式转换可以参考scala 隐式转换

  1. intersection
    求两个rdd的交集,交集的结果会去重,方法原型如下:
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

使用到了cogroup(PairRDDFunction提供的方法,PairRDDFunction中的方法只能作用于数据(key,value)形式的RDD,这里同样使用了RDD到PairRDDFunction的隐式转换)。两个rdd,分别是r1、r2,做cogroup操作,依然是按照两个rdd中相同的key做group,cogroup生成一个CoGroupedRDD类型的RDD,生成新的RDD的数据中key即源r1,r2相同的key,value是一个tuple,tuple的第一个元素是r1中key对应所有value上的iterator,第二个元素是r2中该key的所有value的iterator。

回到intersection方法,由于cogroup只能作用于数据(key,value)这种二元组形式的RDD,所以先将RDD的value map成(value, null); 接着做cogroup,做完cogroup之后,对于相交的数据,必然二元组中两个部分都不空(也就满足filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty条件)。

下面是一个例子展示数据变换过程

//()表示tuple,[]表示列表, E表示空
输入: rdd1: 1, 2 ,3;rdd2: 2,3,4
map: rdd1 -> rdd3: (1,null),(2,null),(3,null)
map: rdd2 -> rdd4: (2,null),(3,null),(4,null)
cogroup:rdd3,rdd4 ->  rdd5: (1, ([null], E)), (2, ([null], [null])), (3, ([null], [null])), (4,(E, [null]))
//value中有E表示这个key只存在于一个rdd中,去掉
filter: (2, ([null], [null])), (3, ([null], [null]))
keys: 2,3

其他的3个或者更多个rdd参与cogroup原理是一样的。
由于cogroup是一个比较复杂的过程,可以参考附录cogroup。

  1. glom
    方法如下:
def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

glom将分区里的所有数据合成到一个数组。
比如:

// rdd r1 包含1 to 5, 分成两个分区.分区1包含1,2;分区2包含3,4,5
scala> val r1 = sc.parallelize(1 to 5,2)
scala> r1.collect
res20: Array[Int] = Array(1, 2, 3, 4, 5)

// glom并调用collect查看结果. 依然包含两个分区,但是分区的元素被合成数组,也就是说原来分区1包含两个数据记录,现在只有一个类型为Array的数据记录了。
scala> r1.glom.collect
res21: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5))

从glom的实现来看,使用了iter.toArray将源rdd的一个分区里的数据放到一个数据里,是一个很消耗内存的方法,分区数据很多时还是要注意使用。

  1. cartesian
    对两个rdd做笛卡尔积,方法原型如下:
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}

假设参与笛卡尔积的两个rdd分别是r1,r2拥有分区[r1p1,r1p2]和[r2p1,r2p2],r1.cartesian(r2)生成类型为CartesianRDD的新rdd,假设是r3,r3拥有分区就是r1和r2分区的笛卡尔积, 即:[(r1p1, r2p1), (r1p1, r2p2), (r1p2, r2p1), (r1p2, r2p2)], 那么在r3上任意一个分区上计算时,假设是(r1p1, r2p1)上,只需要迭代r1p1, r2p1里的数据然后做笛卡尔积就行了。

下面是CartesianRDD的getPartitions方法:

override def getPartitions: Array[Partition] = {
    // array保存分区,个数就是rdd1和rdd2分区个数相乘
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    //CartesianRDD拥有的分区也是rdd1和rdd2分区的笛卡尔积
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

下面是CartesianRDD的compute方法:

override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
   // CartesianRDD每一个分区都是上游rdd1和rdd2各一个分区组成,也就是下面的s1,s2. 此处两重循环的形式完成元素的笛卡尔积计算
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }
  1. groupBy、 PairRDDFunction#groupByKey
    方法如下:
//由于源rdd可以的数据t不是(key,value)这种二元组,因此
它需要一个f能够把源rdd里的数据类型T转换成key的类型K。最终生成的目标rdd的数据形式是(f(t), t)这种。
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    //group按key聚合涉及到shuffle,使用defaultPartitioner获得默认的partitioner是HashPartitoner
    groupBy[K](f, defaultPartitioner(this))
  }

groupBy把相同的key对应的value组合在一起,可以放到一个列表中,此外它不保证value的顺序,也不保证每次调用value都按相同方式排列。 下面是一个groupBy的例子:

val r1 = sc.parallelize(List(1,2,3,4,3,2),2)
r1.groupBy(x=>x).collect
//groupBy的结果,key相同的value都被放到CompactBuffer里,value仅仅是被简单的拼接。因此这是一种十分耗时且消耗存储的操作。
// grouyBy和reduceBy底层都使用PairRDDFunctions#combineByKeyWithClassTag,只不过使用的用来聚合value的aggregator不同,groupBy的aggregator就是将value加到CompactBuffer里。
res39: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1)), (3,CompactBuffer(3, 3)))

这里一路跟到PairRDDFunctions#groupByKey的实现看看:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
   //CompactBuffer可以暂时理解做高效的ArrayBuffer
    val createCombiner = (v: V) => CompactBuffer(v)
  // mergeValue函数把key相同的value聚合到一起,这里的实现是直接到v添加到数组末尾
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

从上面聚合的方式来看,就是把value都放到数组里,这在数据很多时,是一种很好内存的操作,有可能会OOM,所以要注意,能用reduceBy的就不要用groupBy。

  1. mapPartitions
    mapPartitions功能和map类似,但还是实现上还是有区别的,下面是mapPartitions的原型:
  def mapPartitions[U: ClassTag](
     f: Iterator[T] => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U] = withScope {
   val cleanedF = sc.clean(f)
   new MapPartitionsRDD(
     this,
     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
     preservesPartitioning)
 }

 // 作为比较还有map的原型:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
   val cleanF = sc.clean(f)
   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
 }

比较一下map和mapPartitions的不同,用户自定的函数f,在map中是通过iter.map(cleanF)调用的,这意味着每一次iter上的value迭代都会调用一次f; 而 mapPartitions中f是通过cleanedF(iter)调用的,直接作用在iter上,然后返回一个新的iter,f实际上只被调用了一次。当有些资源需要在f中创建时(比如jdbc连接),使用map会导致频繁创建,可以考虑使用mapPartitions.

  1. zip
    作用和集合上的zip一样,集合上zip会将两个集合相同index上的value合成tuple,这就要求两个集合大小一样。rdd上的zip要求两个rdd拥有相同个数的partition,每个partition又拥有相同个数的数据。

如下例子:

// RDD r1包含两个分区
scala> val r1 = sc.parallelize(1 to 10,2)
scala> r1.collect
res53: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
//RDD r2同样两个分区,且分区里数据个数和r1一样。
scala> val r2 = sc.parallelize(11 to 20,2)
scala> r2.collect
res54: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> r1.zip(r2).collect
// r1和r2相同下标的数据组合成一个元组(tuple)
res56: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))
  1. subtract
    求两个rdd的差,调用rdd1.substract(rdd2)会返回rdd1中去掉和rdd2相同数据的剩下部分, 但是不会对剩下的部分的数据去重。subtract都会最终调用下面的subtract方法:
def subtract(
   other: RDD[T],
   p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
 //判断源rdd(即rdd1)的partitioner是否为空,不空的话往往意味着源rdd到目地RDD会产生shuffle操作生成的。
 if (partitioner == Some(p)) {

  //源rdd的partitoner不空,那源RDD的数据类型T一定是(key,value)形式的,这里之所以包装成新的partitioner,跟下面的map调用有关。下面的map会把源rdd中(key,value)数据作为新生成的rdd中的key,这里新的p2需要从新生成的rdd的key中(此时key类型(key,value))提取出源rdd的key。
   val p2 = new Partitioner() {
     override def numPartitions: Int = p.numPartitions
     override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
     }
    // 再回顾上面的p2对partitioner的包装,源rdd有partitioner,则源rdd的类型是范型T实际是(key,value),此处map又把它转换成((key,value), null),所以需要包装成p2去key从(key,value)里取出来
     this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
     } else {
     this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
     }
   }

下图是subtract产生的rdd依赖:

subtract产生的rdd依赖

subtractByKey生成新的rdd为SubtractedRDD,下面是它的getDependencies方法:

 override def getDependencies: Seq[Dependency[_]] = {
    def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
      : Dependency[_] = {
      /*由于这个方法传入的参数是上图的rdd3和rdd4都是map产生的,因此rdd.partitioner是空的,所以会走向else分支,
       else分支产生了ShuffleDependency,所以无论如何都会产生shuffle。 强迫他产生一次上图中的shuffle也是可以理解的,
       因为shuffle会使得上游rdd3,rdd4中key相同的进入到下游SubtractedRDD的同一分区上,那样做subtract就简单多了。
       */
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[T1, T2, Any](rdd, part)
      }
    }
   // 这里的rdd1, rdd2对应SubtractedRDD上游依赖也就是上图的rdd3和rdd4
    Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
  }

下面是完成subtract的计算在SubtractedRDD#compute方法:

override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
    val partition = p.asInstanceOf[CoGroupPartition]
   // map是key到由key相同的value组成的Array的映射,这里所有的value都是null。
    val map = new JHashMap[K, ArrayBuffer[V]]
    // 对于key,map中有就返回对应的ArrayBuffer,没有就新建立一个
    def getSeq(k: K): ArrayBuffer[V] = {
      val seq = map.get(k)
      if (seq != null) {
        seq
      } else {
        val seq = new ArrayBuffer[V]()
        map.put(k, seq)
        seq
      }
    } 

   // 由于只有ShuffleDependency,所以只会走到shuffleDepency的case上。
   // 这个函数根据depNum取到上游依赖的rdd(rdd3或则rdd4,然后对每一个值作为op的参数调用)
    def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
      dependencies(depNum) match {
        case oneToOneDependency: OneToOneDependency[_] =>
          val dependencyPartition = partition.narrowDeps(depNum).get.split
          oneToOneDependency.rdd.iterator(dependencyPartition, context)
            .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)

        case shuffleDependency: ShuffleDependency[_, _, _] =>
          //shuffleManager.getReader返回的迭代器迭代的一定是按key排好序的
          val iter = SparkEnv.get.shuffleManager
            .getReader(
              shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
            .read()
          iter.foreach(op)
      }
    }

    // depNum = 0,先跌打rdd3 shuffle之后的数据,按照key在map中拿到ArrayBuffer,再把value都放到ArrayBuffer中。
    integrate(0, t => getSeq(t._1) += t._2)
    // 即然是做subtract,在迭代rdd4中的数据,对于每一个key,从map中去掉就行了。
    integrate(1, t => map.remove(t._1))
    map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
  }

其原理是迭代rdd1中的key,放到map中,然后迭代rdd2中的key,在将key从之前的map中删除,得到的就是求差的结果。

  1. zipWithIndex
    对于rdd中的每一个数据,返回的数据以及数据在rdd中的索引组成的tuple, 如下例:
 val r1 = sc.parallelize(List('a','b','c','d'),2)
 // 返回tuple包括a,b,c,d在rdd中索引,而且是全局索引。
 scala> r1.zipWithIndex.collect
     res70: Array[(Char, Long)] = Array((a,0), (b,1), (c,2), (d,3))

方法实现如下:

 def zipWithIndex(): RDD[(T, Long)] = withScope {
   new ZippedWithIndexRDD(this)
 }

zipWithIndex的基本原理:由于需要知道每一个partition里面的每一个元素的全局索引,首先需要计算出每一个partition的元素的个数,这样就能计算出第i个partition的第一个元素在所有全部数据里面的偏移值,接下来就简单了,由于任务是基于parition上的数据迭代的,那么parition里的数据的全局偏移就是该partition的第一个元素的偏移加上当前迭代到的元素在parition里的偏移值。

下面是ZippedWithIndexRDD中定义的一些方法:

class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

  /** The start index of each partition. */
  @transient private val startIndices: Array[Long] = {
   //获得依赖的父rdd的partition的个数
    val n = prev.partitions.length
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      Array(0L)
    } else {
     /*这里提交了一个spark job运行来统计每一个partition的元素个数。
        1. 参数Utils.getIteratorSize是一个函数,task运行在分区上时调用,它返回分区大元素个数.
        2. 参数0 until n-1指定了运行task的分区是[0, n-1),不需要计算最后一个分区大小,
           因为最后一个分区的偏移是前面所有分区的元素个数之和。
        3. scanLeft(0L)(_ + _),runJob返回[0,n-1)的partition大小的列表,scanLeft计算出偏移。
     */
      prev.context.runJob(
        prev,
        Utils.getIteratorSize _,
        0 until n - 1 
      ).scanLeft(0L)(_ + _)
    }
  }

  override def getPartitions: Array[Partition] = {
    //根据上有partition包装新的分区ZippedWithIndexRDDPartition,新的分区携带了自己的偏移。这是一个窄依赖
    firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
  }

  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)

  override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
    val parentIter = firstParent[T].iterator(split.prev, context)
    // 重要的是这个方法,迭代上游分区的数据,返回(data, data_index)
    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
  }
}
 ---------------------------
贴一下Utils.getIteratorZipWithIndex的实现:
1. 参数iterator是上游partition的迭代器
2. startIndex是上游partition的第一个元素的全局偏移
 def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
    new Iterator[(T, Long)] {
      require(startIndex >= 0, "startIndex should be >= 0.")
 
      var index: Long = startIndex - 1L
      def hasNext: Boolean = iterator.hasNext
      // next返回数据和其index的元组。
      def next(): (T, Long) = {
        index += 1L
        (iterator.next(), index)
      }
    }
  }

2. action

这小节里列出action,和transform不同,action会触发job的提交运行。

  1. reduce
    原型如下:
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  /* 这个函数作为runJob的第二个参数,作用于一个job里的最后一个阶段(ResultStage)每一个分区。
      这个函数干了什么: 接受一个上游parition上的迭代器,然后调用迭代器的reduceLeft, reduceLeft使用函数f来对数据做reduce。
      所以这个函数完成了ResultStage的每一个分区的reduce,不是全局的reduce
*/
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
   /* 这个函数作为sc.runJob的第三个参数,当reducePartition完成每一个分区的reduce之后,
      用来对每一个分区的reduce结果合并,index是分区索引,taskResult即分区计算结果。
      它干了什么:同样适用函数f来对结果做规约,完成全局的reduce。
*/
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  /* 提交job,runJob需要两个参数,reducePartition作用于每个分区之上,也就是在executor上运行;
     mergeResult运行于driver端,收集每一个分区的结果到driver端,然后对这些结果运行mergeResult,如果每一个分区产生的结果很大的话,显然reduce可能会在driver端出现OOM
  */
  sc.runJob(this, reducePartition, mergeResult)
 
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

这个和reduceByKey不一样,reduceByKey是一个transform操作,会产生一个新的RDD(与上游RDD形成ShuffleDependency),这里的reduce是一个action,会触发job的提交(上面代码中sc.runJob);此外reduceByKey要求输入数据必须是(key,value)的二元tuple,而此处的reduce则不需要。

  1. aggregate
    aggregate也是对值做聚合操作的,但是和reduce还是不同的,下面是其方法原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    //作用于每个分区中的数据,对每个分区中的数据聚合。运行于executor上
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    //运行在driver端,对收集回来的每个分区的聚合结果再一次聚合。
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    // 和reduce一样,先用aggregatePartition在每一个分区上运行聚合分区的数据,然后获取所有分区的数据,使用mergeResult在Driver端聚合,同样从在Driver端OOM的可能。
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

reduce和aggregate聚合值的区别从方法签名就可以看出,reduce聚合前后的值的类型是一样的,比如说你不能用reduce把一个int值拼成string返回。aggregate则可以把一种类型(T)的值聚合成另一种类型(U)返回。

上面aggregate方法,U是聚合后类型,T是聚合前类型; 参数zeroValue提供一个初始值,seqOp定义怎么把T聚合到U上,combOp定义怎么把多个分区聚合后的值拼起来。

下面是一个例子,把Int拼接成字符串

// RDD r1 包含1 to 10的整型
scala> r1.collect
res63: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// seq, 拼接字符串和整型
scala> def seq(s:String, i:Int):String = { s + i}
// comb,拼接分区聚合后的字符串
scala> def comb(s1:String, s2:String):String = { s1 + s2}
// 初始值zeroValue为空字符串
scala> r1.aggregate("")(seq,comb)
res64: String = 12345678910
  1. treeAggregate
    treeAggregate和aggregate功能上是一样的,但是实现细节不一样,下面treeAggregate的实现:
  def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}

前文说到aggregate的过程是:先使用seqOp在各个分区上聚合,然后将分区结果全部拿到Driver端,然后使用combOp聚合,过多的分区的数据被移到driver端可能会导致driver上OOM;treeAggregate不同之处在于,分区聚合之后不马上把结果传回driver端聚合,而是调用reduceByKey再在远端按key聚合到更小的分区,如有必要还会经过多轮的reduceByKey,不停的把值聚合到更小的分区上,最终传回driver端做最终聚合。下图可以反应出aggregate和treeAggregate的过程上的区别:

aggregate和treeAggregate
  1. fold
    和scala集合上的fold功能一样,实现原理和reduce一样,现在每一个分区上fold,然后结果传回driver在merge.

  2. take
    方法原型如下:

     def take(num: Int): Array[T]
    

    take接收一个整型参数num,返回rdd中前num个数(从第1个partition的第1个数开始的num个数).
    take的思路大概是这样的:

    1. 使用一个ArrayBuffer buf保存返回结果,buf.size就表示已经取到的结果,一开始时显然为0.
    2. 开始时将运行task的分区设成一个(也就是第一个partition0),因为不知道前num个元素会横跨多少parition,先尝试1个
    3. 运行job在分区上取前num - buf.size(也就是还需要取的个数),放到buf中。
    4. 判断buf.size有没有达到num,没有进入4. 达到就可以返回了。
    5. 按照某种比例扩大下一轮运行任务的分区个数,下一次job运行的的分区的索引为成区间[上一次任务运行最大分区索引 +1 , 上一次任务运行最大分区索引 +下一轮分区个数], 回到2继续运行。 (比如在partition0上数据不够num个,只有num1个,那么假设下一次扩大到在两个分区上运行,那么下一轮就在[partition-1,partition-2] 上取num - num1个数据)。
  3. top, takeOrdered
    这是两个方法,放在一起是因为top是基于调用takeOrdered实现的,它们的方法原型如下:

  // top返回最大的前num个数,元素排序由ord定义,ord比较x,y, 返回负数表示x<y, 0表示x==y。
   def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    //takeOrdered默认是从小到大返回的,所以此处使用ord.reverse颠倒排序
     takeOrdered(num)(ord.reverse)
   }
   //takeOrdered返回最小的的num个数,排序由ord定义
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
   if (num == 0) {
     Array.empty
   } else {
    /* mapPartitions表示在每一个分区上运行,queue相当于一个大小为num的大根堆,维持当前已经迭代(items迭代器)的最小的num个值.
       mapParititons生成的新rdd mapRDDs的每一个parititon拥有之前上游rdd
       每个parititon的最小的num个元素
   */
     val mapRDDs = mapPartitions { items =>
       // Priority keeps the largest elements, so let's reverse the ordering.
       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
       Iterator.single(queue)
     }
     if (mapRDDs.partitions.length == 0) {
       Array.empty
     } else {
      // 可以回顾下reduce action,获取到每一个分区返回的最小的num个元素值。然后合并这些值就能得到rdd中最小的num个元素。
       mapRDDs.reduce { (queue1, queue2) =>
         queue1 ++= queue2
         queue1
       }.toArray.sorted(ord)
     }
   }
 }

上面takeOrdered使用了RDD#reduce这个方法将每个分区的最小的num个数传会driver,在driver比较获得全局最小的num个数,如果num值很大的话会造成driver OOM

  1. max,min
    获取rdd中最大和最小值
def max()(implicit ord: Ordering[T]): T = withScope {
  // 回顾reduce,接收(T,T) => T的函数,ord.max方法比较两个值,返回大的。
 // reduce现在每个partition上运行ord.max取得partition最大的值,然后将这些值返回给driver端,得到最大的值。
  this.reduce(ord.max)
}

def min()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.min)
}

3 附录

3.1 cogroup

3.1.1 cogroup的作用

首先cogroup是PairRDDFunctions中定义的方法,它只能作用于元素类型是(key,value)二元组型这样的rdd, cogroup可以接收多个rdd作为参数进行操作,但是为了方便,这里只假设有两个rdd: r1, r2.
r1,r2 cogroup产生新的rdd r3: r3的key包含了r1,r2的所有的key,对于key的value是一个数组,数组组的元素依次是key在r1和r2中所有的value的数组。
下面是一个例子:

//r1, r2是国家到城市的二元组
val r1 = sc.parallelize(List(("china","hefei"),("USA","chicago"),("japan","tokyo")))
val r2 = sc.parallelize(List(("china","beijing"),("USA","new york"),("china","shanghai")))
r1.cogroup(r2).collect
//输出,CompactBuffer可以理解成数组,可以看到key包含了r1,r2的所有的key,
Array((japan,(CompactBuffer(tokyo),CompactBuffer())), (USA,(CompactBuffer(chicago),CompactBuffer(new york))), (china,(CompactBuffer(hefei),CompactBuffer(beijing, shanghai))))

3.1.2 cogroup原理

cogroup会产生CoGroupedRDD,直接看他的实现吧:

//rdd即参与cogroup的所有rdd,是一个数组,所以可以有多个rdd。
//类型化参数'_ <: Product2[K, _]'表明rdd的元素必须是二元组,而且所有的rdd的key类型得是一样的.
//part默认是HashPartitioner
class CoGroupedRDD[K: ClassTag](
    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

  /**
   1. 这里定义了新类型,CoGroup即来源于某一个上游rdd的key的value组成的数组.
   2. CoGroupValue,二元组,第一个元素Any类型是上游rdd中value,注意上游rdd的类型是(key,value),这里是提取value出来的,
      第二个元素Int是上游rdd在在dependencies列表中的index,也就是第一个元素来源于的那个rdd。
   3. CoGroupCombiner, 数组,每个元素是一个CoGroup,也就是说第i元素
      就是key在第I个rdd中所有value组成的数组。
   */
  private type CoGroup = CompactBuffer[Any]
  private type CoGroupValue = (Any, Int)  // Int is dependency number
  private type CoGroupCombiner = Array[CoGroup]

  private var serializer: Serializer = SparkEnv.get.serializer

  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
    this.serializer = serializer
    this
  }

  override def getDependencies: Seq[Dependency[_]] = {
    //获取依赖时,遍历上游所有rdd
    rdds.map { rdd: RDD[_] =>
      //上游用的partitioner和当前CoGroupedRDD一样,默认的HashPartitioner相同的判断标准时产生一样的分区个数,RangeParitioner复杂一点。
      // 不管如果,相同就意味着上游rdd是通过shuffle产生的,所有的元素已经按照key聚合到对应的partiton了,
      // 当前RDD和上游rdd的分区直接可以一对一依赖,不同再shuffle一次聚合key了。
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
       // 否则的话只好shuffle一次,按key聚合好
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

  override val partitioner: Some[Partitioner] = Some(part)

  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val split = s.asInstanceOf[CoGroupPartition]
    val numRdds = dependencies.length

    /* 看看做了什么: 首先返回的是迭代器数组,包含对每一个上游rdd的迭代.
        其次迭代的元素类型是一个二元组,第一个元素类型‘Product2[K, Any]’表明它是上游rdd里的数据, 第二个元素Int则表明第一个元素所属的rdd
    */
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
     // 跟上有一对一依赖就简单很多了,直接取到依赖的上游parition,返回数据和上有rdd的索引就行了。
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))

      case shuffleDependency: ShuffleDependency[_, _, _] =>
       // 跟上游shuffle依赖,那么就需要有shuffle read的过程,不提细节,总之shuffle read完成之后,
       //会从上游所有rdd中收集了属于当前CoGroupedRDD的当前分区的所有元素, 
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }
    
   /* 返回了一个类型是‘ ExternalAppendOnlyMap’的东西,它是干什么的呢,简单说, 这个map有按key聚合的作用,就像reduceBy一样。
      当你往里面插一个元素时,它会按照你定义的combine和merger函数,把相同的key的元素聚合起来
   */
    val map = createExternalMap(numRdds)
    //迭代上游数据,根据前面rddIterator的定义,此处it是上游rdd中的数据,类型应该是(key,value)的,depNum是rdd索引 
    for ((it, depNum) <- rddIterators) {
    // map要求插入的元素必须是(K,V)型的,这里的pair._1就是rdd中的key,value是CoGroupValue介绍过,所以map会按照key来聚合。
   //所以关键是map的combiner和merger的实现
      map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
    }
    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    //map聚合好之后,返回新的迭代器,返回InterruptibleIterator表示它可以被中途取消
    new InterruptibleIterator(context,
      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
  }

  private def createExternalMap(numRdds: Int)
    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
    // createCombiner用来在出现第一个元素时,将该元素转换成聚合后的元素,可能是列表之类的,什么都可以
    val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
      
      val newCombiner = Array.fill(numRdds)(new CoGroup)
     //value._2是rdd索引,value._1是rdd数据(key,value)中的value,这句表示value加到数组中。
      newCombiner(value._2) += value._1
      newCombiner
    }
    //将元素合并到聚合后的新类型元素上,还是往数组里加
    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
      (combiner, value) => {
      combiner(value._2) += value._1
      combiner
    }
   // 将两个聚合后的新类型合并,合并两个数组
    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
      (combiner1, combiner2) => {
        var depNum = 0
        while (depNum < numRdds) {
          combiner1(depNum) ++= combiner2(depNum)
          depNum += 1
        }
        combiner1
      }
    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
      createCombiner, mergeValue, mergeCombiners)
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}

  1. 关于shuffle read可以参考shuffle read第三节
  2. 关于ExternalAppendOnlyMap可以参考ExternalAppendOnlyMap4.3节
上一篇下一篇

猜你喜欢

热点阅读