Spark

SparkCore(二)

2021-03-30  本文已影响0人  八爪鱼下水

每种部署模式如何提交任务?

Client模式yarn

Cluster模式yarn模式

两种模式的不同点是什么?

1.运行地点不同.

  1. yarn-client会导致本地负责Spark任务调取.

3.所以yarn-cluster模式下,效果更好一些,因为不用反向注册回来给本地机器.

RDD

Resilient Distributed Dataset (弹性化,分布式,数据集)

五大属性

1.分区列表

2.计算函数

3.依赖关系

4.Key-Value

5.位置优先性 : 移动计算不移动存储

RDD的依赖

1-为什么有依赖
2-依赖有什么作用
3- 如何判断宽窄依赖?

RDD的DAG

1-Spark的计算引擎关键组成.
2-DAG通过Action算子划分.
3-DAG对应就是Job.
4-DAG内部通过Shuffle算子划分Stages.

RDD的缓存

两种:Cache和Persist

RDD的checkpoint

把RDD检查点放到hdfs中.斩断依赖关系,后续使用可以直接读取了.如果删除会报错.

广播变量

 def main(args: Array[String]): Unit = {
    //申请资源

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripPrefix("$"))
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    //创建RDD
    val sc: SparkContext = spark.sparkContext
    //水果名称
    val kvFruit: RDD[(Int, String)] = sc
      .parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")))
    //把水果转换成map集合
    val collmap: collection.Map[Int, String] = kvFruit.collectAsMap

    //设置水果编号
    val fruitMap: RDD[Int] = sc.parallelize(Array(2,1))

    //需求:根据水果的编号查找水果的名称
    fruitMap.map(x=>collmap(x)).collect().foreach(println)

    //改进:如果水果很多,
    // 那么每个水果都需要拉取fruitMap变量进行对比得到水果名称
    val valueBroad: Broadcast[collection.Map[Int, String]] =
    sc.broadcast(collmap) //此处为 广播变量.
    //打印水果
    fruitMap.map(x=>valueBroad.value(x)).collect().foreach(println)

    spark.stop()
  }

累加器

sc.longAccumulator("acc_count")

 def main(args: Array[String]): Unit = {
    //申请资源

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripPrefix("$"))
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    var seq = Seq(1,2,3)

    //scala的加法
    var count = 0
    seq.map(x=> count += x)
   // println(count)

    //rdd的加法-0--为什么会出现现象?因为变量在driver端定义,
    // 将数据发送到executor执行累加,
    // 但是执行完累加后结果并没返回driver
    var counter2 = 0
    val rdd1: RDD[Int] = sc.parallelize(seq)
    rdd1.foreach(x => counter2 += x)
    //println(counter2)

    //提出了在driver端和executor端共享当前变量
    //累加器也是在action操作的时候触发
    val acc: Accumulator[Int] = sc.accumulator(0) //
    rdd1.foreach(x=>acc+=x)
   // println(acc)

    //使用不过期的方法
    val acc_count: LongAccumulator = sc.longAccumulator("acc_count")
    rdd1.foreach(x=>acc_count.add(x))
    println(acc_count)//LongAccumulator(id: 51, name: Some(acc_count), value: 6)
    println(acc_count.value)

  }
上一篇下一篇

猜你喜欢

热点阅读