Hadoop学习之Combiner 合并
2021-06-11 本文已影响0人
万事万物
介绍:
- Combiner 是MR程序中Mapper和Reducer之外的一种组件。
- Combiner 组件的父类就是Reducer
- Combiner 和 Reducer 区别?
- Combiner 运行在 Mapper 阶段,是用于统计 的数据集。
- Reducer 运行在 Reducer 阶段,是用于统计 的数据集。
- Combiner 作用就是针对每个MapTask的输出进行汇总,减少网络带宽。
- 在MapReduce中,默认不使用 Combiner,需要自己去声明。
使用与不使用Combiner 效果图
默认是不使用Combiner 的,意味着 每个MapTask 到 Reduce 需要经过三万次网络传输。效率上肯定会有所影响。使用Combiner 可以很好的解决这个问题,减少网络带宽,可官方并不推荐使用Combiner ,因为Combiner 统计的结果可能与 Reduce最终的结果在下会有问题,所以不推荐。
不使用Combiner 的情况
使用 Combiner 减少网络带宽,提高效率。使用Combiner 之后,每个MapTask 从三万次的传输降低成了3次(a=10000,b=10000,c=10000)。然后 Reduce 在统计每个MapTask 传输过来的数据集结果。
使用Combiner 的情况
自定义实现Combiner
需求分析;
使用Combiner 实现wordCount 功能,统计单词个数。
统计 a b c 的个数。
单词文件内容
a b c a b c a b c a b c
a b c a b c a b c a b c
代码实现
mapper
public class PhoneCombinerMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
private LongWritable outValue = new LongWritable(1);
private Text outKey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lien = value.toString();
// 按照 空格分割
List<String> list = Stream.of(lien.split(" ")).filter(s -> s.length() > 0).collect(Collectors.toList());
for (String str : list) {
outKey.set(str);
context.write(outKey,outValue);
}
}
}
reducer
public class PhoneCombinerReduce extends Reducer<Text, LongWritable,Text,LongWritable> {
private LongWritable outValue = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
Iterator<LongWritable> iterator = values.iterator();
long sum = 0;
while (iterator.hasNext()) {
sum += iterator.next().get();
}
outValue.set(sum);
context.write(key,outValue);
}
}
combiner;其实这里的内容和reducer的内容是一致的
public class PhoneCombinerCombiner extends Reducer<Text,LongWritable,Text,LongWritable> {
private LongWritable outValue = new LongWritable();
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
Iterator<LongWritable> iterator = values.iterator();
long sum = 0;
while (iterator.hasNext()) {
sum += iterator.next().get();
}
outValue.set(sum);
context.write(key,outValue);
}
}
main
public class PhoneCombinerDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setMapperClass(PhoneCombinerMapper.class);
job.setReducerClass(PhoneCombinerReduce.class);
// 配置Combiner
job.setCombinerClass(PhoneCombinerCombiner.class);
// Combiner 是 Reduce 的子类,若实现方式一致,可以直接使用 Reduce。
// job.setCombinerClass(PhoneCombinerReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\development tool\\document\\temp\\input\\wordcount\\input5.txt"));
String str = UUID.randomUUID().toString().replaceAll("-", "");
FileOutputFormat.setOutputPath(job, new Path("D:\\development tool\\document\\temp\\output", str));
job.waitForCompletion(true);
}
}
运行调试:
查看运行结果,以 a为例
PhoneCombinerCombiner的调试结果:while 循环次数8次 PhoneCombinerCombiner的调试结果
PhoneCombinerReduce的调试结果:while 循环次数1次 PhoneCombinerReduce的调试结果
最终结果:
a 8
b 8
c 8