我爱编程

Hadoop应用开发:MapReduce

2018-06-21  本文已影响8人  Vechace

Hadoop应用开发

以气象数据分析为例,分析年份气温最高值

MapReduce编程流程
MapReduce编程遵循特定流程

开发环境配置

常见maven依赖:

单元测试

使用MRUnit测试库

mapper测试范例:

import java,io,IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*

public class MaxTemperatureMapperTest{
    @Test
    public void processesValidRecord()throws IOException,InterruptedException{
        //模拟输入数据集
        Text value = new Text("");
        new MapDirver<LongWritable,Text,Text,IntWritable>()
            .withMapper(new MaxTemperatureMapper())
            .withInput(new LongWritable(0),value)
            .withOutput(new Text("1950"),new IntWritable(-11))
            .runTest();
    }
}

当mapper不通过测试时,在考虑往mapper中新增逻辑处理代码时,考虑是否能够将逻辑处理代码独立出来,用一个解析类来封装解析逻辑,如下:

解析NCDC格式的气温记录(对数据的预处理)

public class NcdcRecordParser{
    private statis final int MISSING_TEMPERATURE = 9999;
    
    private String year;
    private int airTemperature;
    private String quality;
    
    public void parse(String record){
        year = record.substring(15,19);
        String airTemperatureString;
        if(record.charAt(87) == '+'){
            airTemperatureString = record.substring(88,92);
        }else{
            airTemperatureString = record.substring(87,92);
        }
        airTemperature = Integer.parseInt(airTemperatureStirng);
        quality = record.substring(92,93);
    }
    
    public void parse(Text record){
        parse(racord.toString);
    }
    public boolean isValidTemperature(){
        return airTemperature != MISSING_TEMPERATURE && quality.matches([01459]);
    }
    
    public String getYear(){
        return year;
    }
    
    public int getAirTemperature(){
        return airTemperature;
    }
}

改进mapper;

public class MaxTemperatureMapper 
    extend Mapper<LongWritable,Text,Text,IntWritable>{
        private NcdcRecordParser parser = new NcdcRecordParser();
        
        @Override
        public void map(LongWritable key,Text value,Context context)
            throws IOException,InterruptedException {
            
            parser.parse(value);
            if(parser.isValidTemperature()){
                context.write(new Text(parser.getYear()),
                    new IntWritable(parser.getAirTemperature()));
            }
        }
    }

测试完mapper后,开始编写reducer及其测试用例

reducer:找出指定键的最大值

public class MaxTemperatureReducer
    extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        public void reduce(Text key,Iterable<IntWritable> values,
            Context context) throws IOException,InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for(IntWritable value:values){
                maxValue = math.max(maxValue,value.get());
            }
            context.write(key,new IntWritable(maxValue));
        }
}

reducer的test实例

public MaxTemperatureReducerTest{
    @Test
    public void returnsMaxIntegerInValues()throws IOException,InterruptedException{
        new ReducerDriver<Text,IntWritable,Text,IntWritable>()
            .withReducer(new MaxTemperatureReducer())
            .withInputKey(new Text("1950"),
                Arrays.asList(new IntWritable(10),new IntWritable(5)))
            .withOutput(new Text("1950"),new IntWritable(10))
            .runTest();
    }
}

在本地作业运行器上运行作业,编写MapReduce作业的驱动程序

public class MaxTemperatureDriver extends Configured implements Tool{
    @Override
    public int run(Stringp[] args)throws Exception{
        if(args.length !=2){
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        
        Job job = new Job(getConf(),"Max temperature");
        job.setJarByClass(getClass());
        
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        
        job.setMapperClass(MaxTemperatureMapper.class);
        //Combiner的作用是对map端输出先做合并,以减少传输到reducer的数据量
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxtemperatureReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        return job.waiForCompletion(true) ?0:1;
    }
    public static void main(String[] args)throws Exception{
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(),args);
        System.exit(exitCode);
    }
}

参考资料:《Hadoop权威指南》

上一篇下一篇

猜你喜欢

热点阅读