8. GraphX中的Vertex Cut策略

2018-12-20  本文已影响0人  GongMeng

NOTE: 本文2D paritioner的例子来自@endymecy的博客

1. 几种策略

RDD自身的Partitioner的传统策略, 比如HashPartitioner不同, Graph基于Vertex进行切分, 保证不同的Parition里是不同的边, 但通一个Vertex可以被切分到多个Parition中. 所以GraphX实现了自己的Vertex Cut策略
GraphX在进行图分割时,,它通过PartitionStrategy专门定义这些策略。在PartitionStrategy中,总共定义了EdgePartition2DEdgePartition1DRandomVertexCut以及 CanonicalRandomVertexCut这四种不同的分区策略。

1 RandomVertexCut

  /**
   * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
   * random vertex cut that colocates all same-direction edges between two vertices.
   */
case object RandomVertexCut extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      math.abs((src, dst).hashCode()) % numParts
    }
  }

这个方法比较简单, 设计上使用source vertex和target vertex的id来做hash,这样两个顶点之间相同方向的边会分配到同一个分区。

2 CanonicalRandomVertexCut

 /**
   * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
   * direction, resulting in a random vertex cut that colocates all edges between two vertices,
   * regardless of direction.
   */
case object CanonicalRandomVertexCut extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      if (src < dst) {
        math.abs((src, dst).hashCode()) % numParts
      } else {
        math.abs((dst, src).hashCode()) % numParts
      }
    }
  }

这种分割方法和前一种方法类似, 只不过把id比较小的vertex放在前面, 这样两个顶点之间所有的边都会分配到同一个分区,而不管方向如何。

3 EdgePartition1D

 /**
   * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
   * source.
   */
case object EdgePartition1D extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      val mixingPrime: VertexId = 1125899906842597L
      (math.abs(src * mixingPrime) % numParts).toInt
    }
  }

这种方法仅仅根据source vertex 的 id来分配变, 这样同一个vertex出来的edge会被切到同一个分区, supernode问题得不到任何缓解, 仅仅适用于比较稀疏的图.

4 EdgePartition2D

 /**
   * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
   *
   * Suppose we have a graph with 12 vertices that we want to partition
   * over 9 machines.  We can use the following sparse matrix representation:
   *
   * <pre>
   *       __________________________________
   *  v0   | P0 *     | P1       | P2    *  |
   *  v1   |  ****    |  *       |          |
   *  v2   |  ******* |      **  |  ****    |
   *  v3   |  *****   |  *  *    |       *  |
   *       ----------------------------------
   *  v4   | P3 *     | P4 ***   | P5 **  * |
   *  v5   |  *  *    |  *       |          |
   *  v6   |       *  |      **  |  ****    |
   *  v7   |  * * *   |  *  *    |       *  |
   *       ----------------------------------
   *  v8   | P6   *   | P7    *  | P8  *   *|
   *  v9   |     *    |  *    *  |          |
   *  v10  |       *  |      **  |  *  *    |
   *  v11  | * <-E    |  ***     |       ** |
   *       ----------------------------------
   * </pre>
   *
   * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
   * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
   * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
   * P6)` or the last
   * row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that `v11` will need to be
   * replicated to at most `2 * sqrt(numParts)` machines.
   *
   * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
   * balance.  To improve balance we first multiply each vertex id by a large prime to shuffle the
   * vertex locations.
   *
   * When the number of partitions requested is not a perfect square we use a slightly different
   * method where the last column can have a different number of rows than the others while still
   * maintaining the same size per block.
   */
case object EdgePartition2D extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
      val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
      val mixingPrime: VertexId = 1125899906842597L
      if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
        // Use old method for perfect squared to ensure we get same results
        val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
        val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
        (col * ceilSqrtNumParts + row) % numParts
      } else {
        // Otherwise use new method
        val cols = ceilSqrtNumParts
        val rows = (numParts + cols - 1) / cols
        val lastColRows = numParts - rows * (cols - 1)
        val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
        val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
        col * rows + row
      }
    }
  }

这种方法同时使用到了source vertex和target vertex的id, 它把整个图看成一个稀疏的矩阵, 然后对这个矩阵进行切分. 从而保证顶点的备份数不大于2 * sqrt(numParts)的限制。这里numParts表示分区数。
这个方法的实现分两种情况,即分区数能完全开方和不能完全开方两种情况。当分区数能完全开方时,采用下面的方法:

 val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
 val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
 (col * ceilSqrtNumParts + row) % numParts

当分区数不能完全开方时,采用下面的方法。这个方法的最后一列允许拥有不同的行数。

val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
//最后一列允许不同的行数
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row

下面举个例子来说明该方法。假设我们有一个拥有12个顶点的图,要把它切分到9台机器。我们可以用下面的稀疏矩阵来表示:

          __________________________________
     v0   | P0 *     | P1       | P2    *  |
     v1   |  ****    |  *       |          |
     v2   |  ******* |      **  |  ****    |
     v3   |  *****   |  *  *    |       *  |
          ----------------------------------
     v4   | P3 *     | P4 ***   | P5 **  * |
     v5   |  *  *    |  *       |          |
     v6   |       *  |      **  |  ****    |
     v7   |  * * *   |  *  *    |       *  |
          ----------------------------------
     v8   | P6   *   | P7    *  | P8  *   *|
     v9   |     *    |  *    *  |          |
     v10  |       *  |      **  |  *  *    |
     v11  | * <-E    |  ***     |       ** |
          ----------------------------------

上面的例子中*表示分配到处理器上的边。E表示连接顶点v11v1的边,它被分配到了处理器P6上。为了获得边所在的处理器,我们将矩阵切分为sqrt(numParts) * sqrt(numParts)块。
注意,上图中与顶点v11相连接的边只出现在第一列的块(P0,P3,P6)或者最后一行的块(P6,P7,P8)中,这保证了V11的副本数不会超过2 * sqrt(numParts)份,在上例中即副本不能超过6份。

在上面的例子中,P0里面存在很多边,这会造成工作的不均衡。为了提高均衡,我们首先用顶点id乘以一个大的素数,然后再shuffle顶点的位置。乘以一个大的素数本质上不能解决不平衡的问题,只是减少了不平衡的情况发生。

上一篇 下一篇

猜你喜欢

热点阅读