4. Hadoop:二次排序实现
一、二次排序介绍及原理
假设有如下一组数据:
1 1
3 3
2 2
1 5
1 3
2 1
现在需要 MapReduce 程序先对其第一个字段排序,再对第二个字段进行排序。最终会得到如下结果:
1 1
1 3
1 5
2 1
2 2
3 3
这就是二次排序的含义。
我们知道,MapReduce 程序中,Mapper 输出的键值对会经历 shuffle 过程再交给 Reducer。在 shuffle 阶段,Mapper 输出的键值对会经过 partition(分区)->sort(排序)->group(分组) 三个阶段。
既然 MapReduce 框架自带排序,那么二次排序中的排序是否可以交给 shuffle 中的 sort 来执行呢?
答案是肯定的,而且 shuffle 的 sort 在大量数据的情况下具有很高的效率。
shuffle 的 sort 过程会根据键值对<key, value>的 key 进行排序,但是二次排序中,value 也是需要排序的字段。因此需要将 value 字段合并到 key 中作为新的 key,形成新的键值对<key#value, value>。在排序时使其先根据 key 排序,如果相同,再根据 value 排序。
下图详细描述了二次排序的流程,虚线框内为 shuffle 过程:
![](https://img.haomeiwen.com/i2033184/fc7c31a7ee131efc.png)
可以发现,在 Map 阶段,程序将原本的 key 和 value 组合成一个新的 key,这是一个新的数据类型。因此需要在程序中定义该数据类型,为了使它支持排序,还需定义它的比较方式。根据二次排序的规则,比较方式为:先比较第一个字段,如果相同再比较第二个字段。
Map 操作结束后,数据的 key 已经改变,但是分区依旧要按照原本的 key 进行,否则原 key 不同的数据会被分到一起。举个例子,<1#1, 1>、<1#3, 3>、<1#5, 5>应该属于同一个分区,因为它们的原 key 都是1。如果按照新 key 进行分区,那么可能<1#1, 1>、<1#3, 3>在同一个分区,<1#5, 5>被分到另一个分区。
分区和排序结束后进行分组,很显然分组也是按照原 key 进行。
二、代码实现
1. 自定义新 key 的数据类型
定义类 PairWritable 作为新 key 的数据类型,Hadoop 的数据类型需要实现 WritableComparable<> 接口。该接口中的 readFields() 和 write() 方法用于实现数据的读写,compareTo() 方法用于实现该数据类型的比较功能以便排序。
public class PairWritable
implements WritableComparable<PairWritable>{
private int first;
private int second;
/**
* 构造函数
*/
public PairWritable() { }
public void set(int first, int second) {
this.first = first;
this.second = second;
}
/**
* Getters and Setters
*/
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public void readFields(DataInput in) throws IOException {
this.setFirst(in.readInt());
this.setSecond(in.readInt());
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.getFirst());
out.writeInt(this.getSecond());
}
public int compareTo(PairWritable o) {
// 先比较 first
int compare = Integer.valueOf(this.getFirst()).compareTo(o.getFirst());
if (compare != 0) {
return compare;
}
// first 相同再比较 second
return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + first;
result = prime * result + second;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PairWritable other = (PairWritable) obj;
if (first != other.first)
return false;
if (second != other.second)
return false;
return true;
}
@Override
public String toString() {
return first + "\t" + second;
}
}
2. 自定义分区类
定义 FirstPartitioner 作为分区类,按照 PairWritable 的第一个字段分区。
public class FirstPartitioner
extends Partitioner<PairWritable, IntWritable> {
@Override
public int getPartition(PairWritable key, IntWritable value,
int numPartitions) {
return (String.valueOf(key.getFirst()).hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
3. 自定义分组类
定义 FirstGroupingComparator 作为分组类,按照 PairWritable 的第一个子段进行分组。
public class FirstGroupingComparator
implements RawComparator<PairWritable>{
public int compare(PairWritable o1, PairWritable o2) {
return Integer.valueOf(o1.getFirst()).compareTo(o2.getFirst());
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
}
}
4. 主类 SecondarySort
二次排序的 Mapper 需要将原 key 和 value 组合成新 key 再输出;Reducer 不需要额外的工作,将 shuffle 的结果遍历输出即可。
在主函数中,需要为 job 执指定分区类和分组类为我们自定义的 FirstPartitioner 和 FirstGroupingComparator。
public class SecondarySort {
public static class SecondarySortMapper extends Mapper<LongWritable, Text, PairWritable, IntWritable> {
private PairWritable mapOutputKey = new PairWritable();
private IntWritable mapOutputValue = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, PairWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] strs = line.split(" ");
mapOutputKey.set(Integer.valueOf(strs[0]), Integer.valueOf(strs[1]));
mapOutputValue.set(Integer.valueOf(strs[1]));
context.write(mapOutputKey, mapOutputValue);
}
}
public static class SecondarySortReducer extends Reducer<PairWritable, IntWritable, IntWritable, IntWritable> {
private IntWritable outputKey = new IntWritable();
@Override
protected void reduce(PairWritable key, Iterable<IntWritable> values,
Reducer<PairWritable, IntWritable, IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
for (IntWritable value : values) {
outputKey.set(key.getFirst());
context.write(outputKey, value);
}
}
}
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "secondarysort");
job.setJarByClass(SecondarySort.class);
job.setMapOutputKeyClass(PairWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapperClass(SecondarySortMapper.class);
// shuffle settings
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(FirstGroupingComparator.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(SecondarySortReducer.class);
FileInputFormat.addInputPath(job, new Path("/datas/test/mapred/secondarysort"));
FileOutputFormat.setOutputPath(job, new Path("/datas/test/mapred/result4-19"));
boolean isSuccess = job.waitForCompletion(true);
System.exit(isSuccess ? 0 : 1);
}
}