MapReducer之Combiner(归约处理)

2018-04-05  本文已影响0人  末央酒

Commbiner相当于本地的Reducer计算模式,但是并不是所有场合都适合,总结一下都是什么场合适合用。


作用

因为Map产生了太多的输出,为了减少RPC传输,在本地进行一次类似于Reduce操作,进行累加,再将累加的值传给Reduce。

注意:因为Combiner是可插拔的,所以添加Combiner不能影响最终的计算机过,Combiner应该适用于那些,Reduce输入和输出key/value类型完全一致的,且不影响最终结果的。

WordCount实例

public class TestCombinerForAvgMR {

//Map对不同文件不同月份进行统计

    public static class ForMapextends Mapper {

Textokey =new Text();

AvgEntityavgEntity =new AvgEntity();

@Override

        protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

String []line = value.toString().split(" ");

okey.set(line[0]);

avgEntity.setCount(1);

avgEntity.setSum(Integer.parseInt(line[1]));

context.write(okey,avgEntity);

}

}

//Combiner对每个月份的进行累加

    public static class ForCombinextends Reducer {

@Override

        protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {

int sum =0;

int count =0;

for (AvgEntity avgEntity:values){

sum += avgEntity.getSum();

count += avgEntity.getCount();

}

AvgEntity avgEntity =new AvgEntity();

avgEntity.setSum(sum);

avgEntity.setCount(count);

context.write(key,avgEntity);

}

}

//将月份合并进行累加,做除法

    public static class ForReduceextends Reducer{

@Override

        protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {

int sum =0;

int count =0;

for(AvgEntity avgEntity : values){

sum += avgEntity.getCount();

count += avgEntity.getSum();

}

context.write(key,new IntWritable(sum/count));

}

}

public static void main(String[] args)throws IOException, ClassNotFoundException, InterruptedException {

Job job = Job.getInstance(new Configuration());

job.setMapperClass(ForMap.class);

job.setReducerClass(ForReduce.class);

job.setCombinerClass(ForCombin.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(AvgEntity.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job,new Path("目录"));

FileOutputFormat.setOutputPath(job,new Path("目录"));

job.waitForCompletion(true);

    }

问题总结

1.为什么需要在Mapper端进行归约处理

    因为在Mapper进行归约后,数据量变小了,这样再通过网络传输时,传输时间就变短了,减少了整个作业的运行时间。

2.为什么可以在Mapper端进行归约处理

    因为Reducer端接收的数据就是来自于Mapper端。我们在Mapper进行归约处理,无非就是把归约操作提前到Mapper端做而已。

3.既然在Mapper端进行了归约处理,为什么还要在Reducer端进行处理。

    因为Mapper端只处理了本节点的数据,而Reduce端处理的是来自多个Mapper端的数据,因此有些在Mapper端不能归约的数据,在Reducer端可以进行归约。

4.求平均数(SVG)的非关联操作场景如何减少I/O传输量

    更改Mapper端使其输出两列数据分别是数值个数count和平均数avg,这样在Reducer端累加count作为总的数值个数,输出计数和平均值。

上一篇下一篇

猜你喜欢

热点阅读