
Secondary Sort on Files with MapReduce (Part 3)

2018-10-08  小飞牛_666
A secondary sort orders the records in a file by the first field; where first fields are equal, those records are further ordered by the second field. In other words, we sort by key first, and then sort the values within each group of identical keys.
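Before any Hadoop code, the ordering itself can be sketched in plain Java (the class and method names below are illustrative, not part of the job):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortDemo {
    // Sort (first, second) pairs by the first field, then by the
    // second field (numerically) when the first fields tie.
    public static List<String> secondarySort(List<String[]> pairs) {
        pairs.sort(Comparator
                .comparing((String[] p) -> p[0])
                .thenComparingInt(p -> Integer.parseInt(p[1])));
        List<String> out = new ArrayList<>();
        for (String[] p : pairs) out.add(p[0] + " " + p[1]);
        return out;
    }

    public static void main(String[] args) {
        List<String[]> pairs = new ArrayList<>(Arrays.asList(
                new String[]{"a", "3"}, new String[]{"b", "1"},
                new String[]{"a", "1"}, new String[]{"b", "2"}));
        System.out.println(secondarySort(pairs)); // [a 1, a 3, b 1, b 2]
    }
}
```

In a single JVM a comparator is all we need; the rest of this article is about achieving the same ordering when the sort is distributed across mappers and reducers.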
1. Define a custom type that implements the WritableComparable interface, used to sort the data:
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SortWritable implements WritableComparable<SortWritable> {

    // the first and second fields of the composite key
    private String first;
    private int second;

    public SortWritable() {
    }

    public SortWritable(String first, int second) {
        this.set(first,second);
    }

    // convenience setter for both fields
    public void set(String first, int second){
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    // compare by the first field; if the first fields are equal, compare by the second
    public int compareTo(SortWritable o) {
        int comp = this.getFirst().compareTo(o.getFirst());
        if (0 == comp) {
            return Integer.compare(this.getSecond(), o.getSecond());
        }
        return comp;
    }

    // serialization
    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);

    }

    // deserialization
    public void readFields(DataInput dataInput) throws IOException {

        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();

    }

    // the following three methods are IDE-generated
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SortWritable that = (SortWritable) o;
        if(second != that.second) return false;
        return first != null ? first.equals(that.first) : that.first == null;
    }

    @Override
    public int hashCode() {
        int result = first != null ? first.hashCode() : 0;
        result = 31 * result + second;
        return result;
    }

    @Override
    public String toString() {
        return "SortWritable{" +
                "first='" + first + '\'' +
                ", second=" + second +
                '}';
    }

}
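Since SortWritable only compiles against the Hadoop classpath, here is a plain-JDK sketch of the same compareTo contract (the class name is made up for illustration):

```java
public class SortKeyCompare {
    // Mirrors SortWritable.compareTo: order by the first field ascending,
    // then by the second field when the first fields are equal.
    public static int compare(String f1, int s1, String f2, int s2) {
        int comp = f1.compareTo(f2);
        return comp != 0 ? comp : Integer.compare(s1, s2);
    }

    public static void main(String[] args) {
        System.out.println(compare("a", 2, "a", 1) > 0); // true: same key, 2 > 1
        System.out.println(compare("a", 9, "b", 1) < 0); // true: "a" < "b" decides first
    }
}
```

This composite comparison is what makes the shuffle sort emit records already ordered by both fields.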

2. Create the basic skeleton of the driver class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MySecondSortMR extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        // 1) get the Configuration
        Configuration configuration = this.getConf();

        // 2) create the job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input path
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // 3.2) configure the map side
        job.setMapperClass(SecondMapper.class);
        job.setMapOutputKeyClass(SortWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 1. partitioning
        //job.setPartitionerClass(FirstPartitioner.class);

        // 2. map-output compression
        //configuration.set("mapreduce.map.output.compress", "true");
        //configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");

        // 3. grouping
        //job.setGroupingComparatorClass(FirstGrouping.class);

        // 3.3) configure the reduce side
        job.setReducerClass(SecondReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // number of reduce tasks
        //job.setNumReduceTasks(2);

        // 3.4) output path
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 4) submit the job and wait for it to finish
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) {

        // remember to comment this out before packaging into a jar
        args = new String[]{
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/secondsort.txt",
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output9"
        };

        Configuration configuration = new Configuration();

        try {
            // delete the output directory if it already exists
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);

            if(fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath, true); // recursive delete
            }

            int status = ToolRunner.run(configuration, new MySecondSortMR(), args);
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

3. Create a Mapper subclass that splits the input and applies the map-side logic (note that the output key is our custom SortWritable type):
public static class SecondMapper extends Mapper<LongWritable, Text, SortWritable, IntWritable>{

        private SortWritable outputKey = new SortWritable();
        private IntWritable outputValue = new IntWritable();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] values = value.toString().split(" ");

            if(2 != values.length) return;

            outputKey.set(values[0], Integer.valueOf(values[1]));
            outputValue.set(Integer.valueOf(values[1]));
            context.write(outputKey, outputValue);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }
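The map-side parsing can be exercised on its own. This sketch mirrors map()'s split-and-validate logic, with the composite key simplified to a tab-joined string (the class and method names are illustrative):

```java
public class SecondMapperSketch {
    // Mirrors map(): split the line on a single space, drop malformed lines,
    // and produce a "first<TAB>second" record.
    public static String parse(String line) {
        String[] values = line.split(" ");
        if (values.length != 2) return null; // malformed line is skipped, as in map()
        return values[0] + "\t" + Integer.valueOf(values[1]);
    }

    public static void main(String[] args) {
        System.out.println(parse("a 3"));        // a	3
        System.out.println(parse("bad line x")); // null
    }
}
```

Note that, as in the real mapper, a line that does not split into exactly two fields is silently discarded rather than failing the task.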

4. Create a Reducer subclass (its input types must match the Mapper's output types):
public static class SecondReduce extends Reducer<SortWritable, IntWritable, Text, IntWritable>{

        private Text outputKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

        }

        @Override
        protected void reduce(SortWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            for(IntWritable value : values){
                outputKey.set(key.getFirst());
                context.write(outputKey, value);
            }

        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

        }
    }
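The reduce logic is equally simple to sketch outside Hadoop: for one grouped key, emit the first field once per value (the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SecondReduceSketch {
    // Mirrors reduce(): for a grouped key, write (first, value) once per value.
    public static List<String> reduce(String first, List<Integer> values) {
        List<String> out = new ArrayList<>();
        for (int v : values) out.add(first + "\t" + v);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(reduce("a", Arrays.asList(1, 3))); // [a	1, a	3]
    }
}
```

The reducer never re-sorts anything: the values already arrive in order because they were part of the composite key during the shuffle.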

5. If the job runs without problems, you can browse the generated data at /user/hdfs/output9 in the web UI. Alternatively, inspect the sorted result from the command line:
bin/hdfs dfs -text /user/hdfs/output9/part*

6. To optimize the job further, we can enable the extra settings left commented out in the driver's run() method.

Because the map output key is now a composite of the original key and value, we need custom partitioner and grouping classes to preserve the original partition-by-key and group-by-key behavior.

7. Define a grouping class that implements the RawComparator interface:
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGrouping implements RawComparator<SortWritable> {

    // compare using the raw serialized bytes
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // the trailing int occupies the last 4 bytes, so compare only the first-field bytes;
        // the comparison must start at the given offsets (s1, s2), not at 0
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }

    // compare using deserialized objects
    public int compare(SortWritable o1, SortWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

}
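The `- 4` in the byte comparison relies on SortWritable's serialized layout: writeUTF() emits a 2-byte length prefix plus the UTF-8 bytes of the first field, and writeInt() appends exactly 4 bytes for the second. A small sketch using only the JDK confirms the layout (the class name is made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class KeyByteLayout {
    // Serializes (first, second) the way SortWritable.write() does
    // and returns the total byte count.
    public static int serializedLength(String first, int second) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(first);  // 2-byte length prefix + UTF-8 bytes
            out.writeInt(second); // exactly 4 bytes
            return buf.size();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // "abc" -> 2 + 3 + 4 = 9 bytes; dropping the last 4 leaves only the first field
        System.out.println(serializedLength("abc", 7)); // 9
    }
}
```

This is why trimming 4 bytes off each key makes the raw comparison see only the first field, so values with the same first field land in one reduce() call.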

8. Define a partitioner class that extends Partitioner:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<SortWritable, IntWritable> {

    @Override
    public int getPartition(SortWritable key, IntWritable value, int numPartitions) {
        // partition by the first field only; mask the sign bit so the index is non-negative
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}
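The masking trick in getPartition() can be checked in isolation. This sketch (illustrative class name) mirrors the computation:

```java
public class FirstPartitionSketch {
    // Mirrors getPartition(): mask off the sign bit so the result is never
    // negative even when hashCode() is, then take the remainder by the
    // number of reduce tasks.
    public static int partition(String first, int numPartitions) {
        return (first.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int p = partition("hadoop", 2);
        System.out.println(p >= 0 && p < 2); // true: always a valid partition index
    }
}
```

Partitioning on the first field alone guarantees that all records with the same original key reach the same reducer, which the grouping comparator then merges into a single reduce() call.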

Next, uncomment the grouping and partitioning lines in the driver's run() method and run the program again; it produces the same desired result.



Thanks to my teacher and all the experts for their guidance.
