Hadoop Application Development: MapReduce
2018-06-21
Vechace
Taking weather-data analysis as the running example, we will find the highest temperature recorded in each year.
The MapReduce Development Workflow
MapReduce development follows a well-defined workflow:
- First, write the map and reduce functions, using unit tests to check that they behave as expected.
- Then write a driver program to run the job; run it from a local IDE against a small dataset and inspect the results.
- If the driver does not run correctly, use the IDE's debugger to find the root cause; based on what you learn, extend the unit tests and improve the mapper or reducer accordingly.
- Once the program passes its tests against the dataset as expected, consider running it on a cluster.
Development Environment Setup
Common Maven dependencies:
- hadoop-client: for building MapReduce programs
- mrunit: for running MapReduce unit tests
- hadoop-minicluster: for running tests against an in-process Hadoop cluster
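The dependencies above can be declared in a `pom.xml` roughly as follows. This is a sketch: the `hadoop.version` property is a placeholder you should set to the version matching your distribution, and the `hadoop2` classifier on mrunit assumes a Hadoop 2 cluster.

```xml
<dependencies>
  <!-- Core client APIs for writing MapReduce programs -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <!-- MRUnit for unit-testing mappers and reducers -->
  <dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <classifier>hadoop2</classifier>
    <scope>test</scope>
  </dependency>
  <!-- In-process mini cluster for integration tests -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-minicluster</artifactId>
    <version>${hadoop.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>
```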
Unit Testing
Unit tests use the MRUnit test library.
An example mapper test:
import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MaxTemperatureMapperTest {

  @Test
  public void processesValidRecord() throws IOException, InterruptedException {
    // A simulated input record; the source elides the actual NCDC line,
    // which should be a 1950 record with a temperature of -1.1°C
    Text value = new Text("");
    new MapDriver<LongWritable, Text, Text, IntWritable>()
        .withMapper(new MaxTemperatureMapper())
        .withInput(new LongWritable(0), value)
        .withOutput(new Text("1950"), new IntWritable(-11))
        .runTest();
  }
}
When the mapper fails a test and you are tempted to add more logic to the mapper itself, consider instead factoring that logic out into a separate class that encapsulates the parsing, as follows.
Parsing NCDC-format temperature records (preprocessing the data):
import org.apache.hadoop.io.Text;

public class NcdcRecordParser {

  private static final int MISSING_TEMPERATURE = 9999;

  private String year;
  private int airTemperature;
  private String quality;

  public void parse(String record) {
    year = record.substring(15, 19);
    String airTemperatureString;
    // Drop a leading '+', which Integer.parseInt did not accept before Java 7
    if (record.charAt(87) == '+') {
      airTemperatureString = record.substring(88, 92);
    } else {
      airTemperatureString = record.substring(87, 92);
    }
    airTemperature = Integer.parseInt(airTemperatureString);
    quality = record.substring(92, 93);
  }

  public void parse(Text record) {
    parse(record.toString());
  }

  public boolean isValidTemperature() {
    return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
  }

  public String getYear() {
    return year;
  }

  public int getAirTemperature() {
    return airTemperature;
  }
}
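To see the fixed-width layout the parser relies on, here is a standalone sketch (no Hadoop required) that builds a synthetic record and applies the same extraction logic as `NcdcRecordParser.parse`. The record content is made up purely for illustration; only the column positions match the real NCDC format.

```java
public class NcdcParseDemo {

    static String parseDemo() {
        // Build a synthetic 93-character record
        StringBuilder sb = new StringBuilder();
        sb.append("000000000000000"); // positions 0-14: filler
        sb.append("1950");            // positions 15-18: year
        while (sb.length() < 87) {
            sb.append('0');           // positions 19-86: filler
        }
        sb.append("-0011");           // positions 87-91: signed temperature (tenths of a degree)
        sb.append('1');               // position 92: quality code
        String record = sb.toString();

        // The same extraction logic as NcdcRecordParser.parse
        String year = record.substring(15, 19);
        String tempString = (record.charAt(87) == '+')
                ? record.substring(88, 92)
                : record.substring(87, 92);
        int airTemperature = Integer.parseInt(tempString);
        String quality = record.substring(92, 93);
        boolean valid = airTemperature != 9999 && quality.matches("[01459]");

        return year + " " + airTemperature + " " + valid;
    }

    public static void main(String[] args) {
        System.out.println(parseDemo()); // prints: 1950 -11 true
    }
}
```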
The improved mapper:
import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private NcdcRecordParser parser = new NcdcRecordParser();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    if (parser.isValidTemperature()) {
      context.write(new Text(parser.getYear()),
          new IntWritable(parser.getAirTemperature()));
    }
  }
}
Once the mapper passes its tests, move on to the reducer and its test case.
The reducer finds the maximum value for a given key:
import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
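Before the reducer runs, the framework groups all map outputs by key and calls `reduce` once per key. The grouping and max-reduction can be sketched in plain Java (no Hadoop involved; the class name and sample values are illustrative only):

```java
import java.util.*;

public class MaxByKeyDemo {

    // Simulate the shuffle: group (year, temperature) pairs by year,
    // then reduce each group to its maximum, as MaxTemperatureReducer does
    static Map<String, Integer> maxByYear(List<Map.Entry<String, Integer>> mapOutputs) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutputs) {
            result.merge(kv.getKey(), kv.getValue(), Math::max);
        }
        return result;
    }

    public static void main(String[] args) {
        // Map outputs from several (possibly different) map tasks
        List<Map.Entry<String, Integer>> mapOutputs = Arrays.asList(
                Map.entry("1950", 0),
                Map.entry("1950", 22),
                Map.entry("1950", -11),
                Map.entry("1949", 111),
                Map.entry("1949", 78));
        System.out.println(maxByYear(mapOutputs));
        // prints: {1949=111, 1950=22}
    }
}
```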
An MRUnit test for the reducer:
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxTemperatureReducerTest {

  @Test
  public void returnsMaxIntegerInValues() throws IOException, InterruptedException {
    new ReduceDriver<Text, IntWritable, Text, IntWritable>()
        .withReducer(new MaxTemperatureReducer())
        .withInput(new Text("1950"),
            Arrays.asList(new IntWritable(10), new IntWritable(5)))
        .withOutput(new Text("1950"), new IntWritable(10))
        .runTest();
  }
}
To run the job on the local job runner, write a driver program for the MapReduce job:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    Job job = Job.getInstance(getConf(), "Max temperature");
    job.setJarByClass(getClass());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    // The combiner merges map outputs on the map side,
    // reducing the amount of data transferred to the reducers
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}
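Reusing the reducer as the combiner is only correct because max is commutative and associative: taking maxima of partial groups and then the maximum of those partial results gives the same answer as taking the maximum of all values at once. A small sketch of why max qualifies but mean does not (class name and sample values are made up for illustration):

```java
import java.util.*;

public class CombinerDemo {

    static int maxOf(List<Integer> xs) {
        return Collections.max(xs);
    }

    static double meanOf(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).average().getAsDouble();
    }

    public static void main(String[] args) {
        // Two map tasks each produce a partial list of temperatures for one year
        List<Integer> mapper1 = Arrays.asList(0, 20, 10);
        List<Integer> mapper2 = Arrays.asList(25, 15);
        List<Integer> all = Arrays.asList(0, 20, 10, 25, 15);

        // max: combining per-mapper maxima matches the global max
        int combined = maxOf(Arrays.asList(maxOf(mapper1), maxOf(mapper2)));
        System.out.println(combined == maxOf(all)); // prints: true

        // mean: a mean of per-mapper means is NOT the global mean,
        // so a mean-computing reducer must not be used as a combiner
        double meanOfMeans = (meanOf(mapper1) + meanOf(mapper2)) / 2;
        System.out.println(meanOfMeans);  // prints: 15.0
        System.out.println(meanOf(all));  // prints: 14.0
    }
}
```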
Reference: Hadoop: The Definitive Guide