大数据学习

Hadoop之MapReduce

2020-02-23  本文已影响0人  TZX_0710

MapReduce是Hadoop的一个分布式计算框架,用于编写批处理应用程序。编写好的程序可以提交到Hadoop集群上用于并行处理大规模的数据集。MapReduce专门用于处理key,value键值对处理。它将作业视为一组key,value,并生成一组key,value作为输出

MapReduce编程模型简述


  1. input:读取文件
  2. spliting:将文件进行拆分。得到K1行数和V1对应的文本类容。
  3. mapping: 并行将每一行按照空格进行拆分,拆分得到List(k2,v2),其中k2代表每一个单词,由于是做词频统计,所以V2的值为1代表出现一次
  4. shuffling:由于Mapping操作可能是在不同的机器上并行处理的。所以需要通过shuffling将相同key值的数据分发到同一个节点上合并。这样才能统计出最终的结果,此时得到K2为每一个单词。List<V2>为可迭代集合,v2就是Mapping中的V2。
  5. Reducing:这里的案例是统计单词出来的总结数,所以reducing对List<v2>进行归约求和操作,最终输出。
    MapReduce编程模型中splitting和shuffing操作都是由框架实现的,`需要我们自己编程的只有mapping和reducing,这也就是MapReduce这个称呼的来源。

InputFormat & RecordReaders
InputFormat将输出文件拆分为多个InputSplit,并由RecordReaders将InputSplit转换为标准的Key,Value键值对,作为map的输出。这一步的意义在于只有先进行逻辑拆分并转为标准的键值对才能为map提供输入以便并行处理。

Combiner
combiner是map运算后的可选操作,它实际上是一个本地化的reduce操作,它主要是map计算出中间文件后做一个简单的合并重复key值的操作。
map在遇到一个hadoop的单词时就会记录1.但是这边文章里可能会出现N次,那么map输出文件冗余就会很多。因此在reduce计算前相同的key做一个合并操作,那么需要传输的数据量就很减少。

Partitioner
partitioner可以理解成分类器,将map的输出按照key值的不同分别分给对应的reducer,支持自定义实现。

wordCount项目案例
创建maven项目 引入pom文件

//因为采用的pom文件 有的jar冲突 所以我采用了maven helper插件移除了这些冲突的依赖项
<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.7</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-core-asl</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-mapper-asl</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-collections</artifactId>
                    <groupId>commons-collections</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-lang</artifactId>
                    <groupId>commons-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>netty</artifactId>
                    <groupId>io.netty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>gson</artifactId>
                    <groupId>com.google.code.gson</groupId>
                </exclusion>
            </exclusions>
        </dependency>

在hadoop中讲解mapreducer有提到过 我们只需要实现map 和reducer
所以下面给出map和reducer的自定义实现

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException,
            InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class WordReducer extends Reducer <Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable <IntWritable> values, Context context) throws IOException,
            InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write( key, new IntWritable( count ) );
    }

}

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class WordCount {
    private static final  String HDFS_URL="hdfs://192.168.80.153:8020";
    private static final String HADOOP_USER_NAME="root";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        if (args.length<2){
            System.out.println("Input and output paths are necessary!");
            return;
        }
        // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // 指明 HDFS 的地址
        configuration.set("fs.defaultFS", HDFS_URL);

        // 创建一个 Job
        Job job = Job.getInstance(configuration);
        job.setJarByClass( WordCount.class );
        // 设置 Mapper 和 Reducer
        job.setMapperClass( WordCountMapper.class);
        job.setReducerClass( WordReducer.class);

        // 设置 Mapper 输出 key 和 value 的类型
        job.setMapOutputKeyClass( Text.class);
        job.setMapOutputValueClass( IntWritable.class);

        // 设置 Reducer 输出 key 和 value 的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // 设置作业输入文件和输出文件的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
        boolean result = job.waitForCompletion(true);

        // 关闭之前创建的 fileSystem
        fileSystem.close();

        // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
        System.exit(result ? 0 : -1);

    }
}

编写完成代码 采用maven 的package 功能打包成jar 提交到服务器
使用 命令进行运行

hadoop jar /usr/local//hdfs-0.0.1-SNAPSHOT.jar \
 com.spring.hdfs.WordCount   \
 /wordcount/input.txt /wordcount/output/wordcount  --wordcount/input.txt 是读取的文件地址 --workcount/output/workcount 是项目运行完成之后保存在hdfs中的地址

--关于HDFS一些操作指令

1. 创建一个文件夹    hdfs dfs -mkdir /myTask
2. 创建多个文件夹    hdfs dfs -mkdir -p /myTask1/input1
3. 上传文件 hdfs dfs -put /opt/wordcount.txt /myTask/input
4. 查看总目录下的文件和文件夹 hdfs dfs -ls /
5. 查看myTask下的文件和文件夹 hdfs dfs -ls /myTask
6. 查看myTask下的wordcount.txt的内容 hdfs dfs -cat /myTask/wordcount.txt
7. 删除总目录下的myTask2文件夹以及里面的文件和文件夹 hdfs dfs -rmr /myTask2
8. 删除myTask下的wordcount.txt hdfs dfs -rmr /myTask/wordcount.txt
9. 下载hdfs中myTask/input/wordcount.txt到本地opt文件夹中 hdfs dfs -get /myTask/input/wordcount.txt /opt
上一篇下一篇

猜你喜欢

热点阅读