MapReduce开发笔记(四、计数器)
2020-09-07 本文已影响0人
眼君
全局计数器
程序运行过程中,框架自带一系列计数器,最后在日志中将把结果打印。
主要分为以下几类:
文件系统计数器:主要描述进行文件读写操作的计数器。
mapreduce框架计数器:主要描述mapreduce框架的各组件(map、combine、shuffle、reduce)输入和输出的数据情况。
shuffle阶段错误信息。
文件输入和输出信息。
自定义计数器
应用场景:有全局变量的时候会使用。
接下来运用全局计数器统计一下文本的总记录条数和总的字段数。
首先计数器需要定义一个枚举类:
package com.wenhuan.wordcount;
public enum MyCounter {
LINES,
COUNT
}
然后我们开始写MapReduce程序,由于这个任务我们实际上不需要reduce,我们可以直接写驱动类:
package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyCounterTest {
//统计总条数、总字段数
static class MyMapper extends Mapper<LongWritable,Text,NullWritable,NullWritable>{
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
//获取计数器,统计行数
Counter lines_counter = context.getCounter(MyCounter.LINES);
//操作计数器,行数加1
lines_counter.increment(1L);
//获取下一个计数器,统计总字段数
Counter counts = context.getCounter(MyCounter.COUNT);
String[] datas = value.toString().split("\t");
//操作计数器
counts.increment(datas.length);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//加载配置文件
Configuration conf = new Configuration();
//启动一个Job,封装maper和reducer
Job job = Job.getInstance(conf);
//设置计算程序的主驱动类,运行的时候打成jar包运行。
job.setJarByClass(MyCounterTest.class);
//设置Maper
job.setMapperClass(MyMapper.class);
//设置mapper的输出类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
//设置reducetask任务数量为0
job.setNumReduceTasks(0);
//设置输入路径和输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交,需要打印日志
job.waitForCompletion(true);
}
}