我爱编程

MapReduce 分组 TopN(二次排序)

2018-06-21  本文已影响0人  博弈史密斯

在Hadoop中,排序是MapReduce的灵魂,MapTask和ReduceTask均会对数据按Key排序,这个操作是MR框架的默认行为,不管你的业务逻辑上是否需要这一操作。

这篇主要讲 如何实现:当需要根据某一字段进行分组,并对每个分组求 排在前 N 个的值。而上篇没有涉及到分组,这是两篇最大的不同。

需求

如下面的数据:

grade   score  
A       10  
A       40  
B       30  
C       20  
B       10  
D       40  
A       30  
C       20  
B       10  
D       40  
C       30  
D       20  
...

grade 是年级,主要包括 A、B、C、D 四个年级;
score 是对应年级的分数。

我们需要根据年级进行分组,并统计每个年级下排在前 N 个的分数。

分析:

  1. 我们这篇是要利用 MapReduce 自带的排序功能,即根据 key 进行排序,那么我们把 grade、score 封装到一个对象中, compareTo 中根据 score 指定排序规则;
  2. 然后要自定义 partitioner,根据 grade 进行分区。这样每个相同 grade 的对象 就会分到 同一 reducer 中。
  3. 要自定义实现 groupingcomparator

下面对 GroupingComparator 做下介绍

GroupingComparator

在hadoop的mapreduce编程模型中,当在 map 端处理完成输出 key-value对时,reduce端只会将key相同的到同一个reduce函数中去执行,如果现在map端输出的key是一个对象 TextPair,那这样每个 map 端到 reduce 都会变成如下形式 (因为每个对象都不一样,所以不能聚合到一起):

<textPair01,1>
<textPair02,1>
<textPair03,1>
<textPair04,1>
...

但是我们又有这样的需求:根据 TextPair 的某一个成员A,所有具有相同 A 的TextPair 都放到一个 reducer 函数中处理,这个A 就相当于 “相同的Key”。我们可以通过 GroupingComparator 实现此功能。

套用我们的例子,即所有相同的 grade 的对象,都放到同一个 reducer 中处理,并取前 N 个值。

这里和 partitioner 做下区分,有些人可能混淆。
partitioner 是把 所有相同 grade 的对象放到一个 Reducer Task 中,但聚合还是要根据相同 key 的,而我们 每个对象都不一样,所以没办法聚合,所以要使用 GroupingComparator 。

上代码:

定义成绩信息bean

public class ScoreBean implements WritableComparable<ScoreBean>{  
    private Text grade;  
    private DoubleWritable score;  
  
    public ScoreBean() {  
    }  
    public ScoreBean(Text grade, DoubleWritable score) {  
        set(grade, score);  
    }  
  
    public void set(Text grade, DoubleWritable score) {  
  
        this.grade = grade;  
        this.score = score;  
  
    }  
  
    public Text getGrade() {  
        return grade;  
    }  
  
    public DoubleWritable getScore() {  
        return score;  
    }  
  
    @Override  
    public int compareTo(ScoreBean o) {  
        int cmp = this.grade.compareTo(o.getGrade());  
        if (cmp == 0) {  
  
            cmp = -this.score.compareTo(o.getScore());  
        }  
        return cmp;  
    }  
  
    @Override  
    public void write(DataOutput out) throws IOException {  
        out.writeUTF(grade.toString());  
        out.writeDouble(score.get());  
          
    }  
  
    @Override  
    public void readFields(DataInput in) throws IOException {  
        String readUTF = in.readUTF();  
        double readDouble = in.readDouble();  
          
        this.grade = new Text(readUTF);  
        this.score= new DoubleWritable(readDouble);  
    }  
  
  
    @Override  
    public String toString() {  
        return grade.toString() + "\t" + score.get();  
    }  
}  

自定义partation分片:

public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{  
  
    @Override  
    public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {  
        //相同grade的成绩bean,会发往相同的partition  
        //而且,产生的分区数,是会跟用户设置的reduce task数保持一致  
        return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;    
    }  
}  

自定义groupingcomparator

public class GradeGroupingComparator extends WritableComparator {  
  
    protected GradeGroupingComparator() {  
  
        super(ScoreBean.class, true);  
    }  
      
    @Override  
    public int compare(WritableComparable a, WritableComparable b) {  
        ScoreBean abean = (ScoreBean) a;  
        ScoreBean bbean = (ScoreBean) b;  
          
        //将grade相同的bean都视为相同,从而聚合为一组  
        return abean.getGrade().compareTo(bbean.getGrade());  
    }  
} 

编写mapreduce处理流程

public class SecondarySort {  
      
    static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{  
          
        ScoreBean bean = new ScoreBean();  
          
        @Override  
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  
            String line = value.toString();  
            String[] fields = StringUtils.split(line, "\t");  
              
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));  
              
            context.write(bean, NullWritable.get());  
              
        }  
          
    }  
      
    static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{  
          
          
        //在设置了groupingcomparator以后,这里收到的kv数据 就是:  <1001 87.6>,null  <1001 76.5>,null  ....   
        //此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>  
        //要输出同一个grade的所有成绩中最大金额的那一个,就只要输出这个key  
        @Override  
        protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {  
            context.write(key, NullWritable.get());  
        }  
    }  
      
      
    public static void main(String[] args) throws Exception {  
          
        Configuration conf = new Configuration();  
        Job job = Job.getInstance(conf);  
          
        job.setJarByClass(SecondarySort.class);  
          
        job.setMapperClass(SecondarySortMapper.class);  
        job.setReducerClass(SecondarySortReducer.class);  
          
          
        job.setOutputKeyClass(ScoreBean.class);  
        job.setOutputValueClass(NullWritable.class);  
          
        FileInputFormat.setInputPaths(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        //指定shuffle所使用的GroupingComparator类  
        job.setGroupingComparatorClass(GradeGroupingComparator.class);  
        //指定shuffle所使用的partitioner类  
        job.setPartitionerClass(GradePartitioner.class);  
          
        job.setNumReduceTasks(3);  
          
        job.waitForCompletion(true);  
    }  
}  
上一篇下一篇

猜你喜欢

热点阅读