黑猴子的家:Spark RDD 转换算子 Transformation
常用的RDD 转换算子Transformation
1、map (一对一)
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
scala> val rddmap = sc.makeRDD(List((1,(1,2)),(2,(2,3)),(3,(3,4)),(4,(6,7))),4)
rddmap: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rddmap.map((a) => (a._2._2,(a._2._1),a._1)).collect
res0: Array[(Int, Int, Int)] = Array((2,1,1), (3,2,2), (4,3,3), (7,6,4))
scala> rddmap.map{ case (a,(b,c)) => (c,(b,a)) }
res1: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= MapPartitionsRDD[2] at map at <console>:27
scala> rddmap.map{ case (a,(b,c)) => (c,(b,a)) }.collect
res2: Array[(Int, (Int, Int))] = Array((2,(1,1)), (3,(2,2)), (4,(3,3)), (7,(6,4)))
scala> val rddmap2 = sc.parallelize(1 to 10)
rddmap2: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rddmap2.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val rddmapt = rddmap2.map(_ * 2)
rddmapt: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[5] at map at <console>:26
scala> rddmapt.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
2、filter 过滤
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
scala> val rddList = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
rddList: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= ParallelCollectionRDD[6] at makeRDD at <console>:24
scala> rddList.filter{ case (a,(b,c)) => c%2 != 0 }.collect
res9: Array[(Int, (Int, Int))] = Array((2,(22,23)))
scala> rddList.collect{ case (a,(b,c)) if c%2 !=0 => (a,(b,c)) }.collect
res10: Array[(Int, (Int, Int))] = Array((2,(22,23)))
scala> val rddfilter = sc.parallelize(Array("xiaoming","xiaohong","xiaohe","hadoop"))
rddfilter: org.apache.spark.rdd.RDD[String]
= ParallelCollectionRDD[12] at parallelize at <console>:24
scala> val rddf = rddfilter.filter(_ contains "xiao")
rddf: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:26
scala> rddfilter.collect
res11: Array[String] = Array(xiaoming, xiaohong, xiaohe, hadoop)
scala> rddf.collect
res12: Array[String] = Array(xiaoming, xiaohong, xiaohe)
3、flatMap 压扁压平
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
//flatmap 压扁压平, 一对多
scala> val rddflatmap = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
rddflatmap: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= ParallelCollectionRDD[20] at makeRDD at <console>:24
scala> rddflatmap.flatMap{ case (a,(b,c)) => List(a,b,c) }.collect
res16: Array[Int] = Array(1, 11, 12, 2, 22, 23, 3, 33, 34, 4, 46, 48)
scala> val sourcefm = sc.makeRDD(1 to 5)
sourcefm: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[22] at makeRDD at <console>:24
scala> sourcefm.collect
res17: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val flatmaprdd = sourcefm.flatMap(x => (1 to x))
flatmaprdd: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[23] at flatMap at <console>:26
scala> flatmaprdd.collect
res18: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
4、mapPartitions (对每个分区进行转换)
对于每一个分区的数据进行转换,注意函数形式,每一个分区运行一次函数。类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数将被调用N次,而mapPartitions的函数将被调用M次
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
mapPartitions 以分区为单位运行函数,当每次调用都有较大的固定开销(例如创建数据库连接)并且面对大数据集的时候,通常比 map 速度快很多;但它一次处理整个分区的数据,需要注意内存占用
scala> val smprdd = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
smprdd: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> smprdd.mapPartitions( items => items.map{ case (a,(b,c)) => (c,(b,a)) } ).collect
res0: Array[(Int, (Int, Int))]
= Array((12,(11,1)), (23,(22,2)), (34,(33,3)), (48,(46,4)))
scala> val smprdd2 = sc.parallelize(List( \
("kpop","female"),("zorro","male"), \
smprdd2: org.apache.spark.rdd.RDD[(String, String)]
= ParallelCollectionRDD[2] at parallelize at <console>:24
scala> :paste
// Entering paste mode (ctrl-D to finish)
def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = {
var woman = List[String]()
while (iter.hasNext){
val next = iter.next()
next match {
case (_,"female") => woman = next._1 :: woman
case _ =>
// Exiting paste mode, now interpreting. //Ctrl+D 退出
partitionsFun: (iter: Iterator[(String, String)])Iterator[String]
scala> val result = smprdd2.mapPartitions(partitionsFun)
result: org.apache.spark.rdd.RDD[String]
= MapPartitionsRDD[3] at mapPartitions at <console>:28
scala> result.collect
res1: Array[String] = Array(kpop, lucy)
5、mapPartitionsWithIndex (分区+分区索引)
对于每一个分区的数据进行转换,注意函数形式,多了一个分区的索引,每一个分区运行一次函数,类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
scala> val rdd = sc.makeRDD(List((1,(11,12)),(2,(22,23)),(3,(33,34)),(4,(46,48))),4)
rdd: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= ParallelCollectionRDD[4] at makeRDD at <console>:24
scala> rdd.repartition(2)
res2: org.apache.spark.rdd.RDD[(Int, (Int, Int))]
= MapPartitionsRDD[8] at repartition at <console>:27
scala> rdd.partitioner
res3: Option[org.apache.spark.Partitioner] = None
scala> rdd.mapPartitionsWithIndex( \
(a,items) => Iterator(a+":" +items.map( \
t => (" "+ t._1 + "("+t._2._1 +","+t._2._2+"))")).mkString("|") ) ).collect
res5: Array[String] = Array(0: 1(11,12)), 1: 2(22,23)), 2: 3(33,34)), 3: 4(46,48)))
scala> val rdd = sc.parallelize(List( \
rdd: org.apache.spark.rdd.RDD[(String, String)]
= ParallelCollectionRDD[12] at parallelize at <console>:24
scala> :paste
// Entering paste mode (ctrl-D to finish)
def partitionsFun(index : Int, iter : Iterator[(String,String)]) : Iterator[String] = {
var woman = List[String]()
while (iter.hasNext){
val next = iter.next()
next match {
case (_,"female") => woman = "["+index+"]"+next._1 :: woman
case _ =>
// Exiting paste mode, now interpreting.
partitionsFun: (index: Int, iter: Iterator[(String, String)])Iterator[String]
scala> val result = rdd.mapPartitionsWithIndex(partitionsFun)
result: org.apache.spark.rdd.RDD[String]
= MapPartitionsRDD[13] at mapPartitionsWithIndex at <console>:28
scala> result.collect
res7: Array[String] = Array([0]kpop, [1]lucy)
6、sample (随机抽样)
对RDD数据进行抽样,返回一个新的RDD。fraction为抽样比例(是期望值,实际抽出的数量并不精确等于该比例),withReplacement表示抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子,相同的种子在相同的数据上会得到相同的抽样结果。下面的例子中,sample1 以fraction 0.4、随机种子2对RDD做有放回抽样,sample2 以fraction 0.2、随机种子3对RDD做无放回抽样
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")
withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[14] at parallelize at <console>:24
scala> rdd.collect()
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int]
= PartitionwiseSampledRDD[15] at sample at <console>:26
scala> sample1.collect()
res9: Array[Int] = Array(1, 2, 2)
scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int]
= PartitionwiseSampledRDD[16] at sample at <console>:26
scala> sample2.collect()
res11: Array[Int] = Array(1, 9)
scala> val smrdd = sc.makeRDD(0 to 100)
smrdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[17] at makeRDD at <console>:24
scala> smrdd.sample(false,0.3,2).collect
res12: Array[Int] = Array(0, 2, 6, 7, 9, 10, 11, 15,
16, 17, 18, 23, 24, 27, 28, 31, 32, 45, 49, 59,
63, 64, 68, 69, 73, 75, 76, 79, 80, 83, 86,
89, 90, 94, 97)
scala> smrdd.collect
res13: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41,
42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86,
87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
7、union (合并)
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
scala> val ardd = sc.makeRDD(0 to 10)
ardd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[19] at makeRDD at <console>:24
scala> val brdd = sc.makeRDD(11 to 20)
brdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[20] at makeRDD at <console>:24
scala> (ardd union brdd) collect
warning: there was one feature warning; re-run with -feature for details
res14: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
8、intersection (求交集)
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter {case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty}
scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = sc.makeRDD(4 to 10)
rdd2: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> val rdd3 = rdd1 intersection rdd2
rdd3: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[7] at intersection at <console>:28
scala> rdd3 collect
warning: there was one feature warning; re-run with -feature for details
res0: Array[Int] = Array(4, 6, 7, 5)
9、distinct (数据去重)
对源RDD进行去重后返回一个新的RDD。默认情况下并行度取决于源RDD的分区数(旧版文档中写作“只有8个并行任务”),但是可以传入一个可选的numPartitions参数(文档中也称numTasks)改变它。
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
scala> val disrdd = sc.parallelize(List(1,1,2,3,3,4,5,5,6,7,8))
disrdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[8] at parallelize at <console>:24
scala> val urdd = disrdd.distinct
urdd: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[14] at distinct at <console>:26
scala> urdd.collect()
res1: Array[Int] = Array(4, 6, 8, 2, 1, 3, 7, 5)
scala> val urdd2 = disrdd distinct 5
urdd2: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[20] at distinct at <console>:26
scala> urdd2.collect
res3: Array[Int] = Array(5, 1, 6, 7, 2, 3, 8, 4)
10、partitionBy (重新分区)
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
if (self.partitioner == Some(partitioner)) {
} else {
new ShuffledRDD[K, V, V](self, partitioner)
scala> val prdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
prdd: org.apache.spark.rdd.RDD[(Int, String)]
= ParallelCollectionRDD[21] at parallelize at <console>:24
scala> prdd.partitions.size
res4: Int = 4
scala> import org.apache.spark._
import org.apache.spark._
scala> var rdd2 = prdd.partitionBy(new HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)]
= ShuffledRDD[22] at partitionBy at <console>:29
scala> rdd2.partitions.size
res5: Int = 2
scala> val a = sc.makeRDD(1 to 100,5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at makeRDD at <console>:24
scala> a.partitioner
res57: Option[org.apache.spark.Partitioner] = None
scala> a.partitions.size
res61: Int = 5
scala> a.map((_,3)).partitionBy(new org.apache.spark.HashPartitioner(2))
res62: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[58] at partitionBy at <console>:27
scala> a.partitions.size
res63: Int = 5
scala> res62.partitions.size
res64: Int = 2
11、reduceByKey (聚合操作)
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
scala> val rdd = sc.makeRDD(List(("female",5),("male",6),("male",7),("female",8)))
rdd: org.apache.spark.rdd.RDD[(String, Int)]
= ParallelCollectionRDD[23] at makeRDD at <console>:27
scala> val reducerdd = rdd.reduceByKey
reduceByKey reduceByKeyLocally
scala> val reducerdd = rdd.reduceByKey( (x,y) => x + y )
reducerdd: org.apache.spark.rdd.RDD[(String, Int)]
= ShuffledRDD[24] at reduceByKey at <console>:29
scala> reducerdd.collect
res7: Array[(String, Int)] = Array((female,13), (male,13))
12、groupByKey (分组聚合)
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
scala> val words = Array("one","two","two","three","three","three")
words: Array[String] = Array(one, two, two, three, three, three)
scala> val wprdd = sc.makeRDD(words).map( w => (w,1) )
wprdd: org.apache.spark.rdd.RDD[(String, Int)]
= MapPartitionsRDD[26] at map at <console>:29
scala> wprdd.collect
res9: Array[(String, Int)]
= Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1))
scala> val grouprdd = wprdd.groupByKey
grouprdd: org.apache.spark.rdd.RDD[(String, Iterable[Int])]
= ShuffledRDD[28] at groupByKey at <console>:31
scala> grouprdd.collect
res11: Array[(String, Iterable[Int])]
= Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
scala> val maprdd = grouprdd.map( t => (t._1,t._2.sum))
maprdd: org.apache.spark.rdd.RDD[(String, Int)]
= MapPartitionsRDD[29] at map at <console>:33
scala> maprdd.collect
res12: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
13、combineByKey (按key合并,自定义聚合过程)
createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并
mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
scala> val scores = Array( \
("Fred", 88), ("Fred", 95), ("Fred", 91), \
("Wilma", 93), ("Wilma", 95), ("Wilma", 98))
scores: Array[(String, Int)]
= Array((Fred,88), (Fred,95), (Fred,91), (Wilma,93), (Wilma,95), (Wilma,98))
scala> val input = sc.parallelize(scores)
input: org.apache.spark.rdd.RDD[(String, Int)]
= ParallelCollectionRDD[31] at parallelize at <console>:29
scala> val combine = input.combineByKey( \
(v)=>(v,1), \
(acc:(Int,Int),v)=>(acc._1+v,acc._2+1), \
combine: org.apache.spark.rdd.RDD[(String, (Int, Int))]
= ShuffledRDD[32] at combineByKey at <console>:31
scala> combine.collect
res14: Array[(String, (Int, Int))] = Array((Wilma,(286,3)), (Fred,(274,3)))
scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}
result: org.apache.spark.rdd.RDD[(String, Double)]
= MapPartitionsRDD[33] at map at <console>:33
scala> result.collect
res15: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)]
= ParallelCollectionRDD[39] at parallelize at <console>:27
scala> val aggrdd = rdd.aggregateByKey(0)(math.max(_,_),_+_)
aggrdd: org.apache.spark.rdd.RDD[(Int, Int)]
= ShuffledRDD[40] at aggregateByKey at <console>:29
scala> aggrdd.collect
res19: Array[(Int, Int)] = Array((3,8), (1,7), (2,3))
scala> aggrdd.partitions.size
res20: Int = 3
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1)
rdd: org.apache.spark.rdd.RDD[(Int, Int)]
= ParallelCollectionRDD[41] at parallelize at <console>:27
scala> val aggrdd = rdd.aggregateByKey(0)(math.max(_,_),_+_)
aggrdd: org.apache.spark.rdd.RDD[(Int, Int)]
= ShuffledRDD[42] at aggregateByKey at <console>:29
scala> aggrdd.collect
res21: Array[(Int, Int)] = Array((1,4), (3,8), (2,3))
15、foldByKey (aggregateByKey的简化操作)
foldByKey 是aggregateByKey的简化操作,seqOp和combOp相同
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, defaultPartitioner(self))(func)
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)]
= ParallelCollectionRDD[43] at parallelize at <console>:27
scala> val frdd = rdd.foldByKey(0)(_+_)
frdd: org.apache.spark.rdd.RDD[(Int, Int)]
= ShuffledRDD[44] at foldByKey at <console>:29
scala> frdd.collect
res22: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
16、sortByKey (根据k排序)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)]
= ParallelCollectionRDD[45] at parallelize at <console>:27
scala> rdd.sortByKey(true).collect()
res23: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
scala> rdd.sortByKey(false).collect()
res24: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
17、sortBy (通过function 函数排序)
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
.sortByKey(ascending, numPartitions)
scala> val rdd = sc.parallelize(List(6,9,7,1,2,4,3,5))
rdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[64] at parallelize at <console>:27
scala> rdd.sortBy(x => x).collect
res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 9)
scala> rdd.sortBy(x => x%3).collect
res28: Array[Int] = Array(6, 9, 3, 7, 1, 4, 2, 5)
18、join (key的交集,仅 join 相同的)
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
join(other, defaultPartitioner(self, other))
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)]
= ParallelCollectionRDD[75] at parallelize at <console>:27
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)]
= ParallelCollectionRDD[76] at parallelize at <console>:27
scala> rdd.join(rdd1).collect
res29: Array[(Int, (String, Int))] = Array((2,(b,5)), (1,(a,4)), (3,(c,6)))
19、cogroup (将两个RDD groupbykey之后合并(分组聚合))
将两个RDD groupbykey之后合并,在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)]
= ParallelCollectionRDD[80] at parallelize at <console>:27
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)]
= ParallelCollectionRDD[81] at parallelize at <console>:27
scala> rdd.cogroup(rdd1).collect()
res30: Array[(Int, (Iterable[String], Iterable[Int]))]
= Array(
scala> val rdd2 = sc.parallelize(Array((4,4),(2,5),(3,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)]
= ParallelCollectionRDD[84] at parallelize at <console>:27
scala> rdd.cogroup(rdd2).collect()
res31: Array[(Int, (Iterable[String], Iterable[Int]))]
= Array(
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)]
= ParallelCollectionRDD[87] at parallelize at <console>:27
scala> rdd3.cogroup(rdd2).collect()
res32: Array[(Int, (Iterable[String], Iterable[Int]))]
= Array(
(1,(CompactBuffer(a, d),CompactBuffer())),
20、cartesian (笛卡尔积)
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[90] at parallelize at <console>:27
scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[91] at parallelize at <console>:27
scala> rdd1.cartesian(rdd2).collect()
res33: Array[(Int, Int)] = Array(
(1,2), (1,3), (1,4), (1,5), (2,2), (2,3),
(3,2), (3,3), (2,4), (2,5), (3,4), (3,5))
21、pipe (RDD每个分区都执行shell脚本)
def pipe(command: String): RDD[String] = withScope {
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
[root@hadoop102 ~]# cd /opt/module/spark
[root@hadoop102 spark]# touch pipe.sh
[root@hadoop102 spark]# chmod 755 pipe.sh
[root@hadoop102 spark]# vim pipe.sh
echo "AA"
while read LINE; do
echo ">>>"${LINE}
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String]
= ParallelCollectionRDD[94] at parallelize at <console>:27
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res35: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String]
= ParallelCollectionRDD[96] at parallelize at <console>:27
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res37: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
22、coalesce (缩减分区数)
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.partitions.size
res1: Int = 4
scala> val csrdd = rdd.coalesce(3)
csrdd: org.apache.spark.rdd.RDD[Int]
= CoalescedRDD[1] at coalesce at <console>:26
scala> rdd.partitions.size
res2: Int = 4
scala> csrdd.partitions.size
res3: Int = 3
23、repartition (重新分区,所有数据重新洗牌)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd.partitions.size
res4: Int = 4
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[6] at repartition at <console>:26
scala> rerdd.partitions.size
res5: Int = 2
scala> val rerdd = rdd.repartition(4)
rerdd: org.apache.spark.rdd.RDD[Int]
= MapPartitionsRDD[10] at repartition at <console>:26
scala> rerdd.partitions.size
res6: Int = 4
scala> rerdd.collect
res7: Array[Int] = Array(2, 6, 10, 14, 3, 7, 11, 15, 4, 8, 12, 16, 1, 5, 9, 13)
24、repartitionAndSortWithinPartitions (重新分区+ 排序)
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
= self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
public class Test {
public static void main(String[] args) {
SparkConf wordcount = new SparkConf().setAppName("wordcount").setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(wordcount);
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random();
JavaPairRDD<Integer,Integer> javaPairRDD
= javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer,random.nextInt(10));
JavaPairRDD<Integer,Integer> RepartitionAndSortWithPartitionsRDD
= javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
public int numPartitions() { return 2; }
public int getPartition(Object key) {
return key.toString().hashCode() % numPartitions();
25、glom (RDD 每个分区变数组)
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int]
= ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.glom().collect()
res0: Array[Array[Int]] = Array(
Array(1, 2, 3, 4), Array(5, 6, 7, 8),
Array(9, 10, 11, 12), Array(13, 14, 15, 16))
26、mapValues (针对kv结构,只操作V)
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)]
= ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd3.mapValues(_+"|||").collect()
res1: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
27、subtract (基于rdd1计算RDD差集的函数)
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd.subtract(rdd1).collect()
res2: Array[Int] = Array(6, 8, 7)
scala> rdd1.subtract(rdd).collect()
res3: Array[Int] = Array(2, 1)