MapReduce

2018-01-22  BlackChen

MapReduce Principles and Features

MapReduce originated in Google's MapReduce paper, published in December 2004.
Hadoop MapReduce is an open-source clone of Google's MapReduce.

It is a batch-processing computation framework.
A MapReduce program is divided into a Map phase and a Reduce phase.
MapReduce features: easy to program, good horizontal scalability, high fault tolerance, and well suited to offline processing of massive data sets.

Common MapReduce use cases: offline batch analysis of large data sets, such as log analysis and simple statistics (the word count below is the canonical example).

Scenarios it is not well suited for: real-time / low-latency computation, stream processing, and iterative (DAG-style) computation.

MapReduce Programming Model

MapReduce divides the entire execution of a job into two phases: a Map phase and a Reduce phase.

The Map phase consists of a number of Map Tasks.

The Reduce phase consists of a number of Reduce Tasks.

Note: the components shown in bold in the figure are the ones that can be overridden programmatically.


(figure: MapReduce programming model overview)

InputFormat: splits the input data into InputSplits and provides the RecordReader that turns each split into the <K,V> pairs fed to the Mapper (default: TextInputFormat).

Split vs. Block: a split is the logical unit of input that determines how many Map Tasks are launched, while a block is HDFS's physical storage unit; by default one split corresponds to one block.

Partitioner

The Partitioner decides which Reduce Task each record emitted by a Map Task is sent to.

Default implementation: HashPartitioner
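As a minimal sketch of how the default can be replaced (the class name FirstLetterPartitioner is hypothetical; it matches the <Text, IntWritable> map output types of the WordCount example below and would be enabled in the driver with job.setPartitionerClass(FirstLetterPartitioner.class) plus job.setNumReduceTasks(2)):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical custom Partitioner: words starting with a-m go to partition 0,
// everything else goes to the last partition. The default HashPartitioner
// instead computes (key.hashCode() & Integer.MAX_VALUE) % numPartitions.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        char first = word.isEmpty() ? 'a' : Character.toLowerCase(word.charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : numPartitions - 1;
    }
}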

Programming Interfaces

The Java API comes in two flavors:
• Old API: package org.apache.hadoop.mapred
• New API: package org.apache.hadoop.mapreduce
The new API is more extensible.
The two APIs differ only in the interface exposed to the user; the underlying execution engine is the same.
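For contrast, a sketch of a word-count mapper written against the old API (the class name OldApiWordCountMapper is made up for illustration; the new-API equivalent is the WordCountMapper in the example below):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API mapper: Mapper is an interface, and output is written through an
// OutputCollector rather than the new API's Context object.
public class OldApiWordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        for (String word : value.toString().split("\t")) {
            output.collect(new Text(word), new IntWritable(1));
        }
    }
}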

Create a Maven project, add the pom.xml below, and write the WordCount program.

WordCount.java

package class3;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Take one line of input: convert the serialized value into a String
            String line = value.toString();
            // Split the line into words on the tab delimiter
            String[] words = line.split("\t");
            // Emit <word, 1> for every word
            for (String word : words) {
                // Output must use Hadoop's serializable (Writable) types
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // reduce() is called once per group: one key together with all of its values (k: v1, v2, v3, ...)
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Counter for the number of occurrences of this key
            int count = 0;
            // Iterate over the group and accumulate this key's occurrences into count
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));

        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String jobName = args[0];
        String inputPath = args[1];
        String outputPath = args[2];
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Set the job name
        job.setJobName(jobName);
        // Set the main class (used to locate the jar)
        job.setJarByClass(WordCount.class);
        // Set the Mapper and Reducer classes used by the job
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set the output key and value types of the Map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the output key and value types of the Reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>hadooptest</groupId>
    <artifactId>hadooptest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Package the project into a jar.
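For example (a sketch; the plain Maven jar is enough here because the Hadoop dependencies are provided by the cluster at run time):

mvn clean package
# produces target/hadooptest-1.0-SNAPSHOT.jar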


Upload the jar to the server, then create an input file test.txt with the contents below and upload it to HDFS (an example upload command follows the file contents).

aaa     asdf
asdf
asfe
adsf
asdfaf
sadf    sdf     asf     asd
helo    hello   hello   hello
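
The upload itself can be done with the standard HDFS shell commands (paths assumed to match the listing that follows):

hadoop fs -mkdir -p /wordcount/input
hadoop fs -put test.txt /wordcount/input/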
[hadoop@hadoop0 ~]$ hadoop fs -ls /wordcount/input/
Found 1 items
-rw-r--r--   3 hadoop supergroup         72 2018-01-23 21:07 /wordcount/input/test.txt

Run the jar

hadoop jar hadooptest-1.0-SNAPSHOT.jar class3.WordCount WordCount /wordcount/input/ /wordcount/output
(arguments: main class as package.ClassName, then the job name, the input path, and the output path)

When the job has finished, check the output:

[hadoop@hadoop0 ~]$ hadoop fs -ls /wordcount/output
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2018-01-23 21:09 /wordcount/output/_SUCCESS
-rw-r--r--   3 hadoop supergroup         76 2018-01-23 21:09 /wordcount/output/part-r-00000

hadoop fs -cat /wordcount/output/part-r-00000

aaa 1
adsf    1
asd 1
asdf    2
asdfaf  1
asf 1
asfe    1
hello   3
helo    1
sadf    1
sdf 1

Note: the output path must not exist before the job is submitted; MapReduce creates it automatically when the job runs. If the output path already exists, submission fails with an error saying the output directory already exists.
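So before re-running the same job, remove the old output directory first, for example:

hadoop fs -rm -r /wordcount/output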

A user-written program consists of three parts: the Mapper, the Reducer, and the Driver (the client that submits and runs the MR job).

  1. The Mapper's input data comes as key-value (KV) pairs.
  2. The Mapper's output data is also KV pairs.
  3. The Mapper's business logic goes in the map() method.
  4. map() is called once for every input <K,V> pair.
  5. The Reducer's input data types match the Mapper's output data types, and are also KV pairs.
  6. The Reducer's business logic goes in the reduce() method.
  7. The Reduce task calls reduce() once for every group of <k,v> pairs that share the same key k.
  8. User-defined Mappers and Reducers must extend their respective base classes.
  9. The whole program needs a Driver to submit it; what gets submitted is a Job object describing all the necessary information.