spark—键值对操作
1.JavaPairRDD背景
键值对 RDD 通常用来进行聚合计算。先通过一些初始 ETL(抽取、转 化、装载)操作来将数据转化为键值对形式。键值对 RDD 提供了一些新的操作接口
让用户控制键值对 RDD 在各节点上分布情况的高级特性:分区。
使用可控的分区方式把常被一起访问的数据放到同一个节点上,可以大大减少应用的通信 开销。这会带来明显的性能提升。
Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。
通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 使用这些字段作为 pair RDD 操作中的键。
2.基本操作
- filter:对pair RDD的元素按照某种规则进行过滤,去掉不需要的pair RDD
- reduceByKey:对pairRDD中的元素中所有相同key对应的元素进行统计,下面是单词个数统计的例子
rdd.flatMap(line -> Arrays.asList(line.split("")).iterator())
.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
.reduceByKey((s1, s2) -> s1 + s2)// 将相同的key进行reduce,并将value相加
- combineByKey:groupByKey、reduceByKey都是由combineByKey实现的。该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。此函数有三个参数必传:
createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
numPartitions:结果RDD分区数,不传则默认保持原有的分区数
partitioner:分区函数,不传则默认为HashPartitioner
mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,不传则默认为true
combineByKey()的实现是一边进行aggregate,一边进行compute() 的基础操作。假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner将第一个 record 的 value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,如果想对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算;然后依据partitioner进行不同分区合并。
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转化为pairRDD
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer,1);
}
});
JavaPairRDD<Integer,String> combineByKeyRDD = javaPairRDD.combineByKey(
new Function<Integer, String>() {
@Override
public String call(Integer v1) throws Exception {
return v1 + " :createCombiner: ";
}},
new Function2<String, Integer, String>() {
@Override
public String call(String v1, Integer v2) throws Exception {
return v1 + " :mergeValue: " + v2;
}},
new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
return v1 + " :mergeCombiners: " + v2;
}});
- 设置分区个数:每个RDD都有固定数目的分区,在建立RDD时,如果不指定分区个数的话,系统会根据集群大小推断出一个有意义的默认值。
3.数据分组
- groupByKey():对pair RDD中相同key对应的元素进行合并,形成一个元素列表作为value,key值保持不变。
4.数据连接
- join:对于2个pair RDD中都含有的key,设其value为v1,v2,返回一个pair RDD,其中的元素为(key, (v1,v2))。
- leftOuterJoin与rightOuterJoin:允许key值有缺失的连接,分别允许右边与左边的值有缺失,对应为None
5.数据排序
- sortByKey:按照key进行升序或者降序排列,可以指定比较key时采用的函数,即不是比较key1与key2,而是比较f(key1)与f(key2),但是f(key)的返回值类型需要与key相同。
6.pair RDD的行动操作
- countByKey:对每个key对应的元素个数进行计数
- collectAsMap:将结果以映射表的形式返回,便于查询
- lookup:返回给定键对应的所有值,为list的形式