MapReduce八股文

2016-12-11  本文已影响55人  _helloliang

Mapper


class MapperModule extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void setup(Context context) 
            throws IOException, InterruptedException {
        // TODO

    }


    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub

    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // TODO

    }
}

Reducer

class ReducerModule extends Reducer<Text, IntWritable, Text, IntWritable> {


    @Override
    protected void setup(Context context) 
            throws IOException, InterruptedException {
        // TODO

    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                       Context context) 
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub

    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // TODO

    }

}

Runner

public class RunnerModule implements Tool {
    private Configuration conf = null;

    // setConf
    public void setConf(Configuration conf) {
        conf.set("fs.defaultFS", "hdfs://hive.liangxw.CentOS.com:9000");
        conf.setInt("RUN_TIMES", 1);
        // 设置系统变量,解决hdfs权限问题
        System.setProperty("HADOOP_USER_NAME", "liangxw");
        
        // 设置分隔符
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "#");
        // 设置map输出压缩
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        
        this.conf = conf;
    }

    // getConf
    public Configuration getConf() {
        return this.conf;
    }

    // run
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        FileSystem fs = FileSystem.get(conf);

        // set job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(WCRunner.class);
        
        // 输入输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 如果输出路径存在则删除
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }
        
        // 指定输入文件格式
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);  
              
        // mapper
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ======================shuffle====================
        // 1: partitioner
        job.setPartitionerClass(partitioner.class);
        // 2: sort
        job.setSortComparatorClass(sorter.class);
        // 3: combiner
        job.setCombinerClass(WCReducer.class);
        // 4: compress
        // 在conf处进行设置
        // 5: group
        job.setGroupingComparatorClass(grouper.class);
        // ======================shuffle====================

        // reducer
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 设置reduce输出压缩
        FileOutputFormat.setCompressionOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        
        // 设置顺序文件输出压缩        
        SequenceFileOutputFormat.setCompressOutput(job,true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);        
        
        // submit job
        boolean isSucces = job.waitForCompletion(true);
        return isSucces ? 0 : 1;
    }

    // main
    public static void main(String[] args) throws Exception {

        args = new String[]{
                    "/user/liangxw/wcinput",
                    "/user/liangxw/wcoutput"
        };

        // run job
        int status = ToolRunner.run(new WCRunner(), args);

        System.exit(status);
    }

}
上一篇下一篇

猜你喜欢

热点阅读