MapReduce Development Notes (5): The Top-K Problem
2020-09-07
眼君
Working with massive data sometimes requires sorting the dataset. Simple sorting algorithms run in O(n²) time, which becomes a problem at scale and calls for optimization.
The equivalent SQL usage
In some cases we don't need to sort the whole dataset; we only want the largest (or smallest) K records. This is the top-K problem, and it corresponds to SQL like:
SELECT * FROM table ORDER BY col4 DESC LIMIT 10;
The MR program
In this scenario there is no need to sort the full dataset, so most records can be discarded during the map phase, cutting the network I/O between the map and reduce stages.
We can exploit the fact that Java's TreeMap keeps its entries sorted to solve the top-K problem:
package com.wenhuan.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class TopKAPP {

    public static int K = 3;

    public static class TopKMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> {
        // TreeMap keeps its keys sorted ascending, so firstKey() is always the smallest.
        private TreeMap<Long, Long> tree = new TreeMap<Long, Long>();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is expected to hold a single number.
            long temp = Long.parseLong(value.toString());
            tree.put(temp, temp);
            // Once the map holds more than K entries, evict the smallest,
            // so only this mapper's local top K survive.
            if (tree.size() > K) {
                tree.remove(tree.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the local top K; only these K records cross the network.
            for (Long val : tree.values()) {
                context.write(NullWritable.get(), new LongWritable(val));
            }
        }
    }

    public static class TopKReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
        private TreeMap<Long, Long> tree = new TreeMap<Long, Long>();

        @Override
        protected void reduce(NullWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // All mappers emit under the same NullWritable key, so every local
            // top K lands here; keep the global top K with the same eviction trick.
            for (LongWritable value : values) {
                tree.put(value.get(), value.get());
                if (tree.size() > K) {
                    tree.remove(tree.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Output the global top K in descending order.
            for (Long val : tree.descendingKeySet()) {
                context.write(NullWritable.get(), new LongWritable(val));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        // Delete the output path if it already exists, so reruns don't fail.
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output path already existed and has been deleted");
        }

        Job job = Job.getInstance(configuration, "topK");
        job.setJarByClass(TopKAPP.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
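After packaging, the job takes the input and output paths as its two arguments. One caveat about this implementation: TreeMap keys are unique, so duplicate values collapse into a single entry, and an input such as 9, 9, 8 with K = 3 yields only two output rows. If duplicates must be kept, a min-heap does the same job. The variant below is my own sketch, not part of the original program; it assumes it sits inside TopKAPP next to the original mapper and that java.util.PriorityQueue is imported.

    // Sketch: the same local top-K logic built on a min-heap, which keeps duplicates.
    // PriorityQueue orders Longs ascending by default, so peek()/poll() hit the smallest.
    public static class TopKHeapMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> {
        private PriorityQueue<Long> heap = new PriorityQueue<Long>();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            heap.offer(Long.parseLong(value.toString()));
            if (heap.size() > K) {
                heap.poll();  // evict the smallest; duplicates are retained
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Iteration order of a heap is arbitrary; that's fine, the reducer re-sorts.
            for (Long val : heap) {
                context.write(NullWritable.get(), new LongWritable(val));
            }
        }
    }

Swapping it in only requires pointing job.setMapperClass at the new class. Note that the reducer's TreeMap collapses duplicates in exactly the same way, so it would need the same heap treatment for duplicates to survive end to end.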
Sometimes we need to aggregate the dataset first and then take the top K of the aggregates. In SQL this is usually handled with a window function, but on massive data that carries a risk of data skew, so it is worth pre-aggregating in the map phase, as sketched below.
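As a hypothetical illustration of that idea (this class is not from the original series), the mapper below accumulates counts in an in-memory HashMap and flushes one partial sum per distinct key in cleanup, a pattern often called in-mapper combining. It assumes each input line holds a single key and that java.util.HashMap and java.util.Map are imported.

    // Sketch of map-side pre-aggregation: counts are accumulated in memory and
    // flushed once per mapper, so each mapper ships at most one partial sum per
    // distinct key into the shuffle instead of one record per occurrence.
    public static class PreAggMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private Map<String, Long> counts = new HashMap<String, Long>();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String word = value.toString().trim();
            counts.put(word, counts.getOrDefault(word, 0L) + 1L);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                context.write(new Text(e.getKey()), new LongWritable(e.getValue()));
            }
        }
    }

The reducer then only has to sum a handful of partial counts per key before applying the same TreeMap top-K trick, which blunts the skew a single hot key would otherwise cause. When the aggregation is associative, registering a regular Combiner via job.setCombinerClass is a lighter-weight alternative.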