Hadoop Study Notes

MapReduce Framework Principles

2020-05-23  Manfestain

Complete notes on the MapReduce framework principles, kept for later review. The material comes from the 尚硅谷 course (course link).


I. InputFormat Data Input

1. Splits and the MapTask Parallelism Decision Mechanism

Data splitting is only a logical partitioning of the input; the data is not physically cut into pieces and stored that way on disk. On disk the data is still stored as HDFS blocks.
The parallelism of MapTasks determines the processing concurrency of the Map phase, and therefore affects the processing speed of the whole Job.

(Figure: split and MapTask parallelism decision mechanism)

2. Job Submission Flow: Source Code Walkthrough

The main call chain during job submission, starting from waitForCompletion():
waitForCompletion();

// 1 Establish the connection
connect();
  // 1.1 Create the client proxy used to submit the Job
  new Cluster(getConfiguration());
  // 1.2 Determine whether the job runs locally or on a remote YARN cluster
  initialize(jobTrackAddr, conf);

// 2 Submit the job
submitter.submitJobInternal(Job.this, cluster)
  // 2.1 Create the staging path used to submit data to the cluster
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // 2.2 Get the job ID and create the Job path
  JobID jobId = submitClient.getNewJobID();
  // 2.3 Copy the jar package to the cluster
  copyAndConfigureFiles(job, submitJobDir);
  rUploader.uploadFiles(job, jobSubmitDir);
  // 2.4 Compute the splits and generate the split plan files
  writeSplits(job, submitJobDir);
  maps = writeNewSplits(job, jobSubmitDir);
  input.getSplits(job);
  // 2.5 Write the XML configuration file to the staging path
  writeConf(conf, submitJobFile);
  conf.writeXml(out);
  // 2.6 Submit the Job and return the submission status
  status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

3. FileInputFormat Split Mechanism
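
By default, FileInputFormat slices each input file independently. The split size is computed as splitSize = max(minSize, min(maxSize, blockSize)); with the default minSize = 1 and maxSize = Long.MAX_VALUE, a split equals one HDFS block. A minimal sketch of adjusting the bounds in the driver (the values below are illustrative, not from the original notes):

// Raising the minimum makes splits larger than a block; lowering the maximum makes them smaller
FileInputFormat.setMinInputSplitSize(job, 1);          // default lower bound (1 byte)
FileInputFormat.setMaxInputSplitSize(job, 134217728);  // cap splits at 128 MB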

4. CombineTextInputFormat Split Mechanism

The framework's default TextInputFormat split mechanism slices the input file by file, so a large number of small files produces a correspondingly large number of MapTasks, which is extremely inefficient.

CombineTextInputFormat is intended for scenarios with many small files: it logically groups multiple small files into one split, which is then processed by a single MapTask.


// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// Set the maximum size of a virtual-storage split (4194304 bytes = 4 MB)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

5. FileInputFormat Implementation Classes

When running a MapReduce program, how does MapReduce read input files of different formats?

Common FileInputFormat implementations include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormat implementations; the main ones are covered below.

5.1 TextInputFormat

TextInputFormat is the default FileInputFormat implementation. It reads records line by line. The key is the starting byte offset of the line within the file, of type LongWritable; the value is the content of the line, excluding any line terminators (newline, carriage return), of type Text.
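
For example, for a file containing the two lines

hello world
hadoop

TextInputFormat produces the records (0, "hello world") and (12, "hadoop"): "hello world" plus its newline occupies 12 bytes, so the second line starts at byte offset 12.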

5.2 KeyValueTextInputFormat

Each line is one record, split into key and value by a separator; the default separator is "\t". It can be configured in the driver class:
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
The key is then the Text sequence before the separator on each line, and the value is the remainder of the line.
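
A minimal sketch of wiring this up in the driver (assuming the separator is set on the Configuration before the Job is created, since Job copies the Configuration):

// Configure the key-value separator before creating the Job
Configuration conf = new Configuration();
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
Job job = Job.getInstance(conf);
// Switch from the default TextInputFormat to KeyValueTextInputFormat
job.setInputFormatClass(KeyValueTextInputFormat.class);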

5.3 NLineInputFormat

With NLineInputFormat, the InputSplit handled by each MapTask is no longer determined by HDFS blocks but by the number of lines N specified for NLineInputFormat.
Number of splits = total lines in the input file ÷ N; if the division is not exact, number of splits = quotient + 1.
The key-value pairs are the same as those generated by TextInputFormat. N can be set in the driver class:
NLineInputFormat.setNumLinesPerSplit(job, 3);
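
A minimal sketch of the corresponding driver settings; as a worked example of the formula above, an 11-line input with N = 3 gives 11 ÷ 3 = 3 with a remainder, so 3 + 1 = 4 splits.

// Each split covers 3 lines of input
NLineInputFormat.setNumLinesPerSplit(job, 3);
// Switch from the default TextInputFormat to NLineInputFormat
job.setInputFormatClass(NLineInputFormat.class);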

5.4 Custom InputFormat

Steps:
1) Write a class that extends FileInputFormat;
2) Write a custom RecordReader that reads the data and packages it as key-value pairs;
3) On the output side, use SequenceFileOutputFormat to write the merged files.

A SequenceFile is Hadoop's file format for storing key-value pairs in binary form; in this case the key is the file path plus file name and the value is the file contents.

Custom InputFormat example: merge multiple small files into a single SequenceFile, using the file path plus file name as the key and the file contents as the value.

(Figure: implementation steps)

Code:
WholeFileInputFormat.java

package Inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);

        return recordReader;
    }
}
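
In practice such a whole-file input format usually also overrides isSplitable so that a file is never split across MapTasks. A minimal sketch of that addition (not in the original code, which relies on the default behavior; it also requires importing org.apache.hadoop.mapreduce.JobContext and org.apache.hadoop.fs.Path):

    // Never split an input file: each file is read as a single record
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }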

WholeRecordReader.java

package Inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;
    Configuration configuration;
    Text k = new Text();
    BytesWritable v = new BytesWritable();
    boolean isProgress = true;

    // Initialization
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    // Core business logic: read the whole file once as a single key-value pair
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (isProgress) {
            byte[] buf = new byte[(int) split.getLength()];

            // 1 Get the FileSystem object
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 2 Open the input stream
            FSDataInputStream fis = fs.open(path);

            // 3 Read the whole file into the buffer
            IOUtils.readFully(fis, buf, 0, buf.length);

            // 4 Package the value
            v.set(buf, 0, buf.length);

            // 5 Package the key (the file path)
            k.set(path.toString());

            // 6 Close the stream
            IOUtils.closeStream(fis);

            isProgress = false;

            return true;
        }

        return false;
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return v;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {

    }
}

Implement the corresponding Mapper and Reducer classes (a sketch follows the driver snippet below), and add two lines to the Driver:

// 4 Set the InputFormat for the input side
job.setInputFormatClass(WholeFileInputFormat.class);

// 5 Set the OutputFormat for the output side
job.setOutputFormatClass(SequenceFileOutputFormat.class);
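
A minimal sketch of what the pass-through Mapper and Reducer might look like for this case (the class names SequenceFileMapper and SequenceFileReducer are assumptions, not from the original notes):

SequenceFileMapper.java

package Inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Emit the (file path, file bytes) record unchanged
        context.write(key, value);
    }
}

SequenceFileReducer.java

package Inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each file path has exactly one value; write it out for SequenceFileOutputFormat to store
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}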

II. Shuffle Mechanism

The data processing that takes place after the map method and before the reduce method is called Shuffle.

1. Partitioning

Partitioning writes the results to different output files according to some condition, for example writing phone-number statistics to different files by the province the number belongs to.

1.1 The default partitioner is hash partitioning:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    // Masking with Integer.MAX_VALUE clears the sign bit so the result is non-negative
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
1.2 Custom Partitioner

Steps (illustrated by the case below):
1) Write a class that extends Partitioner<K, V> and override getPartition;
2) Register it in the driver with job.setPartitionerClass(...);
3) Set the number of ReduceTasks to match the number of partitions with job.setNumReduceTasks(n).

Example:

Write the statistics to different files according to the first three digits of the phone number.

Custom Partitioner class, ProvincePartitioner.java

package Flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // 1 Get the first three digits of the phone number
        String prePhoneNum = key.toString().substring(0, 3);

        int partition = 3;
        if ("136".equals(prePhoneNum)) {
            partition = 0;
        }else if ("137".equals(prePhoneNum)) {
            partition = 1;
        }else if ("138".equals(prePhoneNum)) {
            partition = 2;
        }
        
        return partition;
    }
}

Write the Mapper and Reducer, and add the following to the Driver class:

// 5 Set the custom partitioner
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(4);
The number of ReduceTasks is set to 4 because the custom partitioner produces partitions 0-3; it must be at least the number of partitions (setting it to 1 would disable partitioning and write a single output file).
2. WritableComparable Sorting

Sorting is one of the most important operations in MapReduce.

Both MapTask and ReduceTask sort data by key; this is Hadoop's default behavior. The default order is lexicographic (dictionary) order, and the sort is implemented with quicksort.

MapReduce sorting is commonly classified into partial sort (each output file is internally sorted), total sort (a single globally sorted output), grouping (auxiliary) sort, and secondary sort.


Custom sorting:
When a bean object is transmitted as the key, it needs to implement the WritableComparable interface and override the compareTo method; the framework then sorts by that comparison.

Example:

Store the statistics in different files according to the phone number's region, and within each file sort by total flow in descending order.

Rewrite FlowBean.java:

package Sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    // Core comparison: order by total flow, descending
    public int compareTo(FlowBean bean) {

        int result;

        if (sumFlow > bean.getSumFlow()) {
            result = -1;
        }else if(sumFlow < bean.getSumFlow()) {
            result = 1;
        }else{
            result = 0;
        }

        return result;
    }

    // Serialization method
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization method (fields must be read in the same order they were written)
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

Write the Mapper class, FlowCountSortMapper.java

package Sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Read one line
        String line = value.toString();

        // 2 Split the line into fields
        String[] fields = line.split("\t");

        // 3 Populate the FlowBean
        k.setUpFlow(Long.parseLong(fields[1]));   // the flow values form the key
        k.setDownFlow(Long.parseLong(fields[2]));
        k.setSumFlow(Long.parseLong(fields[3]));

        v.set(fields[0]);  // the phone number becomes the value

        // 4 Write out
        context.write(k, v);
    }
}

Write the Reducer class, FlowCountSortReducer.java

package Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
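        // Keys arrive sorted by FlowBean (descending total flow); phone numbers with identical
        // flow values share a key, so iterate the values and swap key and value on output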

        for (Text value : values) {
            context.write(value, key);
        }
    }
}

Write the Partitioner class, ProvincePartitioner.java

package Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {

        String prePhoneNum = text.toString().substring(0, 3);

        int partition = 3;

        if ("136".equals(prePhoneNum)) {
            partition = 0;
        }else if ("137".equals(prePhoneNum)) {
            partition = 1;
        }else if ("138".equals(prePhoneNum)) {
            partition = 2;
        }

        return partition;
    }
}

Write the driver class, FlowCountSortDriver.java

package Sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCountSortDriver.class);
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // Custom partitioner with one ReduceTask per partition (partitions 0-3)
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(4);

        // FlowBean is the map output key, so the shuffle sorts records by total flow
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // The final output swaps back to (phone number, FlowBean)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}