Mac学习大数据玩转大数据大数据,机器学习,人工智能

【大数据学习】第十一篇-MapReduce简介

2019-07-14  本文已影响4人  irving_yuan

MapReduce定义

MapReduce是一个分布式计算的框架,是用户开发机遇hadoop的数据分析应用的核心框架。

MapReduce的优缺点

  1. 易于编程 只要实现一些简单的接口即可实现功能,且编写程序类似串行
  2. 良好的扩展性 支持扩展计算服务器的数量
  3. 高容错性 可以在价格低廉的机器上运行,即便集群中某些节点宕机,也可以正常使用
  4. 适合PB级离线计算

MapReduce的编程思想

MapReduce的编程思想

MapReduce主要包括两个部分 Map阶段 + Reduce阶段,每个阶段中的输入输出都是key-value的形式存在
已文本词数统计为例,两个阶段的流程如下:

  1. Map阶段读取Hadoop分片的数据,按行读取自动进行一次map操作,得到输入key-value对应为 “偏移量-本行数据”。
    偏移量实际是该行起始的数据长度索引,可以理解为行号,例如第一行偏移量为0,数据10byte,则第二行偏移量为11。
  2. Map阶段第二步执行我们实现的接口算法,并将结果的key-value(单词-每行的词频 如 java - 1 2 1 4 1)输出都磁盘上。整个Map阶段都是完全并行执行的。
  3. Reduce阶段读取Map的结果,执行实现的接口,对每个分片的结果进行初次的汇总
  4. Reduce阶段对每个分片的结果再次进行汇总成为一个最终结果
    注:通常一个分片对应hadoop中存储的一个块,即128M,这也可以避免载入内存文件过大,撑爆内存

实现wordcount编码

Map类,继承Mapper,重写其map方法实现对每个单词的统计,范型是根据自己业务需要定义的类型

package com.irving.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * map执行类
 * 四个范型是map输入和输出的类型
 * @LongWritable 字符偏移量
 * @Author yuanyc
 * @Date 15:17 2019-07-11
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * 重写map方法
     * @Author yuanyc
     * @Date 15:19 2019-07-11
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 每行起始的偏移量
        System.out.println("-------");
        System.out.println("偏移量" + key.get());
        // 按行读取的数据
        String line = value.toString();
        // 根据空格切分str
        String[] arr = line.split(" ");
        // 对字符传标记1
        for (String str : arr) {
            context.write(new Text(str), new IntWritable(1));
        }
    }
}

Reduce类,继承Reducer类,重写reduce方法,实现对map结果的汇总

package com.irving.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce类
 * @Author yuanyc
 * @Date 11:17 2019-07-14
 */
public class WordCountRecuder extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // values 形如 1, 2,2,1,1
        // 对词频进行统计
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

编写启动类,执行mapreduce算法
注:输入输出路径应当为hdfs的目录,但是本地调试阶段可以使用linux文件系统目录

package com.irving.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 启动类
 * @Author yuanyc
 * @Date 15:39 2019-07-11
 */
public class WordCountMain {
    public static void main(String[] args) {

        Configuration configuration = new Configuration();
//        args = new String[]{"/Users/yuanyc/Documents/workspace/hdfs/test.txt", "/Users/yuanyc/Documents/workspace/hdfs/out"};

        try {
            // 创建job
            Job job = Job.getInstance(configuration);
            job.setJarByClass(WordCountMain.class);

            // 指定map类
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // 指定reduce
            job.setReducerClass(WordCountRecuder.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // 指定输入输出路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 提交任务
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

测试数据


测试数据

本地执行结果


分片为1
执行结果

注:输出目录不能重复存在,要重新执行时需要删除现有目录

Hadoop的序列化

通过上面代码可以看出,MR在编码过程中使用的输入输出对象类型都是不是自定义的类型。
使用的这些类型是Hadoop定义的基础类型,由于mapreduce过程中伴随大量的IO操作,因此需要针对序列化进行性能优化。
Java常用类型与Hadoop序列化类型的对照表

JDK的类型 Hadoop序列化类型
int IntWritable
long LongWritable
float FloatWritable
double DoubleWritable
byte ByteWritable
boolean BooleanWritable
String Text
Map MapWritable
Array ArrayWritable

自定义Java对象的序列化

自定义的Bean需要实现writable接口,重写序列化和反序列化的方法

package com.irving.wordcount;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 自定义bean的hadoop序列化
 * @Author yuanyc
 * @Date 12:37 2019-07-14
 */
public class BeanWritable implements Writable, Comparable {

    private String name;
    private int age;

    /**
     * 序列化方法
     * @Author yuanyc
     * @Date 12:53 2019-07-14
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeChars(name);
        dataOutput.writeInt(age);
    }

    /**
     * 反序列化,顺序要与序列化一致
     * @Author yuanyc
     * @Date 12:53 2019-07-14
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        age = dataInput.readInt();
    }

    /**
     * 自定义对象用key时,需要重写compareto方法,用于shuffle阶段的排序
     * @Author yuanyc
     * @Date 12:52 2019-07-14
     */
    @Override
    public int compareTo(Object o) {
        return 0;
    }
}
上一篇下一篇

猜你喜欢

热点阅读