java学习Java学习

4. Hadoop:二次排序实现

2018-04-19  本文已影响97人  东方未曦

一、二次排序介绍及原理

假设有如下一组数据:

1   1
3   3
2   2
1   5
1   3
2   1

现在需要 MapReduce 程序先对其第一个字段排序,再对第二个字段进行排序。最终会得到如下结果:

1   1
1   3
1   5
2   1
2   2
3   3

这就是二次排序的含义。

我们知道,MapReduce 程序中,Mapper 输出的键值对会经历 shuffle 过程再交给 Reducer。在 shuffle 阶段,Mapper 输出的键值对会经过 partition(分区)->sort(排序)->group(分组) 三个阶段。

既然 MapReduce 框架自带排序,那么二次排序中的排序是否可以交给 shuffle 中的 sort 来执行呢?
答案是肯定的,而且 shuffle 的 sort 在大量数据的情况下具有很高的效率。

shuffle 的 sort 过程会根据键值对<key, value>的 key 进行排序,但是二次排序中,value 也是需要排序的字段。因此需要将 value 字段合并到 key 中作为新的 key,形成新的键值对<key#value, value>。在排序时使其先根据 key 排序,如果相同,再根据 value 排序。

下图详细描述了二次排序的流程,虚线框内为 shuffle 过程:

secondary_sort 流程.png

可以发现,在 Map 阶段,程序将原本的 key 和 value 组合成一个新的 key,这是一个新的数据类型。因此需要在程序中定义该数据类型,为了使它支持排序,还需定义它的比较方式。根据二次排序的规则,比较方式为:先比较第一个字段,如果相同再比较第二个字段。

Map 操作结束后,数据的 key 已经改变,但是分区依旧要按照原本的 key 进行,否则原 key 不同的数据会被分到一起。举个例子,<1#1, 1>、<1#3, 3>、<1#5, 5>应该属于同一个分区,因为它们的原 key 都是1。如果按照新 key 进行分区,那么可能<1#1, 1>、<1#3, 3>在同一个分区,<1#5, 5>被分到另一个分区。

分区和排序结束后进行分组,很显然分组也是按照原 key 进行。

二、代码实现

1. 自定义新 key 的数据类型

定义类 PairWritable 作为新 key 的数据类型,Hadoop 的数据类型需要实现 WritableComparable<> 接口。该接口中的 readFields() 和 write() 方法用于实现数据的读写,compareTo() 方法用于实现该数据类型的比较功能以便排序。

public class PairWritable 
    implements WritableComparable<PairWritable>{
    
    private int first;
    private int second;
    
    /**
     * 构造函数
     */
    public PairWritable() { }
    
    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }
    
    /**
     * Getters and Setters
     */
    public int getFirst() {
        return first;
    }
    public void setFirst(int first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    
    public void readFields(DataInput in) throws IOException {
        this.setFirst(in.readInt());
        this.setSecond(in.readInt());
    }
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getFirst());
        out.writeInt(this.getSecond());
    }
    public int compareTo(PairWritable o) {
        // 先比较 first
        int compare = Integer.valueOf(this.getFirst()).compareTo(o.getFirst());
        if (compare != 0) {
            return compare;
        }
        // first 相同再比较 second
        return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PairWritable other = (PairWritable) obj;
        if (first != other.first)
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

}
2. 自定义分区类

定义 FirstPartitioner 作为分区类,按照 PairWritable 的第一个字段分区。

public class FirstPartitioner 
    extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, 
            int numPartitions) {
        return (String.valueOf(key.getFirst()).hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}
3. 自定义分组类

定义 FirstGroupingComparator 作为分组类,按照 PairWritable 的第一个子段进行分组。

public class FirstGroupingComparator 
    implements RawComparator<PairWritable>{

    public int compare(PairWritable o1, PairWritable o2) {
        return Integer.valueOf(o1.getFirst()).compareTo(o2.getFirst());
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
    }

}
4. 主类 SecondarySort

二次排序的 Mapper 需要将原 key 和 value 组合成新 key 再输出;Reducer 不需要额外的工作,将 shuffle 的结果遍历输出即可。
在主函数中,需要为 job 执指定分区类和分组类为我们自定义的 FirstPartitioner 和 FirstGroupingComparator。

public class SecondarySort {

    public static class SecondarySortMapper extends Mapper<LongWritable, Text, PairWritable, IntWritable> {

        private PairWritable mapOutputKey = new PairWritable();
        private IntWritable mapOutputValue = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, PairWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] strs = line.split(" ");
            mapOutputKey.set(Integer.valueOf(strs[0]), Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    public static class SecondarySortReducer extends Reducer<PairWritable, IntWritable, IntWritable, IntWritable> {

        private IntWritable outputKey = new IntWritable();
        
        @Override
        protected void reduce(PairWritable key, Iterable<IntWritable> values,
                Reducer<PairWritable, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                outputKey.set(key.getFirst());
                context.write(outputKey, value);
            }
        }
    }

    public static void main(String[] args) 
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(SecondarySortMapper.class);
        
        // shuffle settings
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(SecondarySortReducer.class);
        
        FileInputFormat.addInputPath(job, new Path("/datas/test/mapred/secondarysort"));
        FileOutputFormat.setOutputPath(job, new Path("/datas/test/mapred/result4-19"));
        
        boolean isSuccess = job.waitForCompletion(true);
        System.exit(isSuccess ? 0 : 1);
    }

}
上一篇 下一篇

猜你喜欢

热点阅读