大数据 爬虫Python AI Sql大数据大数据,机器学习,人工智能

MapReduce(二):分片

2019-01-11  本文已影响0人  b91cbec6a902

概述

基于Hadoop 2.x

核心方法:org.apache.hadoop.mapreduce.JobSubmitter#writeSplits

org.apache.hadoop.mapreduce.InputFormat 负责创建分片和分片记录读取器

// 创建分片,这个分片只是逻辑上的分片,不包含数据。每个分片都会启动一个Map进程处理。
public abstract  List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException;

public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context ) throws IOException, InterruptedException;

org.apache.hadoop.mapreduce.InputSplit 具体的分片信息

// 当前分片的大小,也就是字节数
public abstract long getLength() throws IOException, InterruptedException;

// 当前分片所属文件的地址,因为文件有多个副本,所以地址有多个
public abstract String[] getLocations() throws IOException, InterruptedException;

// SplitLocationInfo用来描述当前分片所属文件存储状况,是磁盘,还是内存,具体位置在哪。返回null代表文件的所有副本存储在磁盘。
public SplitLocationInfo[] getLocationInfo() throws IOException {
    return null;
}

具体的分片逻辑

org.apache.hadoop.mapreduce.lib.input.FileInputFormat

1、FileInputFormat中如何确定分片的大小?

// 最小分片大小,默认1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

// 最大分片大小,默认Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);

// 块大小。文件系统是HDFS的话,就是HDFS块大小。
long blockSize = file.getBlockSize();

// 计算分片大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

从上面的逻辑可以看出,分片的大小为:maxSize,blockSize,minSize三者的中间值。

2、怎么分片?
①首先需要知道,被处理的这些文件的状态。

List<FileStatus> files = listStatus(job);
public class FileStatus implements Writable, Comparable<FileStatus> {
    // 地址
    private Path path;
    // 总字节数
    private long length;
    // 是否是目录
    private boolean isdir;
    // 副本数量
    private short block_replication;
    // 文件在文件系统(HDFS)上的块大小
    private long blocksize;
    private long modification_time;
    private long access_time;
    private FsPermission permission;
    private String owner;
    private String group;
    private Path symlink;
}

如果是分布式文件系统(HDFS)还有文件的所有块的信息。

public class LocatedFileStatus extends FileStatus {
    private BlockLocation[] locations;
}
public class BlockLocation {
    // 文件块所在的服务器的hostname,因为块有多个副本,所以有多个
    private String[] hosts; 
    // 文件块的所有缓存的hostname
    private String[] cachedHosts; // Datanode hostnames with a cached replica
    // 访问这个文件块的路径(IP:端口号)
    private String[] names; // Datanode IP:xferPort for accessing the block
    // 访问这个文件块的全路径
    private String[] topologyPaths; // Full path name in network topology
    // 文件块所有副本的storageId
    private String[] storageIds; // Storage ID of each replica
    // 文件块所有副本的存储类型
    private StorageType[] storageTypes; // Storage type of each replica
    // 文件块的起始位置偏移量
    private long offset;  // Offset of the block in the file
    // 文件块的长度(字节大小)
    private long length;
    // 是否是脏的文件块
    private boolean corrupt;
}

②开始分片

List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {...}

这里需要注意:分片的单位为一个具体的文件(这个文件在HDFS有可能有很多Block)。

// 文件的总大小
long bytesRemaining = length;
// 按照分片大小,开始makeSplit
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
             blkLocations[blkIndex].getHosts(),
             blkLocations[blkIndex].getCachedHosts()));
}

SPLIT_SLOP :常量,值为1.1
bytesRemaining:还未进行分片的总大小
splitSize:分片的大小
bytesRemaining/splitSize > SPLIT_SLOP:如果未分片的大小 / 分片的大小 <= 1.1,那么就把剩余的划为一个分片。
举个栗子:一个129M的文件,在HDFS上会有两个block,一个128M,一个1M。分片大小为128M,那么最后分片的时候只会产生一个分片。

如果文件的大小为0怎么办?

//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));

仍然会创建一个分片,分片长度为0。

总结

1、分片的大小为maxSize,blockSize,minSize三者的中间值。
2、分片是以一个具体的文件为单位的,也就是说一个逻辑分片不会跨越两个文件的block。
3、同一个文件的分片,有可能跨越这个文件的两个block。
4、为了提高Map任务的数据本地性,应使InputSplit大小与block大小相同。

上一篇下一篇

猜你喜欢

热点阅读