2020-11-26-Spark-5(Spark-Core)

2020-12-04  本文已影响0人  冰菓_

RDD算子:sortBy,交集,并集,差集,拉链,partitionBy,aggregateByKey,reduceByKey groupByKey,聚合算子,join算子,连接分组算子
比较RDD算子的性能

1.sortBy(可能存在shuffle,不能改变分区数量,可指定升序降序)

(true是升序)
(默认按字典顺序排序)

    val sc = new SparkContext(new SparkConf().setAppName("...").setMaster("local[*]"))
    val rdd = sc.makeRDD(List[Int](1, 6,2,4,5,3), 2)
    rdd.sortBy(data=>data).saveAsTextFile("src/aa")
    sc.stop()
    val sc = new SparkContext(new SparkConf().setAppName("...").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("11", 1), ("22", 2),("11",2),("2",11)),1)
    rdd.sortBy(data=>data._1.toInt,true).saveAsTextFile("src/aa")
    sc.stop()

2.双value类型(注意数据类型,差集的顺序,分区数据一致性)

交集

    val sc = new SparkContext(new SparkConf().setAppName("...").setMaster("local[*]"))
    val S1 = sc.makeRDD(List(1, 2, 3, 4))
    val S2 = sc.makeRDD(List(3, 4, 5, 6))
    val result: RDD[Int] = S1.intersection(S2)
    result.collect.foreach(println)
    sc.stop()

并集(共有的数据不会去重,去重可用set序列)

    val sc = new SparkContext(new SparkConf().setAppName("...").setMaster("local[*]"))
    //交集
    val S1 = sc.makeRDD(List(1, 2, 3, 4))
    val S2 = sc.makeRDD(List(3, 4, 5, 6))
    val result = S1.union(S2)
    result.collect.foreach(println)
    sc.stop()

差集(不同的角度,结果不同)

   val sc = new SparkContext(new SparkConf().setAppName("...").setMaster("local[*]"))
    //交集
    val S1 = sc.makeRDD(List(1, 2, 3, 4))
    val S2 = sc.makeRDD(List(3, 4, 5, 6))
    val result = S1.subtract(S2)
    result.collect.foreach(println)
    sc.stop()

拉链(分区数据的一致性)
Can't zip RDDs with unequal numbers of partitions

    val sc = new SparkContext(new SparkConf().setAppName("...").setMaster("local[*]"))
    val S1 = sc.makeRDD(List(1, 2, 3, 4))
    val S2 = sc.makeRDD(List(3, 4, 5, 6))
    val result = S1.zip(S2)
    result.collect.foreach(println)
    sc.stop()

3.partitionBy算子(重分区)

(要把数据转换成k,v类型,partitionby针对k,v类型)

    var sc = new SparkContext(new SparkConf().setAppName("test2").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    val maprdd: RDD[(Int, Int)] = rdd.map(data => (data, 1))
    val result: RDD[(Int, Int)] = maprdd.partitionBy(new HashPartitioner(2))
    result.saveAsTextFile("src/aa")
    sc.stop()

如果重分区的分区器与当前RDD的分区器一致怎么办?
例如:

val result: RDD[(Int, Int)] = maprdd.partitionBy(new HashPartitioner(2))
        .partitionBy(new HashPartitioner(2))

解答:HashPartitioner重写了equals方法,self.partitioner == Some(partitioner)会进行比较,如果相等还是调用当前的分区器

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }
 if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }

其他的分区器
PythonPartitioner RangePartitioner HashPartitioner sortBykey

4.reduceByKey groupByKey groupBy算子的示例

reduceByKey groupByKey 是针对(k,v)类型的 ,前者是聚合 后者是非聚合的结果
groupByKey groupBy 的key就是数据的key 后者是自己指定的

    val sc = new SparkContext(new SparkConf().setAppName("test4").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("A", 1), ("A", 2), ("A", 3), ("A", 4), ("B", 1)))
    val result1: RDD[(String, Int)] = rdd.reduceByKey((s1, s2) => s1 + s2)
    val result2: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    val result3: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)

5.aggregateByKey 和 reduceByKey 算子的示例 以及简化版foldByKey

(分区内 分区间)
(前者分区内部分聚合的规则和分区间聚合的规则可不一致)
(后者分区内部分聚合的规则和分区间聚合的规则是一致的)
foldByKey是简化版的aggregateByKey,分区内和分区间的规则是一致的

    val sc = new SparkContext(new SparkConf().setAppName("test4").setMaster("local[4]"))
    val rdd = sc.makeRDD(List(("a", 2), ("b", 4), ("b", 1), ("a", 2), ("a", 8), ("c", 1)), 2)
    rdd.reduceByKey(_ + _).saveAsTextFile("src/aa")
    //(a,12)
    //(c,1)
    //(b,5)
    rdd.aggregateByKey(0)(
      (x, y) => Math.max(x, y), (x, x1) => x + x1
    ).saveAsTextFile("src/bb")
    //(a,10)
    //(c,1)
    //(b,4)
    sc.stop()
  }
 //比较的第一个值为0
 rdd.foldByKey(0)(_+_).saveAsTextFile("/src/cc")

aggregateByKey算子求平均数,对第一个参数的理解

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test5 {
  def main(args: Array[String]): Unit = {
    // def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
    //其中的U是初始值 U是传递过来的值
    //求字母key的平均值
    var sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    val list = List(("a", 2), ("b", 3), ("a", 4), ("b", 1), ("c", 8), ("a", 3))
    val rdd = sc.makeRDD(list)
    //第一个0叠加总数 第二个0叠加次数
    //类型(Int, Int)与传入的(0,0)是一致的
    val value: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
      //在分区内进行叠加
      (u, v) => (u._1 + v, u._2 + 1),
      //x是初始值和计算后的结果,x1是要叠加的值
      (x, x1) => (x._1 + x1._1, x._2 + x1._2)
    )
    //value值的第一个是总和,第二个值个数
    //方法一mapvalue
    val result: RDD[(String, Int)] = value.mapValues(data => data match {
      case (x, y) => x / y
    })
    //方法二
    val result1: RDD[(String, Int)] = value.map(data => (data._1, (data._2._1 / data._2._2)))
    result1.collect.foreach(println)
    sc.stop()
  }
}

6.combineByKey算子的示例(传入的第一个参数的作用是指定初始值的类型)

 def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }
 def main(args: Array[String]): Unit = {
    // def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
    //其中的U是初始值 U是传递过来的值
    //求字母key的平均值
    val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    val list = List(("a", 2), ("b", 3), ("a", 4), ("b", 1), ("c", 8), ("a", 3))
    val rdd = sc.makeRDD(list)
    //第一个0叠加总数 第二个0叠加次数
    //类型(Int, Int)与传入的(0,0)是一致的
    val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
      //在分区内进行叠加
      //V ->  U 的转换
      v => (v,1),
      // U 的类型是(v,1)
      (u:(Int,Int), v) => (u._1 + v, u._2 + 1),
      //x是初始值和计算后的结果,x1是要叠加的值
      (x:(Int,Int), x1) => (x._1 + x1._1, x._2 + x1._2)
    )
    //value值的第一个是总和,第二个值个数
    //方法一mapvalue
    val result: RDD[(String, Int)] = value.mapValues(data => data match {
      case (x, y) => x / y
    })
    //方法二
    val result1: RDD[(String, Int)] = value.map(data => (data._1, (data._2._1 / data._2._2)))
    result1.collect.foreach(println)
    sc.stop()
  }

7.区别四种聚合算子

object Test6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    val list = List(("a", 2), ("b", 3), ("a", 4), ("b", 1), ("c", 8), ("a", 3))
    val rdd = sc.makeRDD(list)

    //四种聚合算子
    rdd.reduceByKey(_+_).collect.foreach(println)
    rdd.aggregateByKey(0)((x,y)=>x+y,(x,x1)=>x+x1).collect.foreach(println)
    //foldByKey算子是对aggregateByKey分区内操作的简化版
    rdd.foldByKey(0)(_+_).collect.foreach(println)
    //combineByKey算子是aggregateByKey第一个参数的简化
    rdd.combineByKey(x=>x,(x:Int,y)=>x+y,(x:Int,x1:Int)=>x+x1).collect.foreach(println)
    sc.stop()

  }
}
reduceByKey 
                 combineByKeyWithClassTag[V](
                 (v: V) => v,
                 func,
                 func, 
                 partitioner)            
aggregateByKey               
                combineByKeyWithClassTag[U](
                (v: V) => cleanedSeqOp(createZero(), v),
                cleanedSeqOp, 
                combOp,
                partitioner)
foldByKey 
                combineByKeyWithClassTag[V](
                (v: V) => cleanedFunc(createZero(), v),
                cleanedFunc, 
                cleanedFunc, 
                partitioner)    
combineByKey    combineByKeyWithClassTag(
                createCombiner, 
                mergeValue, 
                mergeCombiners)
                (null) 

8. join() leftOuterJoin() rightOuterJoin() join连接算子

连接时可能产生笛卡尔积

9.cogroup连接分组

在一个 (K, V) 对的 Dataset 上调用时,返回多个类型为 (K, (Iterable<V>, Iterable<W>)) 的元组所组成的 Dataset。

上一篇下一篇

猜你喜欢

热点阅读