Java Spark 简单示例(八) 自定义分区划分器

2018-09-28  本文已影响0人  憨人Zoe

大数据学习交流微信群

今天花了大量时间来搜索分区划分器相关的资料。感觉大多雷同,无非是Spark 内置的两个分区划分器HashPartitionerRangePartitioner。但是关于RangePartitioner具体用法讲解的都很少。

HashPartitioner 是绝大多数场景的默认分区划分器。RangePartitioner主要用于RDD的数据排序相关API中,比如sortByKey底层使用的就是RangePartitioner分区划分器。

我将通过下面demo8演示各个分区划分器的用法以及自定义分区器(第9步)。

1.test2.txt中创建RDD
2. 打印初始分区数,分区划分器
3. 调用flatMapToPair生成包含<K,V>结构的数据的RDD
4. 打印flatMapToPair后分区数,分区划分器

//test2.txt
aaa,bbb,ccc,ddd,eee,fff,ggg
aaa,ccc,eee
public class demo8 {
    private static String appName = "spark.demo";
    private static String master = "local[*]";

    public static void main(String[] args) {
        JavaSparkContext sc = null;
        try {
            //初始化 JavaSparkContext
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            sc = new JavaSparkContext(conf);

            JavaRDD<String> rdd = sc.textFile("test2.txt");

            System.out.println("初始分区数:" + rdd.getNumPartitions());
            System.out.println("初始分区划分器:" + rdd.partitioner().toString());

            JavaPairRDD<String, Integer> pairRDD = rdd.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
                public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                    List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
                    String[] arr = s.split(",");
                    for (String ele : arr) {
                        list.add(new Tuple2<String, Integer>(ele, 1));
                    }
                    return list.iterator();
                }
            }).cache();

            System.out.println("flatMapToPair后分区数:" + pairRDD.getNumPartitions());
            System.out.println("flatMapToPair后分区划分器:" + pairRDD.partitioner().toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}

输出结果:

初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty

结果显示,执行flatMapToPair后,分区和划分器都没有发生变化,也说明flatMapToPair操作不会触发RDD默认的划分器。

5. 执行groupByKey操作,打印分区数和划分器

JavaPairRDD<String, Iterable<Integer>> groupRDD = pairRDD.groupByKey(4);
System.out.println("groupByKey后分区数:" + groupRDD.getNumPartitions());
System.out.println("groupByKey后分区划分器:" + groupRDD.partitioner().toString());

输出结果:

初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
groupByKey后分区数:4
groupByKey后分区划分器:Optional[org.apache.spark.HashPartitioner@4]

结果显示,执行groupByKey 函数触发了RDDHashPartitioner分区划分器。

6. 执行sortByKey操作,打印分区数和划分器

JavaPairRDD<String, Iterable<Integer>> sortPartitionRDD = groupRDD.sortByKey();
System.out.println("sortByKey后分区数:" + sortPartitionRDD.getNumPartitions());
System.out.println("sortByKey后分区划分器:" + sortPartitionRDD.partitioner().toString());
初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
groupByKey后分区数:4
groupByKey后分区划分器:Optional[org.apache.spark.HashPartitioner@4]
sortByKey后分区数:4
sortByKey后分区划分器:Optional[org.apache.spark.RangePartitioner@b2ba98ec]

结果显示,执行sortByKey 函数触发了RDDRangePartitioner分区划分器,由于sortByKey前正在使用的是HashPartitioner,所以RDD被重新划分区。
补充:sortByKey 方法有多个重载,包括可以指定排序规则等。

7. 第5步用一下代码替换,输出结果相同

JavaPairRDD<String, Integer> hashPartitionRDD = pairRDD.partitionBy(new HashPartitioner(4)).cache();
System.out.println("partitionBy后分区数:" + hashPartitionRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + hashPartitionRDD.partitioner().toString());

8. 手动指定调用RangePartitioner分区器,这个分区器用Java实现把我难住了好久,主要是构造函数都看不懂,而且网上demo很难找。功夫不负有心人,在github上搜到了 Partitioning.java

//Range 分区器
RDD<Tuple2<String, Integer>> prdd = JavaPairRDD.toRDD(pairRDD);
RangePartitioner rangePartitioner = new RangePartitioner(4,
    prdd,//待排序元 rdd
    true,//升序
    scala.math.Ordering.String$.MODULE$,//排序类型
    scala.reflect.ClassTag$.MODULE$.apply(String.class));//反射?

//指定分区划分器
JavaPairRDD<String, Integer> rangePartitionRDD = pairRDD.partitionBy(rangePartitioner).cache();
System.out.println("partitionBy后分区数:" + rangePartitionRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + rangePartitionRDD.partitioner().toString());

//打印分区索引:key:value
Partitioner partitioner = rangePartitionRDD.partitioner().get();
System.out.println("分区数:" + partitioner.numPartitions());
List<Tuple2<String, Integer>> resultRange = rangePartitionRDD.collect();
for (Tuple2<String, Integer> tuple2 : resultRange) {
    System.out.println(partitioner.getPartition(tuple2._1()) + ":" + tuple2._1() + ":" + tuple2._2());
}

输出结果:

初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
partitionBy后分区数:4
partitionBy后分区划分器:Optional[org.apache.spark.RangePartitioner@b2ab910c]
分区数:4
0:aaa:1
0:bbb:1
0:aaa:1
1:ccc:1
1:ccc:1
2:ddd:1
2:eee:1
2:eee:1
3:fff:1
3:ggg:1

9. 自定义分区器,直接上代码

package com.yzy.spark;

import org.apache.spark.Partitioner;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MyPartitioner extends Partitioner {
    private int partitions; //分区数

    // RDD 内部数据和分区索引
    private Map<Object, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<Object, Integer>();

    //构造函数:key的集合,分区数(可选)
    public MyPartitioner(List<String> keys, int partitionsNum) throws Exception {
        if (keys == null || keys.isEmpty()) {
            throw new Exception("keys 不能为空");
        }
        this.partitions = partitionsNum > 0 ? partitionsNum : keys.size();
        for (int i = 0; i < keys.size(); i++) {
             hashCodePartitionIndexMap.put(keys.get(i), i % this.partitions);
        }
    }

    @Override
    public int numPartitions() {
        return this.partitions;
    }

    @Override
    public int getPartition(Object key) {
        return hashCodePartitionIndexMap.get(key.toString());
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MyPartitioner) {
            return ((MyPartitioner) obj).partitions == this.partitions;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return this.partitions;
    }
}

demo8 改造

//获得key集合(去重)
List<String> keys = pairRDD.map(new Function<Tuple2<String, Integer>, String>() {
    public String call(Tuple2<String, Integer> v1) throws Exception {
        return v1._1();
    }
}).distinct().collect();

//自定义分区
JavaPairRDD<String, Integer> myPartitionRDD = pairRDD.partitionBy(new MyPartitioner(keys, 4)).cache();
System.out.println("partitionBy后分区数:" + myPartitionRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + myPartitionRDD.partitioner().toString());

结果输出:

初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
partitionBy后分区数:4
partitionBy后分区划分器:Optional[com.yzy.spark.MyPartitioner@4]

结果显示,自定义分区已生效,接下来打印一下partitioner的分区信息

Partitioner partitioner = myPartitionRDD.partitioner().get();
System.out.println("分区数:" + partitioner.numPartitions());
for (String k : keys) {
    System.out.println(k + " 的分区ID:" + partitioner.getPartition(k));
}

结果输出:

分区数:4
bbb 的分区ID:0
ddd 的分区ID:1
fff 的分区ID:2
ggg 的分区ID:3
eee 的分区ID:0
ccc 的分区ID:1
aaa 的分区ID:2
上一篇下一篇

猜你喜欢

热点阅读