partitionBy (通过分区器进行分区)

2019-07-30  本文已影响0人  yayooo
  1. 作用:对pairRDD进行分区操作,通过指定的分区器决定数据计算的分区,spark默认使用的分区器是HashPartitioner

如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程(即前后分区个数不一致)。

源码:

  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

练习:

// 需求:创建一个4个分区的RDD,对其重新分区

package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    //构建spark上下文对象
    val sc = new SparkContext(conf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")))
    val rdd1: RDD[(Int, (Int, String))] = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd1.collect().foreach(println)

    println("*************")

    val rdd3: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
    val rdd4: RDD[(Int, (Int, String))] = rdd3.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    rdd4.collect().foreach(println)
    sc.stop()
  }
}


(1,(1,aaa))
(2,(2,bbb))
(3,(3,ccc))
***************
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))


自定义分区器

//将所有数据放到同一个分区·
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]")
    val sc = new SparkContext(conf)


    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")))

    val rdd2: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
    val rdd3: RDD[(Int, (Int, String))] = rdd2.mapPartitionsWithIndex((index, datas) => {
      datas.map((index, _))
    })
    rdd3.collect().foreach(println)

    sc.stop()

  }
}

class MyPartitioner(num: Int) extends Partitioner{
  override def numPartitions: Int ={
    num
  }

  override def getPartition(key: Any): Int = {
    //让所有数据放到一个分区
    1
  }
}

(1,(1,aaa))
(1,(2,bbb))
(1,(3,ccc))

上一篇下一篇

猜你喜欢

热点阅读