Spark广播变量应用

2021-05-21  本文已影响0人  扎西的德勒

一、广播变量

1、广播变量的优点

不需要每个task带上一份变量副本,而是变成每个节点的executor存一份副本。这样的话, 就可以让变量产生的副本数量大大减少。

2、广播变量的用法
//将mapRdd广播后返回broadcastValue
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
//获取广播变量的值
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
3、广播变量的原理

初始的时候,在Driver端有一个副本数据。广播变量后,task运行的时候,在使用副本数据前,首先在所在本地Executor对应的BlockManager中,尝试获取副本数据;如果本地没有,即从Driver端拉取副本数据,并且保存在所在本地的BlockManager中;此后这个Executor上所有的task,都会直接使用本地BlockManager中的副本数据。另Executor的BlockManager除了从Driver端拉取数据,也可能从其他节点的BlockManager中拉去副本数据。
BlockManager:负责管理某个Executor对应的内存和磁盘的数据,尝试本地BlockManager中招map数据。

4、优化说明

假设有50个Executor,共1000个task;若每个map数据10M。默认情况下,1000个副本10M共10G数据。在集群中,通过网络传输,耗费10G的内存资源;如果使用了广播变量,50个Executor即50个副本10M共500M数据。而且Executor的BlockManager不一定都从Driver传输到本地,还可能从最近的节点的Executor的BlockManager中拉取数据,网络传输速度大大增加,传输数据大大减少。
10G/500M=20倍,极大的提高了性能。

二、代码实例

1、准备数据
//订单数据
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
//产品数据
p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4

2、代码开发

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkBroadCast {
  def main(args: Array[String]): Unit = {
    //构造Spark程序执行环境
    val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
    //如果集群运行,则不需要设置setMaster("local[*]")
    val sc = new SparkContext(conf)
    //设置日志级别
    sc.setLogLevel("WARN")
    //创建RDD,读取产品信息数据
    //产品记录样例:p0001,xiaomi,1000,2
    val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
    val productMapRdd: Array[(String, String)] = productRdd.map(x => {
      (x.split(",")(0), x)
    }).collect()
//    productMapRdd.foreach(println)
    /**
     * (p0001,p0001,xiaomi,1000,2)
     * (p0002,p0002,appale,1000,3)
     * (p0003,p0003,samsung,1000,4)
     */
    //将产品数据作为广播变量
    val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)

    //读取订单记录:1001,20150710,p0001,2
    val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
    //将订单记录按照分区处理
    val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
      //获取产品广播变量的数据并转换为map类型,目的是通过getOrElse获取产品数据
      val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
      //处理分区内的订单数据记录
      val finalStr = eachPartition.map(eachLine => {
        //将每条订单记录按照逗号拆分,返回集合类型
        val ordersGet: Array[String] = eachLine.split(",")
        //产品的map类型,通过key(订单的产品id)获取对应的产品记录,返回产品数据记录
        val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
        //订单记录拼接产品记录
        eachLine + "\t" + getProductStr
      })
      finalStr
    })
    productAndOrderRdd.foreach(println)

    /**
     * 1001,20150710,p0001,2    p0001,xiaomi,1000,2
     * 1002,20150710,p0002,3    p0002,appale,1000,3
     * 1002,20150710,p0003,3    p0003,samsung,1000,4
     */


    //关闭Spark环境
    sc.stop()
  }

}

三、注意事项

上一篇下一篇

猜你喜欢

热点阅读