hadoop 大数据底层理解

Spark k-v类型转换算子

2022-02-08  本文已影响0人  Tim在路上

Spark k-v类型转换算子

MapPartitionsRDD

将传入的函数应用于value的算子,实质是创建了MapPartitionsRDD,并在调用迭代函数时,只将函数应用于value。

def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}

从上面源码可以看出返回的仍然是key-val类型,但仅仅将函数应用于v。(k, cleanF(v))

将键值对的value进行压平,并再进行map映射为k-v。实质还是调用了MapPartitionsRDD。

def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.flatMap { case (k, v) =>
      cleanF(v).map(x => (k, x))
    },
    preservesPartitioning = true)
}

可以发现在应用传入函数cleanF(v).map(x => (k, x))又对其进行散列。

ShuffledRDD

partitionBy 算子的功能和reparition的功能差不多,都是返回指定分区个数的分区。partitionBy 是针对key-val RDD的,在传入参数中可以传入使用的分区器。

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner== Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}

分区中数组不能作为Hash分区器的key。其次判断传入的分区器和当前的分区器是否相等,相等则不会进行操作,直接返回。注意:分区器的相等,如果是HashPartitioner必须分区数也一致。

override def equals(other: Any): Boolean = other match {
  case h: HashPartitioner =>
    h.numPartitions == numPartitions
  case _ =>
    false
}

所以如果分区数是不变的那么直接返回,否则一定会进行ShuffledRDD。只有k-v会发生shuffle,这也是方便k-v修改shuffle时的分区器。

combineByKey 算子按照key将value进行聚合,它是combineByKeyWithClassTag算子的简化版本,使用的是HashPartitioner分区器。

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
    partitioner, mapSideCombine, serializer)(null)
}

需要传入三个函数:

createCombiner:组合器函数,定义了v如何转换为c。

mergeValue:合并值函数,定义了如何给定一个V将其与原来的C合并成新的C。

mergeCombiners:合并组合器函数,定义了如何将相同key下的C给合并成一个C。

举个例子:

val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x + x))
val createCombiner = (v: Int) =>List(v)
val mergeValue: (List[Int], Int) => List[Int] = (c: List[Int], v: Int) => v :: c
val mergeCombiners = (c1: List[Int], c2: List[Int]) => c1 ::: c2
val rdd4 = rdd2.combineByKey(createCombiner, mergeValue, mergeCombiners, 3)
println(rdd4.collect().mkString(","))
(6,List(12, 12)),(4,List(8, 8)),(1,List(2)),(5,List(10))

源码是使用了aggregation 函数,将RDD[(k,v)]类型转换为RDD[(k,c)], 将v聚合为c。其次在传参中用户可以通过mapSideCombine参数,来设置是否开启map端的聚合。

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
// 合并组合器函数必须定义
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  // key为数组,不能map端聚合
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    // hash分区器,key不能为数组
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
 // 创建一个Aggregator
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner==Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // 创建shuffleRDD
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

shuffledRDD顾明思意,会使用分区器将原数据进行打乱,并重新分配到新的分区中。下面我们将详细介绍shuffledRDD的实现过程:

  1. 获取分区数组
override def getPartitions: Array[Partition] = {
  Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}

生成一个分区size大小的分区数组,依次为每一个分区创建一个ShuffledRDDPartition。ShuffledRDDPartition仅仅保存了当前的分区id。

private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override valindex: Int = idx
}
  1. 获取RDD的依赖,并向ShuffleManage注册shuffle
override def getDependencies: Seq[Dependency[_]] = {
  val serializer =userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]],implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]],implicitly[ClassTag[V]])
    }
  }
List(new ShuffleDependency(prev, part, serializer,keyOrdering,aggregator,mapSideCombine))
}

获得序列化器,并创建ShuffleDependency。在ShuffleDependency中会校验如果开启mapSideCombine,则要求Aggregator函数必须指定。其次,会获取keyClass, valueClass, combinerClass。以及生成并向shuffleManager注册shuffleId。

valshuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)

在进行注册shuffle时会根据不同的情况判断使用那种shuffle。这里就不重点介绍,之后会详细介绍shuffle的细节实现。

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
  1. 执行上一个Stage的ShuffleMapTask

ShuffleRDD属于宽依赖,Spark会依据宽依赖将tasks划分为不同的阶段,只有上一个Stage执行完成才会涉及到下一个Stage的执行,所以这里的数据传递,必须涉及执行的过程。

Spark中的Task可以分为ResultTask和ShuffleMapTask两种,ShuffleMapTask一般是向下一级Stage传递数据,ResultTask是将数据的结果直接返回的Task。ShuffleRDD使用到的就是ShuffleMapTask。

ShuffleMapTask的功能就是切分RDD的元素,将其分配到多个桶中。这里的桶指的就是根据上面获取分区数组,分配的方法是采用RDD相应的分区器进行实现。

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime= System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime= if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
      ...
      throw e
  }
}

在ShuffleMapTask中,先对RDD和ShuffleDependency进行了序列化,然后通过SparkEnv获得ShuffleManage, 调用其write方法,进行Map端的写出。

在进行getWriter()时会根据在获取依赖时注册的shuffle方式获取其对应的Writer方法。

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}

然后,再将上一个Stage的数据在Map端进行写出,写出的策略是按照选择的shuffle方式决定的。

  1. 获取优先位置信息
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}

从master获取MapOutputTracker信息,获取shuffleDependency中的依赖信息。并将dep和分区id传入tracker获取优先位置信息。

def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    : Seq[String] = {
  if (shuffleLocalityEnabled&& dep.rdd.partitions.length <SHUFFLE_PREF_MAP_THRESHOLD&&
      dep.partitioner.numPartitions <SHUFFLE_PREF_REDUCE_THRESHOLD) {
    val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
      dep.partitioner.numPartitions,REDUCER_PREF_LOCS_FRACTION)
    if (blockManagerIds.nonEmpty) {
      blockManagerIds.get.map(_.host)
    } else {
Nil
}
  } else {
Nil
}
}

从中可以看出,只有开启本地reduce,同时map端的分区数小于默认1000,reduce端的分区数小于默认1000,才可以从tracker中获取优先位置。

  1. 执行compute
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}

从其计算函数可以发现,ShuffleRDD会先将deps的head转换为ShuffleDependency。然后通过SparkEnv获取ShuffleManager的getReader从Map端进行读取写出的数据。getReader也会根据注册的Shuffle方式返回相应方式的reader策略。

最后调用read()方法进行读取。

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
// 创建 ShuffleBlockFetcher 迭代器,传入blockManager, mapSize, reduceSize
  val wrappedStreams = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    serializerManager.wrapStream,
    // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
    SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
    SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
    SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
    SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

  val serializerInstance =dep.serializer.newInstance()
// 创建key/value迭代器
  // Create a key/value iterator for each stream
  val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
    // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
    // NextIterator. The NextIterator makes sure that close() is called on the
    // underlying InputStream when all records have been read.
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }
// 更新task metrics
  // Update the context task metrics for each record read.
  val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
  val metricIter =CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map { record =>
      readMetrics.incRecordsRead(1)
      record
    },
    context.taskMetrics().mergeShuffleReadMetrics())

  // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

 // 如果定义aggregator,同时开启mapSideCombine, 调用其combineCombinersByKey
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    if (dep.mapSideCombine) {
      // We are reading values that are already combined
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else {
      // We don't know the value type, but also don't care -- the dependency *should*
      // have made sure its compatible w/ this aggregator, which will convert the value
      // type to the combined type C
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else {
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }

// 按key分区排序,并spill到内存,磁盘
  // Sort the output if there is a sort ordering defined.
  val resultIter =dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data.
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer =dep.serializer)
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
      // Use completion callback to stop sorter if task was finished/cancelled.
      context.addTaskCompletionListener[Unit](_ => {
        sorter.stop()
      })
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    case None =>
      aggregatedIter
  }

  resultIter match {
    case _: InterruptibleIterator[Product2[K, C]] => resultIter
    case _ =>
      // Use another interruptible iterator here to support task cancellation as aggregator
      // or(and) sorter may have consumed previous interruptible iterator.
      new InterruptibleIterator[Product2[K, C]](context, resultIter)
  }
}

foldByKey算子主要用于合并values的值,在合并前会为每一个value加上一个初值zeroValue。

val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x))
val rdd4 = rdd2.foldByKey(10)((a, b) => a + b)
println(rdd4.collect().mkString(","))
(1,11),(4,28),(5,15),(6,32)

可以看出其是先将值与zerovalue进行合并后,在调用传入的func进行合并。

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}

首先将zeroValue值使用序列化器转换为byte array, 这样可以方便给每一个key进行copy一份。创建一个从缓存反序列化获取zeroValue的函数。clean 传入的value合并函数。最后再调用combineByKeyWithClassTag,并将构造的函数传入。后面的内容就和combineByKey一致。foldByKey是开启map端合并

reduceByKey是不带初值的values的合并,底层调用的同样是combineByKeyWithClassTag算子。

//val rdd4 = rdd2.reduceByKey((a, b) => a + b)
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

reduceByKey和foldByKey的唯一区别是是否带初值。

就是字面意思,对键值对RDD进行按Key分组,并将value加入维护的Seq中。并不会保证分组的顺序。采用的分区器为默认的HashPartitioner。

val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x))
val rdd4 = rdd2.groupByKey()
println(rdd4.collect().mkString(","))
(1,CompactBuffer(1)),(4,CompactBuffer(4, 4)),(5,CompactBuffer(5)),(6,CompactBuffer(6, 6))

可以看到聚合的values,被封装入了CompactBuffer类型中。

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) =>CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

从源码看出groupByKey其实是上面reduceByKey的缩减版,不用用户创建聚合的函数。

下面我们来看下CompactBuffer这个数据类型:

private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
  // First two elements
  private var element0: T = _
  private var element1: T = _

  // Number of elements, including our two in the main object
  private varcurSize= 0

  // Array for extra elements
  private varotherElements: Array[T] = null

  ...
}

从CompactBuffer的实现上可以看出,CompactBuffer和ArrayBuffer的实现是基本一样的,不同的地方是CompactBuffer会一直维护element0,element1,其他放入对象数组,而ArrayBuffer是将所有元素都放入对象数组中。其次ArrayBuffer在创建时默认分配16元素空间。总之,CompactBuffer是ArrayBuffer的简化版,更节省内存空间,场景上是考虑了在groupby时经常会有很多的key,其values是很小的,并不需要创建很大的空间。

groupBy 和 groupByKey的区别是,groupByKey是按照key进行分组,但是groupBy是根据用户传入的函数,将元素的值进行转换作为key, 按照应用函数后的值作为key进行分组,分组的结果为(k,v)都作为value。groupBy是RDD类的函数,它即可以作为RDD使用,也可以作为PairRDD使用。

val rdd1 = spark.sparkContext.parallelize(Seq(1, 5, 4, 6, 4, 6))
val rdd2 = rdd1.map(x => (x, x))
val rdd4 = rdd2.groupBy(c => c._1 % 5)
println(rdd4.collect().mkString(","))
(0,CompactBuffer((5,5))),(1,CompactBuffer((1,1), (6,6), (6,6))),(4,CompactBuffer((4,4), (4,4)))

上面的例子中,将key模5作为key进行分组,看下源码是如何实现的。

def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}

源码中可以看出,将(cleanF(t), t)将元素应用于函数作为key, 将整个元素作为value, 经过map转换为键值对类型,再调用groupByKey(p)。

综上,combineByKey、foldByKey、reduceByKey 和 groupByKey,它们都是对一个RDD的操作,同时它们底层调用的都是combineByKeyWithClassTag,他们仅仅是依次的简化版。


以下为多个RDD的操作算子:

CoGroupedRDD

cogroup是将this和other的RDD中的数据进行分组合并,但和groupByKey不同的是,其不会将values合并到同一个迭代器中,仅仅是迭代器的合并。

var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
  ("a", 8), ("a", 9)), 2)
val rdd3 = rdd1.cogroup(rdd2)
println(rdd3.collect().mkString(","))

(a,(CompactBuffer(1),CompactBuffer(2, 5, 6, 8, 9))),(b,(CompactBuffer(5, 6),CompactBuffer())),(c,(CompactBuffer(6),CompactBuffer())),(d,(CompactBuffer(4),CompactBuffer()))

从源码可以看出,cogroup算子直接创建了一个CoGroupedRDD,在进行cogroup时,如果分区器为HashPartitioner, key不能为数组。

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { caseArray(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
  1. 获取RDD分区

override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i <- 0 until array.length) {
    // Each CoGroupPartition will have a dependency per contributing RDD
    array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
      // Assume each RDD contributed a single dependency, and get it
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] =>
          None
        case _ =>
Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}

创建CoGroupPartition ,每个RDD的检测其依赖,如果为ShuffleDependency返回为空,否则返回NarrowCoGroupSplitDep。

  1. 获得依赖
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner==Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part,serializer)
    }
  }
}

如果rdd的分区器是一致的则,使用 OneToOneDependency依赖,否则ShuffleDependency。override val partitioner: Some[Partitioner] = Some(part) ,而CoGroupRDD的分区器就是传入的分区器。遍历所有RDD的分区器,如果和传入的分区器一致则为OneToOne依赖,否则为ShuffleDependency依赖。

  1. 执行compute
override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
  val split = s.asInstanceOf[CoGroupPartition]
  val numRdds = dependencies.length

  // A list of (rdd iterator, dependency number) pairs
  val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
  for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
    case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
      val dependencyPartition = split.narrowDeps(depNum).get.split
      // Read them from the parent
      val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
      rddIterators += ((it, depNum))

    case shuffleDependency: ShuffleDependency[_, _, _] =>
      // Read map outputs of shuffle
      val it = SparkEnv.get.shuffleManager
        .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
        .read()
      rddIterators += ((it, depNum))
  }
  // 创建外部map
  val map = createExternalMap(numRdds)
  for ((it, depNum) <- rddIterators) {
    map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
  }
  context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
  new InterruptibleIterator(context,
    map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}

从源码可以看出,cogroup将依赖分区两种,分别进行封装到RDD的迭代器中,然后创建外部Map, 其中会创建一个ExternalAppendOnlyMap,它是Spark定义的一个优化内存使用的仅支持append的Map, 如果内存不足会将数据spill到磁盘。其中提供map合并的函数。最后遍历RDD的分区集合,将其进行合并返回。

下面我们来看下外部Map是如何实现的:

private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int)  // Int is dependency number
private type CoGroupCombiner = Array[CoGroup]
private def createExternalMap(numRdds: Int)
  : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {

  val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
    val newCombiner = Array.fill(numRdds)(new CoGroup)
    newCombiner(value._2) += value._1
    newCombiner
  }
  val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
    (combiner, value) => {
    combiner(value._2) += value._1
    combiner
  }
  val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
    (combiner1, combiner2) => {
      var depNum = 0
      while (depNum < numRdds) {
        combiner1(depNum) ++= combiner2(depNum)
        depNum += 1
      }
      combiner1
    }
  new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
    createCombiner, mergeValue, mergeCombiners)
}

可以看出和Aggregate实现类似,只是给每一个RDD分配了CompactBuffer类型(优化的只append 的ArrayBuffer类型)。

ExternalAppendOnlyMap 是extends Spillable类,在value合并到组合器时会根据需要进行溢出。默认为32k。

join算子是将多个RDD按key进行聚合后,然后在进行flatMap展平,返回key匹配后value形成的(k,v)对。

var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
  ("a", 8), ("a", 9)), 2)
val rdd3 = rdd1.join(rdd2)
println(rdd3.collect().mkString(","))
(a,(1,2)),(a,(1,5)),(a,(1,6)),(a,(1,8)),(a,(1,9))

join的实现上,实际是调用了cogroup算子,然后将返回值调用flatMapValues算子。其次,从源码可以看出join算子,只允许两个RDD。

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}

从源码使用yield将返回封装为集合,只有两个RDD都存在相同key才会返回。

和Join算子类似,调用cogroup算子,返回左RDD的所有,如果右为空则返回None。

def leftOuterJoin[W](
    other: RDD[(K, W)],
    partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._2.isEmpty) {
      pair._1.iterator.map(v => (v, None))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
    }
  }
}

右外连接和左外连接时类似的,只是将左为空的返回None。

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._1.isEmpty) {
      pair._2.iterator.map(w => (None, w))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
    }
  }
}

SubtractedRDD

返回RDD中数据,在this中,不在other中。SubtractedRDD在RDD转换算子提过,其实质是CoGroupRDD的优化版。

var rdd1 = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 5), ("d", 4), ("c", 6), ("b", 6)))
var rdd2 = spark.sparkContext.parallelize(Seq(("a", 2), ("a", 5), ("a", 6),
  ("a", 8), ("a", 9)), 2)
val rdd3 = rdd1.subtractByKey(rdd2)
println(rdd3.collect().mkString(","))
(b,5),(b,6),(c,6),(d,4)

可以看到返回的数据是,仅仅在RDD1中的,所以可以直接将RDD1加入内存,RDD2使用Stream读进行匹配。

  1. 获取分区数组
override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i <- 0 until array.length) {
    // Each CoGroupPartition will depend on rdd1 and rdd2
    array(i) = new CoGroupPartition(i,Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] =>
          None
        case _ =>
          Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}
  1. 获取依赖
override def getDependencies: Seq[Dependency[_]] = {
  def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
    : Dependency[_] = {
    if (rdd.partitioner== Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[T1, T2, Any](rdd, part)
    }
  }
Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}

从代码可以看出,生成分区数组和获取依赖,完全和CoGroupRDD一模一样,连创建的分区也是一致的为CoGroupPartition。

  1. 执行compute
override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
    val partition = p.asInstanceOf[CoGroupPartition]
    val map = new JHashMap[K, ArrayBuffer[V]]
    def getSeq(k: K): ArrayBuffer[V] = {
      val seq = map.get(k)
      if (seq != null) {
        seq
      } else {
        val seq = new ArrayBuffer[V]()
        map.put(k, seq)
        seq
      }
    }
    def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
      dependencies(depNum) match {
        case oneToOneDependency: OneToOneDependency[_] =>
          val dependencyPartition = partition.narrowDeps(depNum).get.split
          oneToOneDependency.rdd.iterator(dependencyPartition, context)
            .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)

        case shuffleDependency: ShuffleDependency[_, _, _] =>
          val iter = SparkEnv.get.shuffleManager
            .getReader(
              shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
            .read()
          iter.foreach(op)
      }
    }
    // 将RDD1加载map中
    // the first dep is rdd1; add all values to the map
    integrate(0, t => getSeq(t._1) += t._2)
    // the second dep is rdd2; remove all of its keys
    // 使用RDD2的值,从map中移除
    integrate(1, t => map.remove(t._1))
    map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
  }

综上,SubtractedRDD是CoGroupRDD的优化版。

上一篇下一篇

猜你喜欢

热点阅读