Java Spark 简单示例(八) 自定义分区划分器
今天花了大量时间来搜索分区划分器相关的资料。感觉大多雷同,无非是Spark
内置的两个分区划分器HashPartitioner
和 RangePartitioner
。但是关于RangePartitioner
具体用法讲解的都很少。
HashPartitioner
是绝大多数场景的默认分区划分器。RangePartitioner
主要用于RDD
的数据排序相关API
中,比如sortByKey
底层使用的就是RangePartitioner
分区划分器。
我将通过下面demo8
演示各个分区划分器的用法以及自定义分区器(第9
步)。
1. 从test2.txt
中创建RDD
2. 打印初始分区数,分区划分器
3. 调用flatMapToPair
生成包含<K,V>
结构的数据的RDD
4. 打印flatMapToPair
后分区数,分区划分器
//test2.txt
aaa,bbb,ccc,ddd,eee,fff,ggg
aaa,ccc,eee
public class demo8 {
private static String appName = "spark.demo";
private static String master = "local[*]";
public static void main(String[] args) {
JavaSparkContext sc = null;
try {
//初始化 JavaSparkContext
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = sc.textFile("test2.txt");
System.out.println("初始分区数:" + rdd.getNumPartitions());
System.out.println("初始分区划分器:" + rdd.partitioner().toString());
JavaPairRDD<String, Integer> pairRDD = rdd.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
String[] arr = s.split(",");
for (String ele : arr) {
list.add(new Tuple2<String, Integer>(ele, 1));
}
return list.iterator();
}
}).cache();
System.out.println("flatMapToPair后分区数:" + pairRDD.getNumPartitions());
System.out.println("flatMapToPair后分区划分器:" + pairRDD.partitioner().toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (sc != null) {
sc.close();
}
}
}
}
输出结果:
初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
结果显示,执行flatMapToPair
后,分区和划分器都没有发生变化,也说明flatMapToPair
操作不会触发RDD
默认的划分器。
5. 执行groupByKey
操作,打印分区数和划分器
JavaPairRDD<String, Iterable<Integer>> groupRDD = pairRDD.groupByKey(4);
System.out.println("groupByKey后分区数:" + groupRDD.getNumPartitions());
System.out.println("groupByKey后分区划分器:" + groupRDD.partitioner().toString());
输出结果:
初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
groupByKey后分区数:4
groupByKey后分区划分器:Optional[org.apache.spark.HashPartitioner@4]
结果显示,执行groupByKey
函数触发了RDD
的HashPartitioner
分区划分器。
6. 执行sortByKey
操作,打印分区数和划分器
JavaPairRDD<String, Iterable<Integer>> sortPartitionRDD = groupRDD.sortByKey();
System.out.println("sortByKey后分区数:" + sortPartitionRDD.getNumPartitions());
System.out.println("sortByKey后分区划分器:" + sortPartitionRDD.partitioner().toString());
初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
groupByKey后分区数:4
groupByKey后分区划分器:Optional[org.apache.spark.HashPartitioner@4]
sortByKey后分区数:4
sortByKey后分区划分器:Optional[org.apache.spark.RangePartitioner@b2ba98ec]
结果显示,执行sortByKey
函数触发了RDD
的RangePartitioner
分区划分器,由于sortByKey
前正在使用的是HashPartitioner
,所以RDD
被重新划分区。
补充:sortByKey
方法有多个重载,包括可以指定排序规则等。
7. 第5步用一下代码替换,输出结果相同
JavaPairRDD<String, Integer> hashPartitionRDD = pairRDD.partitionBy(new HashPartitioner(4)).cache();
System.out.println("partitionBy后分区数:" + hashPartitionRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + hashPartitionRDD.partitioner().toString());
8. 手动指定调用RangePartitioner
分区器,这个分区器用Java实现把我难住了好久,主要是构造函数都看不懂,而且网上demo
很难找。功夫不负有心人,在github
上搜到了 Partitioning.java
//Range 分区器
RDD<Tuple2<String, Integer>> prdd = JavaPairRDD.toRDD(pairRDD);
RangePartitioner rangePartitioner = new RangePartitioner(4,
prdd,//待排序元 rdd
true,//升序
scala.math.Ordering.String$.MODULE$,//排序类型
scala.reflect.ClassTag$.MODULE$.apply(String.class));//反射?
//指定分区划分器
JavaPairRDD<String, Integer> rangePartitionRDD = pairRDD.partitionBy(rangePartitioner).cache();
System.out.println("partitionBy后分区数:" + rangePartitionRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + rangePartitionRDD.partitioner().toString());
//打印分区索引:key:value
Partitioner partitioner = rangePartitionRDD.partitioner().get();
System.out.println("分区数:" + partitioner.numPartitions());
List<Tuple2<String, Integer>> resultRange = rangePartitionRDD.collect();
for (Tuple2<String, Integer> tuple2 : resultRange) {
System.out.println(partitioner.getPartition(tuple2._1()) + ":" + tuple2._1() + ":" + tuple2._2());
}
输出结果:
初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
partitionBy后分区数:4
partitionBy后分区划分器:Optional[org.apache.spark.RangePartitioner@b2ab910c]
分区数:4
0:aaa:1
0:bbb:1
0:aaa:1
1:ccc:1
1:ccc:1
2:ddd:1
2:eee:1
2:eee:1
3:fff:1
3:ggg:1
9. 自定义分区器,直接上代码
package com.yzy.spark;
import org.apache.spark.Partitioner;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MyPartitioner extends Partitioner {
private int partitions; //分区数
// RDD 内部数据和分区索引
private Map<Object, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<Object, Integer>();
//构造函数:key的集合,分区数(可选)
public MyPartitioner(List<String> keys, int partitionsNum) throws Exception {
if (keys == null || keys.isEmpty()) {
throw new Exception("keys 不能为空");
}
this.partitions = partitionsNum > 0 ? partitionsNum : keys.size();
for (int i = 0; i < keys.size(); i++) {
hashCodePartitionIndexMap.put(keys.get(i), i % this.partitions);
}
}
@Override
public int numPartitions() {
return this.partitions;
}
@Override
public int getPartition(Object key) {
return hashCodePartitionIndexMap.get(key.toString());
}
@Override
public boolean equals(Object obj) {
if (obj instanceof MyPartitioner) {
return ((MyPartitioner) obj).partitions == this.partitions;
}
return false;
}
@Override
public int hashCode() {
return this.partitions;
}
}
demo8
改造
//获得key集合(去重)
List<String> keys = pairRDD.map(new Function<Tuple2<String, Integer>, String>() {
public String call(Tuple2<String, Integer> v1) throws Exception {
return v1._1();
}
}).distinct().collect();
//自定义分区
JavaPairRDD<String, Integer> myPartitionRDD = pairRDD.partitionBy(new MyPartitioner(keys, 4)).cache();
System.out.println("partitionBy后分区数:" + myPartitionRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + myPartitionRDD.partitioner().toString());
结果输出:
初始分区数:2
初始分区划分器:Optional.empty
flatMapToPair后分区数:2
flatMapToPair后分区划分器:Optional.empty
partitionBy后分区数:4
partitionBy后分区划分器:Optional[com.yzy.spark.MyPartitioner@4]
结果显示,自定义分区已生效,接下来打印一下partitioner
的分区信息
Partitioner partitioner = myPartitionRDD.partitioner().get();
System.out.println("分区数:" + partitioner.numPartitions());
for (String k : keys) {
System.out.println(k + " 的分区ID:" + partitioner.getPartition(k));
}
结果输出:
分区数:4
bbb 的分区ID:0
ddd 的分区ID:1
fff 的分区ID:2
ggg 的分区ID:3
eee 的分区ID:0
ccc 的分区ID:1
aaa 的分区ID:2