spark—键值对操作

2018-11-07  本文已影响6人  Sophie12138

1.JavaPairRDD背景
键值对 RDD 通常用来进行聚合计算。先通过一些初始 ETL(抽取、转 化、装载)操作来将数据转化为键值对形式。键值对 RDD 提供了一些新的操作接口

让用户控制键值对 RDD 在各节点上分布情况的高级特性:分区。
使用可控的分区方式把常被一起访问的数据放到同一个节点上,可以大大减少应用的通信 开销。这会带来明显的性能提升。

Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。

通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 使用这些字段作为 pair RDD 操作中的键。

2.基本操作

rdd.flatMap(line -> Arrays.asList(line.split("")).iterator())
    .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
    .reduceByKey((s1, s2) -> s1 + s2)// 将相同的key进行reduce,并将value相加

combineByKey()的实现是一边进行aggregate,一边进行compute() 的基础操作。假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner将第一个 record 的 value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,如果想对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算;然后依据partitioner进行不同分区合并。

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转化为pairRDD
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(
                            new PairFunction<Integer, Integer, Integer>() {    
    @Override    
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {  
      return new Tuple2<Integer, Integer>(integer,1);   
  }
});

JavaPairRDD<Integer,String> combineByKeyRDD = javaPairRDD.combineByKey(
  new Function<Integer, String>() {    
    @Override    
    public String call(Integer v1) throws Exception {  
      return v1 + " :createCombiner: ";    
  }}, 
  new Function2<String, Integer, String>() {    
    @Override    
    public String call(String v1, Integer v2) throws Exception {        
      return v1 + " :mergeValue: " + v2;    
  }}, 
  new Function2<String, String, String>() {    
    @Override    
    public String call(String v1, String v2) throws Exception {        
      return v1 + " :mergeCombiners: " + v2;    
  }});

3.数据分组

4.数据连接

5.数据排序

6.pair RDD的行动操作

上一篇下一篇

猜你喜欢

热点阅读