MapReduce 分组 TopN(二次排序)
在Hadoop中,排序是MapReduce的灵魂,MapTask和ReduceTask均会对数据按Key排序,这个操作是MR框架的默认行为,不管你的业务逻辑上是否需要这一操作。
这篇主要讲 如何实现:当需要根据某一字段进行分组,并对每个分组求 排在前 N 个的值。而上篇没有涉及到分组,这是两篇最大的不同。
需求
如下面的数据:
grade score
A 10
A 40
B 30
C 20
B 10
D 40
A 30
C 20
B 10
D 40
C 30
D 20
...
grade 是年级,主要包括 A、B、C、D 四个年级;
score 是对应年级的分数。
我们需要根据年级进行分组,并统计每个年级下排在前 N 个的分数。
分析:
- 我们这篇是要利用 MapReduce 自带的排序功能,即根据 key 进行排序,那么我们把 grade、score 封装到一个对象中, compareTo 中根据 score 指定排序规则;
- 然后要自定义 partitioner,根据 grade 进行分区。这样每个相同 grade 的对象 就会分到 同一 reducer 中。
- 要自定义实现 groupingcomparator
下面对 GroupingComparator 做下介绍
GroupingComparator
在hadoop的mapreduce编程模型中,当在 map 端处理完成输出 key-value对时,reduce端只会将key相同的到同一个reduce函数中去执行,如果现在map端输出的key是一个对象 TextPair,那这样每个 map 端到 reduce 都会变成如下形式 (因为每个对象都不一样,所以不能聚合到一起):
<textPair01,1>
<textPair02,1>
<textPair03,1>
<textPair04,1>
...
但是我们又有这样的需求:根据 TextPair 的某一个成员A,所有具有相同 A 的TextPair 都放到一个 reducer 函数中处理,这个A 就相当于 “相同的Key”。我们可以通过 GroupingComparator 实现此功能。
套用我们的例子,即所有相同的 grade 的对象,都放到同一个 reducer 中处理,并取前 N 个值。
这里和 partitioner 做下区分,有些人可能混淆。
partitioner 是把 所有相同 grade 的对象放到一个 Reducer Task 中,但聚合还是要根据相同 key 的,而我们 每个对象都不一样,所以没办法聚合,所以要使用 GroupingComparator 。
上代码:
定义成绩信息bean
public class ScoreBean implements WritableComparable<ScoreBean>{
private Text grade;
private DoubleWritable score;
public ScoreBean() {
}
public ScoreBean(Text grade, DoubleWritable score) {
set(grade, score);
}
public void set(Text grade, DoubleWritable score) {
this.grade = grade;
this.score = score;
}
public Text getGrade() {
return grade;
}
public DoubleWritable getScore() {
return score;
}
@Override
public int compareTo(ScoreBean o) {
int cmp = this.grade.compareTo(o.getGrade());
if (cmp == 0) {
cmp = -this.score.compareTo(o.getScore());
}
return cmp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(grade.toString());
out.writeDouble(score.get());
}
@Override
public void readFields(DataInput in) throws IOException {
String readUTF = in.readUTF();
double readDouble = in.readDouble();
this.grade = new Text(readUTF);
this.score= new DoubleWritable(readDouble);
}
@Override
public String toString() {
return grade.toString() + "\t" + score.get();
}
}
自定义partation分片:
public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{
@Override
public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {
//相同grade的成绩bean,会发往相同的partition
//而且,产生的分区数,是会跟用户设置的reduce task数保持一致
return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
自定义groupingcomparator
public class GradeGroupingComparator extends WritableComparator {
protected GradeGroupingComparator() {
super(ScoreBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
ScoreBean abean = (ScoreBean) a;
ScoreBean bbean = (ScoreBean) b;
//将grade相同的bean都视为相同,从而聚合为一组
return abean.getGrade().compareTo(bbean.getGrade());
}
}
编写mapreduce处理流程
public class SecondarySort {
static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{
ScoreBean bean = new ScoreBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
context.write(bean, NullWritable.get());
}
}
static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{
//在设置了groupingcomparator以后,这里收到的kv数据 就是: <1001 87.6>,null <1001 76.5>,null ....
//此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>
//要输出同一个grade的所有成绩中最大金额的那一个,就只要输出这个key
@Override
protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setOutputKeyClass(ScoreBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定shuffle所使用的GroupingComparator类
job.setGroupingComparatorClass(GradeGroupingComparator.class);
//指定shuffle所使用的partitioner类
job.setPartitionerClass(GradePartitioner.class);
job.setNumReduceTasks(3);
job.waitForCompletion(true);
}
}