MapReduce程序WordCount业务实现

2020-08-03  本文已影响0人  羋学僧

WordCount的业务逻辑:

1、mapTask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成一个 key,value 对,比如单词 hello,就转换成 <’hello’,1> 发送给 reduceTask 去汇总
2、reduceTask 阶段将接受 mapTask 的结果,来做汇总计数


结果:

hdfs 1
hive 1
flink 1
hadoop 1
hello 5
spark 1

首先看Mapper:WordMapper.java

/**
*  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*      KEYIN:是指框架读取到的数据的key的类型,在默认的InputFormat下,读取的key就是一行文本的起始的偏移量。所以key的类型Long
*      VALUEIN:是指框架读取到的数据的value的类型,在默认的InputFormat下,读取的value就是一行文本。所以value的类型String
*      KEYOUT:是指用户自定义的逻辑方法返回的数据中key的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的key是单词,所以类型是String
*      VALUEOUT:是指用户自定义的逻辑方法返回的数据中value的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的value是次数,所以类型是Long
*
*      但是,String、Long 是jdk中自带的数据类型,在序列化的时候,效率低。
*      hadoop为了提高效率,自定义了一套序列化的框架
*      在hadoop程序当中,如果要进行序列化(写磁盘、网络传输数据),一定要用hadoop实现的序列化的数据类型
*
*      Long        ====》 LongWritable
*      String      ====》 Text
*      Integer     ====》 IntWritable
*      Null        ====》 NullWritable
*
*/
public class WordMapper extends Mapper<LongWritable,Text,Text,LongWritable>{

   /**
    *
    * @param key 就是偏移量
    * @param value  一行文本
    * @param context
    * @throws IOException
    * @throws InterruptedException
    */
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

       //1、单词的切分
       String[] words = value.toString().split(" ");

       //2、计数一次,帮单词转换成<hello,1>这样的key value类型对外输出
       for (String word : words) {
           //3、写入到上下文中
           context.write(new Text(word),new LongWritable(1));
       }

   }
}

其次看Reducer:WordReducer.java

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     KEYIN:map阶段的一个输出的key
 *     VALUEIN:LongWritable类型的数字
 *     KEYOUT:最终的结果单词,Text类型
 *     VALUEOUT:最终的单词的次数,LongWritable类型
 */
public class WordReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

    /**
     *
     * @param key map阶段的一个输出的key
     * @param values  hello <1,1>
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //1、定义一个统计的变量
        long count = 0;

        //2、迭代
        for (LongWritable value : values) {
            count += value.get();
        }

        //3、写入到上下文
        context.write(key,new LongWritable(count));
    }
}

再看Driver:JobMain.java

/**
 * 主类:将map和reduce串接起来,并提供了运行入口
 */
public class JobMain {
    /**
     * 这个main方法是WordCount程序运行的入口,其中用一个Job类对象来管理程序运行的很多参数:
     * 指定用哪个类作为Mapper阶段的业务逻辑类,指定哪个类作为Reducer阶段的业务逻辑类
     * ......其他的各种需要的参数
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();
        //一、初始化一个Job对象
        Job job = Job.getInstance(configuration, "wordcount");

        //二、设置Job对象的相关信息  里面包含了8个小步骤
        //1、设置输入的路径,让程序找到源文件的位置
        job.setInputFormatClass(TextInputFormat.class);
        //运行本地文件
        //TextInputFormat.addInputPath(job,new Path("D://input/test2.txt"));
        //在服务器运行运行
        TextInputFormat.addInputPath(job,new Path("hdfs://bigdata02:9000/wordcount.txt"));

        //2、设置Mapper类,并设置k2 v2
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3、分区 4、排序 5、局部合并 6、分组 四个步骤,都是Shuffle阶段,使用默认的配置。

        //7、设置Reducer类,并设置k3 v3的类型
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //8、设置输出的路径
        job.setOutputValueClass(TextOutputFormat.class);
        //运行本地文件
        //TextOutputFormat.setOutputPath(job,new Path("D://word_out"));
        //在服务器运行运行
        TextOutputFormat.setOutputPath(job,new Path("hdfs://bigdata02:9000/word_out"));

        //三、等待完成
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        System.exit(b ? 0 : 1);
    }
}

上一篇下一篇

猜你喜欢

热点阅读