Map阶段的切片逻辑

2019-03-07  本文已影响0人  苏坡闷

1.切片逻辑入口 (JobSubmitter类中的submitJobInternal方法中)

  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
    ...
     
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      ...
      }
    }
  }
  1. writeSplits()
//切片方法主方法
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //根据是否使用了新的API来确定使用那种切片方法
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}

3.writeNewSplits() 方法

@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
  InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    // 切片需要根据输入格式,调用getSplits()
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    //写入切片信息
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}

4.默认的切片逻辑

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();      //以纳秒为单位测试执行时间

     // getFormatMinSplitSize()=1
     // getMinSplitSize(job)尝试获取mapreduce.input.fileinputformat.split.minsize=0
     // minSize默认是1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // 尝试获取mapreduce.input.fileinputformat.split.maxsize,默认没有则使用Long.MaxValue
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // 获取输入路径下所有文件的状态信息
    List<FileStatus> files = listStatus(job);

    // 遍历每一个文件
    //由此可以看出这种切片策略是按照文件来切
    for (FileStatus file: files) {
        Path path = file.getPath();
        // 获取文件大小
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;   //File所属块的地址
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();   //本地文件,直接获取文件信息
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());     //远程文件,需调取远程文件系统来获取块信息
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // 判断文件是否可切,如果可切,进行切割,
            if (isSplitable(job, path)) {
                // 获取块大小,默认HDFS为128M,windows本地为32M,1.x版HDFS为64M
                long blockSize = file.getBlockSize();
                // 可以通过控制每片的大小,控制切片总数
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                // 获取待切部分
                long bytesRemaining = length;
                // 判断待切部分 / 片大小 是否 > 1.1,如果大于,那就切一片,继续判断
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                    //起始位置分别为0,splitSize,2*splitSize........
                    bytesRemaining -= splitSize;
                }
                // 剩余部分/ 片大小 <=1.1 ,整个剩余部分作为1片! 每个文件,有可能最后一片会大于块大小,但是不会超过块的1.1倍
                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splitable
                // 文件不可切,整个文件作为1片
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
            }
        } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    // Save the number of input files for metrics/loadgen
    //设置mapreduce.input.fileinputformat.numinputfiles的值为输入文件的数量
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
}

4.1在此处可能会发生的问题

如果splitSize等于blocksize,则InputSplit的起始位置与相应块在文件中offset一致,但如果splitSize小于blocksize,即通过参数mapreduce.input.fileinputformat.split.maxsize控制每个InputSplit的大小,那么InputSplit的起始位置就会位于文件块的内部。上述分割InputSplit的逻辑完全是针对大小进行的,那么就会存在将一行记录划分到两个InputSplit中的可能。但上述代码并未对这种情况进行处理,这也意味着在Map阶段存在数据不完整的可能,Hadoop当然不会允许这种情况的发生,而RecordReader就负责处理这种情况,下面以LineRecordReader为例,看看Hadoop是如何处理记录跨InputSplit的。LineRecordReader中的构造方法在初始化的时候调用一次,此方法将定位InputSplit中第一个换行符的位置,并将实际读取数据的位置定位到第一个换行符之后的位置,如果是第一个InputSplit则不用如此处理,而被过滤的内容则由读取该InputSplit之前的Split的LineRecordReader负责读取,确定第一个换行符位置的代码片段为: image.png

读取InputSplit内容的代码位于next()方法中,具体代码为:


image.png

readLine()的功能是读取一行的数据到Text中,因为在分割FileSplit时是基于size的,如果一行被分割到两个split中,比如s1和s2中,在读取s1中的最后一行数据时,会一直读取到s2中的第一个换行符,这是在next()方法中实现的,而在处理s2时,则要将已经读取的数据跳过以避免重复读取,这是在构造方法中实现的。

5.isSplitable()

@Override
protected boolean isSplitable(JobContext context, Path file) {
    // 根据文件的后缀名,获取相关的压缩编码器
    final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    // 如果文件是个普通文件,没有后缀名或者后缀名不是一个压缩格式,返回true
    if (null == codec) {
        return true;
    }
    // 如果文件是压缩格式,继续判断当前压缩格式是否可切
    return codec instanceof SplittableCompressionCodec;
}

6.1JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法


image.png

6.2进入createFile()方法:


image.png

6.3进入writeNewSplits()方法,它将切片数据写入切片文件,并得到切片元数据信息SplitMetaInfo数组info:


image.png
6.4最后进入writeJobSplitMetaInfo()方法,查看分片的元数据信息文件是如何产生的:
image.png

以上内容均基于TextInputFormat切片策略
参考文档
Hadoop-2.4.1源码分析--MapReduce作业切片(Split)过程
https://blog.csdn.net/u010010428/article/details/51469994

上一篇下一篇

猜你喜欢

热点阅读