
MapReduce (1): An Overview of MapReduce

2021-11-25 · codeMover

Definition of MapReduce

MapReduce is a programming framework for distributed computation; it is the core framework with which users develop "Hadoop-based data-analysis applications".

The core function of MapReduce is to combine the user's business-logic code with the framework's built-in default components into one complete distributed program that runs concurrently on a Hadoop cluster.

Advantages and Disadvantages of MapReduce

Advantages:

1) Easy to program: implementing a few interfaces is enough to produce a distributed program.

2) Good scalability: computing capacity can be grown simply by adding machines.

3) High fault tolerance: if a node fails, its tasks are automatically re-run on another node without manual intervention.

4) Well suited to offline batch processing of massive (PB-scale) data.

Disadvantages:

1) Not suited to real-time computation: it cannot return results within milliseconds.

2) Not suited to stream processing: the input of a MapReduce job must be a static data set.

3) Not suited to DAG/iterative computation: every job writes its output to disk, so chaining jobs causes heavy disk I/O and poor performance.

Core Ideas of MapReduce Programming

(Figure 1.1: the core MapReduce programming model)

1) A MapReduce program generally runs in two phases: a Map phase and a Reduce phase.

2) The concurrent MapTasks of the Map phase run fully in parallel and are independent of one another.

3) The concurrent ReduceTasks of the Reduce phase are likewise independent of one another, but their input depends on the output of all MapTask instances of the previous phase.

4) The MapReduce programming model allows only one Map phase and one Reduce phase; if the user's business logic is more complex, the only option is to run multiple MapReduce programs serially.
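To make the two-phase model above concrete, here is a small plain-Java sketch with no Hadoop dependency: the simulated "map tasks" process their splits independently (a parallel stream stands in for concurrent MapTasks), and the "reduce" step runs only once all map output is available. The class name and sample input are made up for illustration.

```java
import java.util.*;
import java.util.stream.*;

public class TwoPhaseSketch {
    public static void main(String[] args) {
        // The input, divided into splits: one per (simulated) MapTask.
        List<String> splits = Arrays.asList("hello world", "hello mapreduce");

        // Map phase: each split is processed independently and in parallel,
        // emitting a (word, 1) pair per word.
        List<Map.Entry<String, Integer>> mapOutput = splits.parallelStream()
            .flatMap(line -> Arrays.stream(line.split(" "))
                .map(w -> Map.entry(w, 1)))
            .collect(Collectors.toList());

        // Reduce phase: starts only after all map output exists,
        // then aggregates the values by key.
        Map<String, Integer> counts = mapOutput.stream()
            .collect(Collectors.groupingBy(Map.Entry::getKey,
                Collectors.summingInt(Map.Entry::getValue)));

        System.out.println(counts.get("hello")); // 2
    }
}
```

The real framework adds what this sketch omits: distributing the splits across machines, shuffling the map output over the network, and restarting failed tasks.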

MapReduce Processes

A complete MapReduce program has three kinds of processes when running distributed:

1) MrAppMaster: schedules the whole job and coordinates its state.

2) MapTask: handles the entire data-processing flow of the Map phase.

3) ReduceTask: handles the entire data-processing flow of the Reduce phase.

Commonly Used Data Serialization Types

Java type   Hadoop Writable type
Boolean     BooleanWritable
Byte        ByteWritable
Integer     IntWritable
Long        LongWritable
Float       FloatWritable
Double      DoubleWritable
String      Text
Map         MapWritable
Array       ArrayWritable
Null        NullWritable
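Hadoop supplies these Writable wrappers because keys and values must serialize compactly to be shuffled across the network. The sketch below uses only the JDK's DataOutputStream to illustrate the idea behind the Writable wire format (an IntWritable, for instance, writes its value as a 4-byte integer via DataOutput); it is a conceptual stand-in, not Hadoop's actual classes.

```java
import java.io.*;

public class WritableIdea {
    public static void main(String[] args) throws IOException {
        // Writables serialize themselves to a DataOutput; plain JDK
        // streams show the same mechanics without a Hadoop dependency.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(42);        // roughly what IntWritable.write() emits
        out.writeUTF("hello");   // Text uses a comparable length-prefixed UTF-8 form

        // Deserialize in the same field order the data was written.
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        int n = in.readInt();
        String s = in.readUTF();
        System.out.println(n + " " + s);   // 42 hello
    }
}
```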

MapReduce Coding Conventions

A user program consists of three parts: the Mapper, the Reducer, and the Driver.

Mapper stage

1) The user's Mapper must extend the framework's Mapper class.

2) The Mapper's input and output are both key/value (KV) pairs, with user-defined types.

3) The business logic lives in the map() method, which is called once for every input KV pair.

Reducer stage

1) The user's Reducer must extend the framework's Reducer class.

2) The Reducer's input type matches the Mapper's output type, again a KV pair.

3) The business logic lives in the reduce() method, which is called once for every group of KV pairs sharing the same key.

Driver stage

The Driver acts as the client of the YARN cluster: it submits the whole program to YARN as a Job object that wraps the run-time parameters of the MapReduce program.

Hands-On: Counting Word Occurrences in a File (WordCount)

(Figure 1.2: counting word occurrences in a file)

Mapper

Reducer

Driver

Hands-On: the Code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>mapReduceDemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.30</version>
    </dependency>
  </dependencies>

</project>

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

Note: make sure the imports come from the `org.apache.hadoop` packages.

WordCountMapper.java

package com.magw.mapreduce.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @Description mapper
 * @Author tuzki
 * @Date 2021/11/20 11:20 PM
 * @Version 1.0
 * KEYIN:    type of the map-stage input key (LongWritable, the byte offset of the line)
 * VALUEIN:  type of the map-stage input value (Text, one line of the file)
 * KEYOUT:   type of the map-stage output key (Text)
 * VALUEOUT: type of the map-stage output value (IntWritable)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        // 1. Read one line of input
        String lineStr = value.toString();
        // 2. Split the line into words
        String[] words = lineStr.split(" ");
        // 3. Emit a (word, 1) pair for each word
        for (String word : words) {
            // reuse outKey instead of allocating a new Text per record
            outKey.set(word);
            // write the pair to the framework
            context.write(outKey, outV);
        }
    }
}
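One caveat in the map() method above: value.toString().split(" ") keeps empty tokens when words are separated by more than one space, so empty-string keys would be emitted. A quick JDK-only check of the two splitting strategies:

```java
public class SplitCheck {
    public static void main(String[] args) {
        // A single-space delimiter keeps the empty token that sits
        // between two consecutive spaces...
        String[] a = "hello  world".split(" ");
        System.out.println(a.length);   // 3 -> {"hello", "", "world"}

        // ...while a whitespace regex collapses runs of spaces and tabs.
        String[] b = "hello  world".split("\\s+");
        System.out.println(b.length);   // 2 -> {"hello", "world"}
    }
}
```

If the input may contain runs of spaces or tabs, split("\\s+") is the safer choice for the mapper.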

WordCountReducer.java

package com.magw.mapreduce.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @Description reducer
 * @Author tuzki
 * @Date 2021/11/20 11:20 PM
 * @Version 1.0
 * KEYIN:    type of the reduce-stage input key (Text)
 * VALUEIN:  type of the reduce-stage input value (IntWritable)
 * KEYOUT:   type of the reduce-stage output key (Text)
 * VALUEOUT: type of the reduce-stage output value (IntWritable)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        // accumulate all the 1s emitted for this key
        for (IntWritable value : values) {
            sum += value.get();
        }
        IntWritable outV = new IntWritable(sum);
        // write out (word, total count)
        context.write(key, outV);
    }
}
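Between map and reduce, the framework shuffles the map output so that reduce() receives each key exactly once, together with all of its values. The JDK-only sketch below imitates that grouping and the summation done by WordCountReducer above (the sample pairs are made up):

```java
import java.util.*;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Map output pairs, as WordCountMapper would produce them
        // for the line "hello world hello".
        List<Map.Entry<String, Integer>> mapOutput = List.of(
            Map.entry("hello", 1), Map.entry("world", 1), Map.entry("hello", 1));

        // The shuffle groups values by key before reduce() is called;
        // a TreeMap mimics the sorted grouping the reducer sees.
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapOutput) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }

        // reduce(): sum the value list of each key, as in WordCountReducer.
        Map<String, Integer> counts = new TreeMap<>();
        grouped.forEach((k, vs) ->
            counts.put(k, vs.stream().mapToInt(Integer::intValue).sum()));

        System.out.println(counts); // {hello=2, world=1}
    }
}
```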

WordCountDriver.java

package com.magw.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @Description Driver
 * @Author tuzki
 * @Date 2021/11/20 11:20 PM
 * @Version 1.0
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        // 1. Get a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar via the driver class
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths; note that Java does not
        //    expand "~", so the home directory is resolved explicitly
        FileInputFormat.setInputPaths(job,
            new Path(System.getProperty("user.home") + "/Documents/md/a.txt"));
        FileOutputFormat.setOutputPath(job,
            new Path(System.getProperty("user.home") + "/Documents/md/wordCount"));
        // 7. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Running WordCount on a Cluster

hadoop jar zzz.jar com.xxx.WordCountDriver /input /output

Note: for a cluster run, the hard-coded paths in the code must be changed, and the output directory /output must not already exist (the job fails otherwise).

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        // 1. Get a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar via the driver class
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Take the input and output paths from the command line,
        //    matching the `hadoop jar ... /input /output` invocation
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
}

Summary:

This article introduced MapReduce and its programming model, and closed with a hands-on WordCount example.
