spark broadcast variables and Ac

2021-05-31  本文已影响0人  caster

1. Accumulator(累加器):分布式共享只写变量

求和示例:对rdd(两个分区)数据求和,driver端的sum传到不同executor分区计算,结果并不会返回driver端,如下代码所示:

val sc = SparkSession.builder.master("local").appName("test").getOrCreate()

val rdd = sc.sparkContext.parallelize(List(
  1, 2, 3, 4
),2)
var sum = 0
rdd.foreach(e => {
  sum += e
  println(sum)
})
println("***")
println(sum)

sc.stop()

输出结果如下,每个分区获取sum独立计算结果。driver端sum值不变:

1
3
3
7
***
0

Acc变量会将每个分区计算结果返回driver端再合并结果。

val sc = SparkSession.builder.master("local").appName("test").getOrCreate()

val rdd = sc.sparkContext.parallelize(List(
  1, 2, 3, 4
),2)
val sum = sc.sparkContext.longAccumulator("sum")
rdd.foreach(e => {
  sum.add(e)
  println(sum.value)
})
println("***")
println(sum.value)

sc.stop()

通过累加器执行,结果复合预期:

1
3
3
7
***
10

累加器原理:
少加:转换算子中调用累加器,如果转换后的rdd没有调用行动算子,累加器不会执行。
多加:如果算子中调用累加器后多次执行行动算子,则会多加一次。
executor之间acc不能互相访问,只有dirver端可以调用各分区的acc结果。
应用实例:wordcount
自己构造累加器实现wc功能,不需要reduceByKey的shullfe操作

object Test {
  def main(args: Array[String]): Unit = {

    val sc = SparkSession.builder.master("local").appName("test").getOrCreate()

    val rdd = sc.sparkContext.parallelize(List(
      "hello word", "hello spark"
    ), 2)

    //1. reduceBykey会产生shullfe操作
    val res = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    res.collect().foreach(println)

    //2. 通过累加器实现
    val wc = new WcAccumulator()
    sc.sparkContext.register(wc, "wc")

    rdd.flatMap(_.split(" ")).foreach(
      word => {
        wc.add(word)
      }
    )
    println(wc.value)

    sc.stop()

  }

  /*
    AccumulatorV2[IN, OUT]
    IN:输入类型
    OUT:返回类型
   */
  class WcAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    private var map = mutable.Map[String, Long]()
    //判断是否为初始状态
    override def isZero: Boolean = map.isEmpty

    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      new WcAccumulator()
    }

    override def reset(): Unit = map.clear()

    //累加器计算规则
    override def add(v: String): Unit = {
      val count = map.getOrElse(v, 0L) + 1
      map.update(v, count)
    }

    //driver合并多个累加器:将other的OUT合并到当前的OUT
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      other.value.foreach {
        case (word, count) => {
          val newCount=this.map.getOrElse(word,0L)+count
          this.map.update(word,newCount)
        }
      }
    }
    //累加器结果
    override def value: mutable.Map[String, Long] = map
  }

}

输出结果如下:两种方式结果一致

(word,1)
(hello,2)
(spark,1)
Map(spark -> 1, word -> 1, hello -> 2)

2. broadcast variables(广播变量):分布式共享只读变量

闭包数据以task为单位发送,一个executor中如果有多个tasks,则会包含多个重复的闭包数据。广播变量实现了一个executor(JVM)只保存一份闭包数据在内存中,多个tasks共享此数据。

//封装
val bc = sc.broadcast(values)
//获取
bc.value
上一篇下一篇

猜你喜欢

热点阅读