Hadoop学习之Combiner 合并

2021-06-11  本文已影响0人  万事万物

介绍:

  1. Combiner 是MR程序中Mapper和Reducer之外的一种组件。
  2. Combiner 组件的父类就是Reducer
  3. Combiner 和 Reducer 区别?
    1. Combiner 运行在 Mapper 阶段,是用于统计 \color{red}{当前 MapTask }的数据集。
    2. Reducer 运行在 Reducer 阶段,是用于统计 \color{red}{所有 MapTask }的数据集。
  4. Combiner 作用就是针对每个MapTask的输出进行汇总,减少网络带宽。
  5. 在MapReduce中,默认不使用 Combiner,需要自己去声明。\color{red}{牢记}

使用与不使用Combiner 效果图

默认是不使用Combiner 的,意味着 每个MapTask 到 Reduce 需要经过三万次网络传输。效率上肯定会有所影响。使用Combiner 可以很好的解决这个问题,减少网络带宽,可官方并不推荐使用Combiner ,因为Combiner 统计的结果可能与 Reduce最终的结果在\color{red}{有些场景}下会有问题,所以不推荐。

不使用Combiner 的情况

使用 Combiner 减少网络带宽,提高效率。使用Combiner 之后,每个MapTask 从三万次的传输降低成了3次(a=10000,b=10000,c=10000)。然后 Reduce 在统计每个MapTask 传输过来的数据集结果。


使用Combiner 的情况

自定义实现Combiner

需求分析;

使用Combiner 实现wordCount 功能,统计单词个数。
统计 a b c 的个数。

单词文件内容

a b c a b c a b c a b c 
a b c a b c a b c a b c 

代码实现

mapper

public class PhoneCombinerMapper extends Mapper<LongWritable, Text,Text,LongWritable> {

    private LongWritable outValue = new LongWritable(1);
    private Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lien = value.toString();
        // 按照 空格分割
        List<String> list = Stream.of(lien.split(" ")).filter(s -> s.length() > 0).collect(Collectors.toList());

        for (String str : list) {
            outKey.set(str);
            context.write(outKey,outValue);
        }
    }
}

reducer

public class PhoneCombinerReduce extends Reducer<Text, LongWritable,Text,LongWritable> {
    private LongWritable outValue = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        Iterator<LongWritable> iterator = values.iterator();
        long sum = 0;
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        outValue.set(sum);

        context.write(key,outValue);
    }
}

combiner;其实这里的内容和reducer的内容是一致的

public class PhoneCombinerCombiner extends Reducer<Text,LongWritable,Text,LongWritable> {
    private LongWritable outValue = new LongWritable();

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<LongWritable> iterator = values.iterator();
        long sum = 0;
        while (iterator.hasNext()) {

            sum += iterator.next().get();
        }
        outValue.set(sum);

        context.write(key,outValue);
    }
}

main


public class PhoneCombinerDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance();

        job.setMapperClass(PhoneCombinerMapper.class);
        job.setReducerClass(PhoneCombinerReduce.class);
        // 配置Combiner 
        job.setCombinerClass(PhoneCombinerCombiner.class);
        // Combiner 是 Reduce 的子类,若实现方式一致,可以直接使用 Reduce。
        // job.setCombinerClass(PhoneCombinerReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\development tool\\document\\temp\\input\\wordcount\\input5.txt"));
        String str = UUID.randomUUID().toString().replaceAll("-", "");
        FileOutputFormat.setOutputPath(job, new Path("D:\\development tool\\document\\temp\\output", str));

        job.waitForCompletion(true);
    }
}

运行调试:

查看运行结果,以 a为例

PhoneCombinerCombiner的调试结果:while 循环次数8次 PhoneCombinerCombiner的调试结果
PhoneCombinerReduce的调试结果:while 循环次数1次 PhoneCombinerReduce的调试结果

最终结果:

a   8
b   8
c   8
上一篇 下一篇

猜你喜欢

热点阅读