玩转大数据大数据大数据,机器学习,人工智能

MapReduce(一)---入门

2019-06-22  本文已影响0人  Coding小聪

1. MapReduce概述

MapReduce是一个分布式计算的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码并发地运行在一个Hadoop集群上

1.1 MapReduce进程

一个完整的MapReduce程序在运行时有3种进程:

  1. MrAppMaster:负责整个MapReduce程序的调度和状态协调;
  2. MapTask:负责Map阶段的数据处理流程;
  3. ReduceTask:负责Reduce阶段的数据处理流程。

2. MapReduce编程

用户需要按照一定的规范来进行MapReduce程序的开发。

2.1编程规范

用户编写的MapReduce程序由三个部分组成:Mapper、Reducer和Driver.

Mapper编写
  1. 用户自定义的Mapper继承特定的Mapper类;
  2. 定义Mapper的输入数据类型(K-V对形式的两个参数,都需要指定);
  3. 定义Mapper的输出数据类型(K-V对形式的两个参数,都需要指定);
  4. 重写map()方法,将业务逻辑写在其中。

对于每个输入的<K,V>参数,map()方法均会执行一次。

Reduce编写
  1. 用户自定义Reducer继承特定的Reducer父类;
  2. 定义Reducer的输入数据类型,其也是K-V对,并对应Mapper的输出数据类型;
  3. 重写reduce()方法,将业务逻辑写入其中。

ReduceTask进程会对每一组相同k的<k,v>输入参数只调用一次reduce()方法。

Driver编写

相当于YARN集群的客户端,用于提交MapReduce程序到YARN集群,提交的是一个job对象,该job对象封装了MapReduce程序相关的运行参数。

2.1 常用数据类型

Hadoop的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储。

Hadoop类型 Java类型
BooleanWritable boolean
ByteWritable byte
IntWritable int
FloatWritable float
LongWritable long
DoubleWritable double
Text String
MapWritable map
ArrayWritable array

自定义MapReduce数据类型

自定义数据类型有两种方式:

  1. 实现Writable接口
    重写 write()和readFields()方法
  2. 实现WritableComparable接口
    重写 write(),readFields()和compareTo()方法。

3. MapReduce的优缺点

优点

  1. MapReduce编程简单。只需要实现一些接口,就可以完成一个分布式计算程序,并分不到大量廉价的PC机器上运行;
  2. 良好的扩展性。当计算资源不足时,可以通过简单的增加机器来扩展计算能力;
  3. 高容错性。MapReduce程序运行在廉价的PC机器上,当其中一台机器宕机了,它可以自动将计算任务转移到另外一个节点上运行,而不需要人工参与;
  4. PB级以上海量数据的计算

缺点

  1. 不擅长实时计算。MapReduce一般用来做数据的离线处理,它没法像MySQL一样,在毫秒或者秒级别内返回结果;
  2. 不擅长流式计算。流式计算的输入数据是动态的,而MapReduce的输入数据是静态的,不能动态变化;
  3. 不擅长DAG(有向图)计算。DAG计算指的是,多个应用存在依赖关系,后一个应用的输入为前一个应用的输出。使用MapReduce进行此种计算,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下。

4. MapReduce工作流程


1)默认HDFS中一个存储块的大小是128M,所以需要将200M拆成128+72;
2)maptask并行运行,互不影响;
3)reducetask也是并行运行,互不影响,但是reducetask的开始要依赖于所以的maptask都运行结束。

5. WordCount案例

这里实现一个和Hadoop官方提供的wordcount类似功能,需求如下:
输入:文本文件,文件中包含一些单词。
输出:各个单词输出的次数。

对照“2.1编程规范”,我们需要编写Mapper、Reducer、Driver。

5.1 Mapper

public class WordcountMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
    Text outKey = new Text();
    IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words =  line.split(" ");
        for (String word : words) {
            outKey.set(word);
            context.write(outKey,outValue);
        }
    }
}

5.2 Reducer

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        outValue.set(sum);
        context.write(key, outValue);
    }
}

5.3 Driver

public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取配置信息以及封装任务
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 设置jar加载路径
        job.setJarByClass(WordcountDriver.class);

        // 设置map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 设置map输出kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交
        job.submit();
    }
}

需要注意一点:导包的时候很多类在org.apache.hadoop.mapreduce包和org.apache.hadoop.mapred中存在同名的情况,一般导入org.apache.hadoop.mapreduce包。

5.4 打包运行

在pom.xml文件中配置maven-assembly-plugin,然后通过mvn install指令对应用进行打包, 最后在target目录中可以看到打好的包

其中with-dependencies.jar中多了依赖,如果是在集群中运行的话,集群中包含mapreduce运行所需的jar包,所以使用不带依赖的jar包即可。

将wordcount-1.0-SNAPSHOT.jar上传到hadoop服务器,然后运行hadoop jar即可。

[hadoop@hadoop01 software]$ hadoop jar wordcount-1.0-SNAPSHOT.jar com.zgc.mapreduce.wordcount.WordcountDriver /usr/hadoop/input /usr/hadoop/output

其中,/usr/hadoop/input是一个hdfs的目录,它下面含有需要统计单词次数的文件。

执行完成之后,可以通过hdfs dfs -cat /usr/hadoop/output/part-r-00000指令查看统计结果。


参考

  1. MapReduce的核心编程思想
上一篇下一篇

猜你喜欢

热点阅读