数客联盟

RDD mapPartitionsWithIndex and mapP

2016-10-12  Woople

Definition

Transformation | Meaning
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

Note that in both cases func must return an Iterator<U>, not a single value or a collection.
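
The two function shapes can be tried without a cluster. The following is a minimal sketch in plain Scala, not Spark code: `partitions` is a hypothetical local stand-in for an RDD with three partitions, and `runPartitions` and `runPartitionsWithIndex` are illustrative helpers that only mimic how each partition's iterator is handed to func.

```scala
// Plain-Scala sketch (no Spark needed). Every name below is a hypothetical helper.
object PartitionFuncShapes {
  // Local stand-in for an RDD[Int] with 3 partitions.
  val partitions: Vector[Vector[Int]] =
    Vector(Vector(1, 2, 3), Vector(4, 5, 6), Vector(7, 8, 9, 10))

  // Mimics mapPartitions: func is called once per partition with its iterator.
  def runPartitions[T, U](parts: Vector[Vector[T]])(
      func: Iterator[T] => Iterator[U]): Vector[U] =
    parts.flatMap(p => func(p.iterator))

  // Mimics mapPartitionsWithIndex: func also receives the partition index.
  def runPartitionsWithIndex[T, U](parts: Vector[Vector[T]])(
      func: (Int, Iterator[T]) => Iterator[U]): Vector[U] =
    parts.zipWithIndex.flatMap { case (p, i) => func(i, p.iterator) }

  // One output element per partition: the sum of its elements.
  val sums: Vector[Int] =
    runPartitions[Int, Int](partitions)(it => Iterator.single(it.sum))

  // One (index, size) pair per partition.
  val sizes: Vector[(Int, Int)] =
    runPartitionsWithIndex[Int, (Int, Int)](partitions)(
      (i, it) => Iterator.single((i, it.size)))
}
```

Because func returns an iterator, it may emit zero, one, or many elements per partition; here each call emits exactly one.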

// Create an RDD with 3 partitions
scala> val testRDD = sc.makeRDD(1 to 10, 3)
testRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:27

scala> testRDD.partitions.size
res1: Int = 3
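
How the ten elements end up in three partitions can be sketched in plain Scala. This assumes the slicing rule used for parallelized collections, where partition i covers positions from i * length / numSlices up to (i + 1) * length / numSlices (exclusive), using integer division. `SliceSketch` and `slice` are hypothetical names for illustration, not Spark API.

```scala
// Sketch of the assumed slicing rule for a parallelized collection (no Spark needed).
object SliceSketch {
  // Partition i receives the elements at positions
  // [i * length / numSlices, (i + 1) * length / numSlices).
  def slice[T](data: Seq[T], numSlices: Int): Vector[Seq[T]] = {
    require(numSlices > 0, "numSlices must be positive")
    val length = data.length.toLong
    (0 until numSlices).toVector.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      data.slice(start, end)
    }
  }

  // 1 to 10 in 3 slices: the last slice absorbs the remainder.
  val parts: Vector[Seq[Int]] = slice(1 to 10, 3)
}
```

Under this rule the cut points for 10 elements and 3 slices are 0, 3, 6, and 10, so the partitions hold 3, 3, and 4 elements.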

scala> val newRDD = testRDD.mapPartitionsWithIndex {
     |   (index, partitionIterator) => {
     |     val partitionsMap = scala.collection.mutable.Map[Int, List[Int]]()
     |     var partitionList = List[Int]()
     | 
     |     while (partitionIterator.hasNext) {
     |       partitionList = partitionIterator.next() :: partitionList
     |     }
     | 
     |     partitionsMap(index) = partitionList
     |     partitionsMap.iterator // return value: an iterator with one (index, elements) pair
     |   }
     | }
newRDD: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:29

// Which elements each partition holds (each list is reversed because elements were prepended)
scala> newRDD.collect
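res2: Array[(Int, List[Int])] = Array((0,List(3, 2, 1)), (1,List(6, 5, 4)), (2,List(10, 9, 8, 7)))

// mapPartitions example: add 1 to each element, convert it to a String, and append "|" at the end of each partition
scala> val newRDD = testRDD.mapPartitions { item => {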
     |   var result = List[String]()
     | 
     |   while (item.hasNext) {
     |     result = (item.next() + 1).toString :: result
     |   }
     | 
     |   result = result ::: List("|")
     |   result.iterator // return value
     | }
     | }
newRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at mapPartitions at <console>:29

// Each partition's contents end with a | separator; the grouping matches the previous example, with every element incremented by 1
scala> newRDD.collect
res15: Array[String] = Array(4, 3, 2, |, 7, 6, 5, |, 11, 10, 9, 8, |)
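
Both partition functions above build a List with a while loop, which reverses the element order and buffers the whole partition in memory. As a hedged sketch, the same functions can be written as plain Scala values using iterator combinators; `IdiomaticPartitionFuncs` and its members are hypothetical names, and the object runs without a SparkContext.

```scala
// Sketch only: the two partition functions from the examples above, written
// as plain Scala function values so they can be checked on local iterators.
object IdiomaticPartitionFuncs {
  // Same output as the mapPartitionsWithIndex example: one (index, elements)
  // pair per partition. reverse reproduces the prepend order of the original.
  val collectWithIndex: (Int, Iterator[Int]) => Iterator[(Int, List[Int])] =
    (index, it) => Iterator.single((index, it.toList.reverse))

  // Same output as the mapPartitions example: (x + 1) as strings in reverse
  // order, followed by a "|" marker.
  val incrementAndMark: Iterator[Int] => Iterator[String] =
    it => it.map(x => (x + 1).toString).toList.reverse.iterator ++ Iterator.single("|")

  // Lazy variant: keeps the original order and never buffers the partition.
  // Its output order therefore differs from the example above.
  val incrementLazily: Iterator[Int] => Iterator[String] =
    it => it.map(x => (x + 1).toString) ++ Iterator.single("|")
}
```

In spark-shell these values should be usable as `testRDD.mapPartitionsWithIndex(IdiomaticPartitionFuncs.collectWithIndex)` and `testRDD.mapPartitions(IdiomaticPartitionFuncs.incrementAndMark)`, assuming the object is defined in the same session.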