MapReduce分区组件

2020-08-03  本文已影响0人  羋学僧

MapReduce中分区组件

需求:

根据单词的长度给单词出现的次数的结果存储到不同文件中,以便于在快速查询

思路:

1、定义Mapper逻辑
2、定义Reducer逻辑
3、自定义分区Partitioner
4、主调度入口

MapReduce编程流程

首先看Mapper:WordMapper.java

/**
*  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*      KEYIN:是指框架读取到的数据的key的类型,在默认的InputFormat下,读取的key就是一行文本的起始的偏移量。所以key的类型Long
*      VALUEIN:是指框架读取到的数据的value的类型,在默认的InputFormat下,读取的value就是一行文本。所以value的类型String
*      KEYOUT:是指用户自定义的逻辑方法返回的数据中key的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的key是单词,所以类型是String
*      VALUEOUT:是指用户自定义的逻辑方法返回的数据中value的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的value是次数,所以类型是Long
*
*      但是,String、Long 是jdk中自带的数据类型,在序列化的时候,效率低。
*      hadoop为了提高效率,自定义了一套序列化的框架
*      在hadoop程序当中,如果要进行序列化(写磁盘、网络传输数据),一定要用hadoop实现的序列化的数据类型
*
*      Long        ====》 LongWritable
*      String      ====》 Text
*      Integer     ====》 IntWritable
*      Null        ====》 NullWritable
*
*/
public class WordMapper extends Mapper<LongWritable,Text,Text,LongWritable>{

   /**
    *
    * @param key 就是偏移量
    * @param value  一行文本
    * @param context
    * @throws IOException
    * @throws InterruptedException
    */
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

       //1、单词的切分
       String[] words = value.toString().split(" ");

       //2、计数一次,帮单词转换成<hello,1>这样的key value类型对外输出
       for (String word : words) {
           //3、写入到上下文中
           context.write(new Text(word),new LongWritable(1));
       }

   }
}

其次看Shuffle自定义分区Partitioner:MyPartitioner.java

/**
 *  Partitioner<KEY, VALUE>
 *      KEY: 单词
 *      VALUE 数
 */
public class MyPartitioner extends Partitioner<Text,LongWritable> {
    @Override
    public int getPartition(Text text, LongWritable longWritable, int i) {
        if (text.toString().length() >= 5){
            return 0;
        }else {
            return 1;
        }
    }
}

再看Reducer:WordReducer.java

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     KEYIN:map阶段的一个输出的key
 *     VALUEIN:LongWritable类型的数字
 *     KEYOUT:最终的结果单词,Text类型
 *     VALUEOUT:最终的单词的次数,LongWritable类型
 */
public class WordReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

    /**
     *
     * @param key map阶段的一个输出的key
     * @param values  hello <1,1>
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //1、定义一个统计的变量
        long count = 0;

        //2、迭代
        for (LongWritable value : values) {
            count += value.get();
        }

        //3、写入到上下文
        context.write(key,new LongWritable(count));
    }
}

最后看Driver:JobMain.java

/**
 * 主类:将map和reduce串接起来,并提供了运行入口
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        //一、初始化一个Job
        Job job = Job.getInstance(configuration, "partitioner");

        //二、设置Job的相关信息,配置8个小步骤
        //1、设置输入路径
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("D://input/test2.txt"));
        TextInputFormat.addInputPath(job,new Path("hdfs://bigdata02:9000/wordcount.txt"));


        //2、设置Mapper类,设置k2 v2
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //3 4 5 6   Shuffle阶段
        //3、分区
        job.setPartitionerClass(MyPartitioner.class);

        //7、设置Reducer类,并设置k3 v3
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //设置ReducerTask的个数
        job.setNumReduceTasks(2);

        //8、设置输出的路径
        job.setOutputFormatClass(TextOutputFormat.class);
        //TextOutputFormat.setOutputPath(job,new Path("D://word_out2"));
        TextOutputFormat.setOutputPath(job,new Path("hdfs://bigdata02:9000/word_out2"));

        //三、等待完成
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        System.exit(b ? 0 : 1);
    }
}



上一篇 下一篇

猜你喜欢

热点阅读