眼君的大数据之路

MapReduce开发笔记(四、计数器)

2020-09-07  本文已影响0人  眼君

全局计数器

程序运行过程中,框架自带一系列计数器,最后在日志中将把结果打印。

主要分为以下几类:
文件系统计数器:主要描述进行文件读写操作的计数器。
mapreduce框架计数器:主要描述mapreduce框架的各组件(map、combine、shuffle、reduce)输入和输出的数据情况。
shuffle阶段错误信息。
文件输入和输出信息。

自定义计数器

应用场景:有全局变量的时候会使用。

接下来运用全局计数器统计一下文本的总记录条数和总的字段数。

首先计数器需要定义一个枚举类:

package com.wenhuan.wordcount;

public enum MyCounter {
    LINES,
    COUNT
}

然后我们开始写MapReduce程序,由于这个任务我们实际上不需要reduce,我们可以直接写驱动类:

package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyCounterTest {
    //统计总条数、总字段数
    static class MyMapper extends Mapper<LongWritable,Text,NullWritable,NullWritable>{
        @Override
        protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
            //获取计数器,统计行数
            Counter lines_counter = context.getCounter(MyCounter.LINES);
            //操作计数器,行数加1
            lines_counter.increment(1L);
            //获取下一个计数器,统计总字段数
            Counter counts = context.getCounter(MyCounter.COUNT);
            String[] datas = value.toString().split("\t");
            //操作计数器
            counts.increment(datas.length);
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //加载配置文件
        Configuration conf = new Configuration();
        //启动一个Job,封装maper和reducer
        Job job = Job.getInstance(conf);
        //设置计算程序的主驱动类,运行的时候打成jar包运行。
        job.setJarByClass(MyCounterTest.class);
        //设置Maper
        job.setMapperClass(MyMapper.class);
        //设置mapper的输出类型
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        //设置reducetask任务数量为0
        job.setNumReduceTasks(0);
        //设置输入路径和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交,需要打印日志
        job.waitForCompletion(true);
    }   
}
上一篇下一篇

猜你喜欢

热点阅读