Spark第一天作业

2018-11-06  本文已影响0人  吾为天帝乎

1.自定义一个集合val list = List("张三:20","李四:21","王五:20"),分三个区 使用mapPartitions算子构建一个新的集合,要求内部元素的格式为(String,Int),分别代表姓名和年龄

2.自定义一个集合val list = List(1,2,3,4,5,6,7,8,9),分三个区 使用mapPartitionsWithIndex算子构建一个新的集合,要求记录每个分区的元素,同时记录每个分区元素的平方和

3.自定义一个集合val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8))(1)使用aggregateByKey算子构建一个新的集合,要求输出每个key对应的元素乘积

(2)使用aggregateByKey算子构建一个新的集合,要求输出每个key对应的元素加和

object Work01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    val list = List("张三:20","李四:21","王五:20")
    val listRDD: RDD[String] = sc.parallelize(list,3)
    val f : (Iterator[String]) => (Iterator[(String,Int)]) = it => {
      var result:ListBuffer[(String,Int)] = ListBuffer()
      while (it.hasNext){
        val value = it.next()
        val name = value.split(":")(0)
        val age = value.split(":")(1).toInt
        result.append((name,age))
      }
      result.iterator
    }
    println(listRDD.mapPartitions(f).partitions.size)
    // 重新分区 - 传入分区器(分区数量)
    println(listRDD.mapPartitions(f).partitionBy(new HashPartitioner(1)).partitions.size)
  }
}

object Work02 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    val list = List(1,2,3,4,5,6,7,8,9)
    val listRDD: RDD[Int] = sc.parallelize(list,3)
    val f: (Int,Iterator[Int]) => (Iterator[(Int,Int)]) = (index,it) => {
      var sum = 0
      var result:ListBuffer[(Int,Int)] = ListBuffer()
      while (it.hasNext){
        val value = it.next()
        sum += (value * value)
        result.append((index,value))
      }
      result.append((index,sum))
      result.iterator
    }
    listRDD.mapPartitionsWithIndex(f).groupByKey().foreach(println)

  }
 def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8))
    val pairRDD: RDD[(Int,Int)] = sc.parallelize(list)
    pairRDD.groupByKey().foreach(println)
    val seq1: (Int,Int) => (Int) = (zero,value) => {
      zero * value
    }
    val com1: (Int,Int) => (Int) = (par,otherPar) => {
      par * otherPar
    }
    pairRDD.aggregateByKey(1)(seq1,com1).foreach(println)
    val seq2: (Int,Int) => (Int) = (zero,value) => {
      zero + value
    }
    val com2: (Int,Int) => (Int) = (par,otherPar) => {
      par + otherPar
    }
    pairRDD.aggregateByKey(0)(seq2,com2).foreach(println)
  }
上一篇下一篇

猜你喜欢

热点阅读