眼君's Big Data Journey

MapReduce Development Notes (5: The Top-K Problem)

2020-09-07  眼君

Processing massive datasets sometimes requires sorting. A naive sort runs in O(n²) time, and even an efficient comparison sort needs O(n log n) over the full dataset, which becomes a problem at this scale, so the job needs to be optimized.

The analogous SQL

In many situations we do not need to sort the whole dataset; we only need the K largest (or smallest) records. This is the top-K problem, analogous to the following SQL:

SELECT * FROM table ORDER BY col4 DESC LIMIT 10;

The MR program

In this scenario there is no need to sort the full dataset, so most records can be discarded in the map phase, which greatly reduces the network I/O of the shuffle between the Mappers and the Reducer.

We can exploit the fact that Java's TreeMap keeps its keys in sorted order to solve the top-K problem:

package com.wenhuan.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class TopKAPP {
    public static int K = 3;

    // Each map task keeps only the K largest values it has seen so far.
    // TreeMap stores its keys in ascending order, so firstKey() is always the smallest retained value.
    public static class TopKMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> {
        // the value doubles as the key, so duplicate values collapse into a single entry
        private TreeMap<Long, Long> tree = new TreeMap<Long, Long>();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line is assumed to hold a single number
            long temp = Long.parseLong(value.toString());
            tree.put(temp, temp);
            // once more than K entries are held, evict the smallest one
            if (tree.size() > K) {
                tree.remove(tree.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // cleanup() runs once per map task after all map() calls,
            // so each mapper emits at most K candidate values
            for (Long text : tree.values()) {
                context.write(NullWritable.get(), new LongWritable(text));
            }
        }
    }

    // The single reducer merges the candidates from every map task and again keeps only the K largest.
    public static class TopKReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
        private TreeMap<Long, Long> tree = new TreeMap<Long, Long>();

        @Override
        protected void reduce(NullWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // same trick as in the mapper: keep at most K entries, evicting the smallest
            for (LongWritable value : values) {
                tree.put(value.get(), value.get());
                if (tree.size() > K) {
                    tree.remove(tree.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the final K values in descending order
            for (Long val : tree.descendingKeySet()) {
                context.write(NullWritable.get(), new LongWritable(val));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        // the job fails if the output path already exists, so remove it first
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output path already exists and has been deleted");
        }

        Job job = Job.getInstance(configuration,"topK");
        job.setJarByClass(TopKAPP.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        // all candidates must be merged by a single reduce task; the default of one reducer already guarantees this
        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0:1);
    }
}
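
The job reads the input path from args[0] and writes to args[1]. Assuming the class is packaged into a jar named topk.jar (a hypothetical name) and the input is a plain text file with one number per line, it can be launched roughly like this:

hadoop jar topk.jar com.wenhuan.MR.TopKAPP /path/to/input /path/to/output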

Sometimes we first need to aggregate the dataset by key and only then take the top K of the aggregated results. In SQL this is typically handled with a window function, but on massive data that approach risks data skew, so we can consider pre-aggregating in the map phase.
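
As a minimal sketch of that map-side pre-aggregation idea (this class is not part of the job above; the class name and the one-key-per-line input format are assumptions for illustration), the mapper below accumulates partial counts in a HashMap and only emits them in cleanup(), so the shuffle carries one record per distinct key per map task instead of one record per input line:

// Hypothetical sketch: in-mapper pre-aggregation before a top-K-by-count job.
// Requires java.util.HashMap and java.util.Map in addition to the imports above.
public static class PreAggMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // partial counts for the keys seen by this map task
    private Map<String, Long> counts = new HashMap<String, Long>();

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        String word = value.toString().trim();
        Long old = counts.get(word);
        counts.put(word, old == null ? 1L : old + 1);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // one output record per distinct key, instead of one per input line
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            context.write(new Text(entry.getKey()), new LongWritable(entry.getValue()));
        }
    }
}

The reducer (or a follow-up job) can then sum these partial counts per key and apply the same TreeMap trick to keep only the top K.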
