大数据程序员技术文

MapReduce实现二次排序

2016-10-26  本文已影响3588人  心_的方向

二次排序的需求说明

在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。


sort1.png

测试的文件数据

a 1
a 5
a 7
a 9
b 3
b 8
b 10

未经过二次排序的输出结果

a   9
a   7
a   5
a   1
b   10
b   8
b   3

第一种实现思路

直接在reduce端对分组后的values进行排序。

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            
             List<Integer> valuesList = new ArrayList<Integer>();

             // 取出value
             for(IntWritable value : values) {
                 valuesList.add(value.get());
             }
             // 进行排序
             Collections.sort(valuesList);
            
             for(Integer value : valuesList) {
                context.write(key, new IntWritable(value));
             }
            
        }
a   1
a   5
a   7
a   9
b   3
b   8
b   10

很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。

第二种实现思路

将map端输出的<key,value>中的key和value组合成一个新的key(称为newKey),value值不变。这里就变成<(key,value),value>,在针对newKey排序的时候,如果key相同,就再对value进行排序。

  1. 自定义数据类型实现组合key
    实现方式:继承WritableComparable
  2. 自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。
    实现方式:继承partitioner
  3. 自动以分组,保持分组规则任然按照key进行。不打乱原来的分组
    实现方式:继承RawComparator
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

 public class PairWritable implements WritableComparable<PairWritable> {
    // 组合key
      private String first;
      private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    /**
     * 方便设置字段
     */
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }
    
    /**
     * 反序列化
     */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.first = arg0.readUTF();
        this.second = arg0.readInt();
    }
    /**
     * 序列化
     */
    @Override
    public void write(DataOutput arg0) throws IOException {
        arg0.writeUTF(first);
        arg0.writeInt(second);
    }

    /*
     * 重写比较器
     */
    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.first);
        
        if(comp != 0) {
            return comp;
        } else { // 若第一个字段相等,则比较第二个字段
            return Integer.valueOf(this.second).compareTo(
                    Integer.valueOf(o.getSecond()));
        }
    }
    
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public String getFirst() {
        return first;
    }
    public void setFirst(String first) {
        this.first = first;
    }
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        /* 
         * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
         * 让key中first字段作为分区依据
         */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 
    }
}

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class SecondGroupComparator implements RawComparator<PairWritable> {

    /*
     * 对象比较
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    /*
     * 字节比较
     * arg0,arg3为要比较的两个字节数组
     * arg1,arg2表示第一个字节数组要进行比较的收尾位置,arg4,arg5表示第二个
     * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
     */
    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
        return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
    }
}
        private PairWritable mapOutKey = new PairWritable();
        private IntWritable mapOutValue = new IntWritable();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            
            //设置组合key和value ==> <(key,value),value>
            mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutValue.set(Integer.valueOf(strs[1]));
            
            context.write(mapOutKey, mapOutValue);
        }
        private Text outPutKey = new Text(); 
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            //迭代输出
            for(IntWritable value : values) {
                outPutKey.set(key.getFirst());
                context.write(outPutKey, value);
            }
            
        }
a   1
a   5
a   7
a   9
b   3
b   8
b   10
上一篇下一篇

猜你喜欢

热点阅读