Aggregation Operators in Spark

2019-10-02  天之見證

1. combineByKeyWithClassTag

The operators on RDD[(K, V)] that are ultimately implemented on top of combineByKeyWithClassTag are listed below:

| Operator | Remark | Map-side combine | Intermediate result type | Output |
| --- | --- | --- | --- | --- |
| reduceByKey | | yes | V | (K, V) |
| groupByKey | | no | CompactBuffer[V] | (K, Iterable[V]) |
| combineByKey | | yes (default) | C | (K, C) |
| foldByKey | reduceByKey with an initial zero value | yes | V | (K, V) |
| aggregateByKey | foldByKey whose zero value has a different type | yes | U | (K, U) |
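
A quick illustration of the different output types (a hypothetical sketch; it assumes an existing SparkContext named sc):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

pairs.reduceByKey(_ + _)                       // RDD[(String, Int)]
pairs.groupByKey()                             // RDD[(String, Iterable[Int])]
pairs.combineByKey(
  (v: Int) => List(v),                         // createCombiner: V => C
  (c: List[Int], v: Int) => v :: c,            // mergeValue: (C, V) => C
  (c1: List[Int], c2: List[Int]) => c1 ::: c2) // mergeCombiners: (C, C) => C, gives RDD[(String, List[Int])]
pairs.foldByKey(0)(_ + _)                      // RDD[(String, Int)]
pairs.aggregateByKey(0)(_ + _, _ + _)          // RDD[(String, Int)]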

1.1 Implementation of combineByKeyWithClassTag

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

From the implementation above we can see that the three functions passed in are all used only to initialize the aggregator:

  1. createCombiner
  2. mergeValue
  3. mergeCombiners

We can also see that whether a shuffle happens is determined by self.partitioner == Some(partitioner), i.e. whether the partitioner requested by this operator is the same as the partitioner already set on the RDD. If it is, the aggregation runs as a plain mapPartitions; otherwise a ShuffledRDD is created.
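
A small sketch of this behavior (assuming an existing SparkContext named sc): pre-partitioning with the same partitioner avoids the shuffle, a different partitioner triggers one.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val prePartitioned = pairs.partitionBy(new HashPartitioner(4)).cache()

// Same partitioner as the RDD: self.partitioner == Some(partitioner), no ShuffledRDD in the lineage
val noShuffle = prePartitioned.reduceByKey(new HashPartitioner(4), _ + _)

// Different number of partitions: a ShuffledRDD with map-side combine is created
val shuffled = prePartitioned.reduceByKey(new HashPartitioner(8), _ + _)

println(noShuffle.toDebugString)
println(shuffled.toDebugString)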

1.2 CompactBuffer

CompactBuffer provides functionality similar to ArrayBuffer, but with better memory utilization for small collections.

ArrayBuffer: always allocates an initial array of 16 elements (array: Array[AnyRef] = new Array[AnyRef](math.max(initialSize, 1))), so when the actual data is far smaller than 16 elements a lot of space is wasted.

CompactBuffer: holds its first 2 elements in fields of the object itself, so when a key has only a few values under groupByKey, space utilization is better.

The code of CompactBuffer is as follows:

private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
  // First two elements
  private var element0: T = _
  private var element1: T = _

  // Number of elements, including our two in the main object
  private var curSize = 0

  // Array for extra elements
  private var otherElements: Array[T] = null

  def apply(position: Int): T = {
    if (position < 0 || position >= curSize) {
      throw new IndexOutOfBoundsException
    }
    if (position == 0) {
      element0
    } else if (position == 1) {
      element1
    } else {
      otherElements(position - 2)
    }
  }

  private def update(position: Int, value: T): Unit = {
    if (position < 0 || position >= curSize) {
      throw new IndexOutOfBoundsException
    }
    if (position == 0) {
      element0 = value
    } else if (position == 1) {
      element1 = value
    } else {
      otherElements(position - 2) = value
    }
  }

  def += (value: T): CompactBuffer[T] = {
    val newIndex = curSize
    if (newIndex == 0) {
      element0 = value
      curSize = 1
    } else if (newIndex == 1) {
      element1 = value
      curSize = 2
    } else {
      growToSize(curSize + 1)
      otherElements(newIndex - 2) = value
    }
    this
  }

  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
    values match {
      // Optimize merging of CompactBuffers, used in cogroup and groupByKey
      case compactBuf: CompactBuffer[T] =>
        val oldSize = curSize
        // Copy the other buffer's size and elements to local variables in case it is equal to us
        val itsSize = compactBuf.curSize
        val itsElements = compactBuf.otherElements
        growToSize(curSize + itsSize)
        if (itsSize == 1) {
          this(oldSize) = compactBuf.element0
        } else if (itsSize == 2) {
          this(oldSize) = compactBuf.element0
          this(oldSize + 1) = compactBuf.element1
        } else if (itsSize > 2) {
          this(oldSize) = compactBuf.element0
          this(oldSize + 1) = compactBuf.element1
          // At this point our size is also above 2, so just copy its array directly into ours.
          // Note that since we added two elements above, the index in this.otherElements that we
          // should copy to is oldSize.
          System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
        }

      case _ =>
        values.foreach(e => this += e)
    }
    this
  }

  override def length: Int = curSize

  override def size: Int = curSize

  override def iterator: Iterator[T] = new Iterator[T] {
    private var pos = 0
    override def hasNext: Boolean = pos < curSize
    override def next(): T = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      pos += 1
      apply(pos - 1)
    }
  }

  /** Increase our size to newSize and grow the backing array if needed. */
  private def growToSize(newSize: Int): Unit = {
    // since two fields are hold in element0 and element1, an array holds newSize - 2 elements
    val newArraySize = newSize - 2
    val arrayMax = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
    if (newSize < 0 || newArraySize > arrayMax) {
      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
    }
    val capacity = if (otherElements != null) otherElements.length else 0
    if (newArraySize > capacity) {
      var newArrayLen = 8L
      while (newArraySize > newArrayLen) {
        newArrayLen *= 2
      }
      if (newArrayLen > arrayMax) {
        newArrayLen = arrayMax
      }
      val newArray = new Array[T](newArrayLen.toInt)
      if (otherElements != null) {
        System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
      }
      otherElements = newArray
    }
    curSize = newSize
  }
}
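
For context, groupByKey wires CompactBuffer into combineByKeyWithClassTag roughly as follows (paraphrased from the Spark source, not a verbatim copy). Note that it disables map-side combine, since grouping on the map side would not reduce the amount of shuffled data, which is why the table above marks it as having no map-side merge:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}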

1.3 Aggregator

The concrete implementation of Aggregator is shown below. It can be viewed as a proxy around ExternalAppendOnlyMap, which may spill data to disk when memory runs low:

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  /** Update task metrics after populating the external map. */
  private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
    Option(context).foreach { c =>
      c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
      c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    }
  }
}
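
As a concrete example of the three functions: reduceByKey(_ + _) on an RDD[(String, Int)] effectively builds the following Aggregator (a sketch; reduceByKey passes the identity-like createCombiner and uses the user function as both mergeValue and mergeCombiners):

import org.apache.spark.Aggregator

val agg = Aggregator[String, Int, Int](
  createCombiner = (v: Int) => v,                 // the first value of a key becomes the combiner
  mergeValue = (c: Int, v: Int) => c + v,         // map side: fold another value into the combiner
  mergeCombiners = (c1: Int, c2: Int) => c1 + c2) // reduce side: merge combiners from different map outputs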

2. treeAggregate

2.1 Reading the source

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
    }
    val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
  }
}

From the code above, the computation can roughly be divided into 3 stages:

  1. Aggregation of the data within each partition
  2. Merging across partitions (foldByKey repartitions the intermediate results to build the tree levels)
  3. A final merge of the remaining partial results on the driver
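
For example, with 64 input partitions and the default depth = 2, scale = ceil(64 ^ (1/2)) = 8. The while loop runs once (64 > 8 + ceil(64 / 8) = 16), so foldByKey shrinks the 64 partial results to 8 partitions; the condition then fails (8 > 8 + 1 is false), and the final fold on the driver merges those 8 values.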

2.2 Scala's aggregate vs. Spark's aggregate/treeAggregate

Dataset: Seq(1, 2, 3). Because the zero value is chosen as 1 rather than 0, the results differ:

| Code | Result | Remark |
| --- | --- | --- |
| Seq(1, 2, 3).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2) | 7 | zero value applied exactly once |
| data.repartition(1).aggregate(1)((e1, e2) => e1 + e2, (acc, e) => acc + e) | 8 | aggregated once more on the driver |
| data.repartition(1).treeAggregate(1)((e1, e2) => e1 + e2, (acc, e) => acc + e) | 9 | the zero value is folded in twice more on the final fold |

The repartition in the code above directly determines how far the Spark results drift from the plain-Scala result, since the zero value is applied once per partition and again on the driver.
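
A minimal sketch to reproduce the table (assumptions: a local SparkContext named sc, and a data RDD defined as below, which the original table does not show):

val data = sc.parallelize(Seq(1, 2, 3))

// Scala: the zero value is used exactly once => 1 + (1 + 2 + 3) = 7
Seq(1, 2, 3).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2)

// Spark aggregate: zero is applied once inside the single partition and once more on the driver => 8
data.repartition(1).aggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2)

// treeAggregate: zero is applied inside the partition, then fold() adds it twice more => 9
data.repartition(1).treeAggregate(1)((acc, e) => acc + e, (e1, e2) => e1 + e2)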

3. Code snippets

3.1 A lazily created serializer instance

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
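
Two details worth noting here: the zero value is serialized to a byte array on the driver and a fresh copy is deserialized for every key, so a mutable zero value (e.g. a collection) is never shared between keys; and cachedSerializer is a lazy val, so each task creates at most one serializer instance instead of one per key.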