Hadoop中job提交split的原理解析

2019-07-05  本文已影响0人  吃货大米饭

一、定义

1、block:block是物理切块,在文件上传到HDFS文件系统后,对大文将以每128MB的大小切分若干,存放在不同的DataNode上。
2、split:split是逻辑切片,在mapreduce中的map task开始之前,将文件按照指定的大小切割成若干个部分,每一部分称为一个split,默认是split的大小与block的大小相等,均为128MB。
注意:在hadoop1.x版本中,block默认的大小为64MB,在hadoop2.x版本修改成了128MB

2、参数设置

1、block的默认配置在hdfs-site.xml中配置

<property>
  <name>dfs.blocksize</name>
  <value>134217728</value>
  <description>
        文件的默认块大小(以字节为单位)。还可以可以使用以下后缀(不区分大小写)(例如128k,512m,1g等),或提供完整的字节大小(例如134217728为128 MB)。
  </description>
</property>

注意:默认配置就是最佳实践
2、split大小由minSizemaxSizeblocksize决定,以下是默认配置情况下的。

这里拿一段HadoopRDD的getPartition方法来说:
假设分区数是1.
获取的分片的过程通过调用FileInputFormat.getSplits来实现分片

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    Stopwatch sw = new Stopwatch().start();
    //获取所有的FileStatus
    //ListStatus 方法里面:
    // 1,判断是否需要递归 
    //2,接着是创建路径过滤器,筛选掉一些我们不需要的文件,入以_,.开头的
    //3,根据 mapreduce.input.fileinputformat.list-status.num-threads 决定是并发还是单线程 
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }
  //获取目标分片 goalsize 和最小 minsize ,numSplits在spark中为min(你设置的分区数,2)。
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
//判断文件是否支持切分,不压缩或者压缩方式为 BZip2Codec 支持切分 
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
//Math.max(minSize, Math.min(goalSize, blockSize))
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
//bytesRemaining/128m(默认)>1.1进行切片
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
上一篇下一篇

猜你喜欢

热点阅读