大数据分析课程

Spark--数据倾斜解决方案

2019-03-25  本文已影响0人  李小李的路

1.数据倾斜表现形势

导致数据倾斜的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

2缓解数据倾斜方式

2.1 尽量避免数据源的数据倾斜

01-kafka数据源

02-hive数据源

2.2 调整并行度分散同一个Task的不同Key

2.3 自定义partitioner

3彻底解决数据倾斜的方式

3.1 聚合类倾斜

3.1.1 两阶段聚合(局部聚合+全局聚合)

// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
  new PairFunction<Tuple2<Long,Long>, String, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
    throws Exception {
      Random random = new Random();
      int prefix = random.nextInt(10);
      return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
    }
  });
// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
  new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
      return v1 + v2;
    }
  });
// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
  new PairFunction<Tuple2<String,Long>, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
    throws Exception {
      long originalKey = Long.valueOf(tuple._1.split("_")[1]);
      return new Tuple2<Long, Long>(originalKey, tuple._2);
    }
  });
// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
  new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
      return v1 + v2;
    }
  });

3.1.2 对distinct算子优化

原始代码:
val resultDF: DataFrame = tempDF
        .selectExpr(
          "imei",
          "mark(con(sms_consign)) as flag")
        .filter("flag!=0").distinct()
修改后:
val resultDF: DataFrame = tempDF
        .selectExpr(
          "imei",
          "mark(con(sms_consign)) as flag")
        .filter("flag!=0")
    resultDF.createOrReplaceTempView("temp")
    val sql = "select imei,flag from temp group by imei,flag"
    spark.sql(sql)
        .write.parquet("/user/liyahui_su/keyword_imei_half_year")

3.2 过滤少数导致倾斜的key

3.3Join类导致的倾斜

3.3.1 将Reduce side Join转变为Map side Join

3.3.2为数据倾斜的key增加随机前/后缀

3.3.3使用随机前缀和扩容RDD进行join

3.3.4 大表随机添加N种随机前缀,小表扩大N倍

3.3.5 采样倾斜key并分拆join操作

Attention Please--文章来自互联网资料整理,如有雷同,纯属李小李抄袭,如有侵权请联系删除 From 李小李

上一篇下一篇

猜你喜欢

热点阅读