MapReduce(二):分片
概述
基于Hadoop 2.x
核心方法:org.apache.hadoop.mapreduce.JobSubmitter#writeSplits
org.apache.hadoop.mapreduce.InputFormat 负责创建分片和分片记录读取器
// 创建分片,这个分片只是逻辑上的分片,不包含数据。每个分片都会启动一个Map进程处理。
public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException;
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context ) throws IOException, InterruptedException;
org.apache.hadoop.mapreduce.InputSplit 具体的分片信息
// 当前分片的大小,也就是字节数
public abstract long getLength() throws IOException, InterruptedException;
// 当前分片所属文件的地址,因为文件有多个副本,所以地址有多个
public abstract String[] getLocations() throws IOException, InterruptedException;
// SplitLocationInfo用来描述当前分片所属文件存储状况,是磁盘,还是内存,具体位置在哪。返回null代表文件的所有副本存储在磁盘。
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
具体的分片逻辑
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
1、FileInputFormat中如何确定分片的大小?
// 最小分片大小,默认1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 最大分片大小,默认Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// 块大小。文件系统是HDFS的话,就是HDFS块大小。
long blockSize = file.getBlockSize();
// 计算分片大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
从上面的逻辑可以看出,分片的大小为:maxSize,blockSize,minSize三者的中间值。
2、怎么分片?
①首先需要知道,被处理的这些文件的状态。
List<FileStatus> files = listStatus(job);
public class FileStatus implements Writable, Comparable<FileStatus> {
// 地址
private Path path;
// 总字节数
private long length;
// 是否是目录
private boolean isdir;
// 副本数量
private short block_replication;
// 文件在文件系统(HDFS)上的块大小
private long blocksize;
private long modification_time;
private long access_time;
private FsPermission permission;
private String owner;
private String group;
private Path symlink;
}
如果是分布式文件系统(HDFS)还有文件的所有块的信息。
public class LocatedFileStatus extends FileStatus {
private BlockLocation[] locations;
}
public class BlockLocation {
// 文件块所在的服务器的hostname,因为块有多个副本,所以有多个
private String[] hosts;
// 文件块的所有缓存的hostname
private String[] cachedHosts; // Datanode hostnames with a cached replica
// 访问这个文件块的路径(IP:端口号)
private String[] names; // Datanode IP:xferPort for accessing the block
// 访问这个文件块的全路径
private String[] topologyPaths; // Full path name in network topology
// 文件块所有副本的storageId
private String[] storageIds; // Storage ID of each replica
// 文件块所有副本的存储类型
private StorageType[] storageTypes; // Storage type of each replica
// 文件块的起始位置偏移量
private long offset; // Offset of the block in the file
// 文件块的长度(字节大小)
private long length;
// 是否是脏的文件块
private boolean corrupt;
}
②开始分片
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {...}
这里需要注意:分片的单位为一个具体的文件(这个文件在HDFS有可能有很多Block)。
// 文件的总大小
long bytesRemaining = length;
// 按照分片大小,开始makeSplit
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
SPLIT_SLOP :常量,值为1.1
bytesRemaining:还未进行分片的总大小
splitSize:分片的大小
bytesRemaining/splitSize > SPLIT_SLOP:如果未分片的大小 / 分片的大小 <= 1.1,那么就把剩余的划为一个分片。
举个栗子:一个129M的文件,在HDFS上会有两个block,一个128M,一个1M。分片大小为128M,那么最后分片的时候只会产生一个分片。
如果文件的大小为0怎么办?
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
仍然会创建一个分片,分片长度为0。
总结
1、分片的大小为maxSize,blockSize,minSize三者的中间值。
2、分片是以一个具体的文件为单位的,也就是说一个逻辑分片不会跨越两个文件的block。
3、同一个文件的分片,有可能跨越这个文件的两个block。
4、为了提高Map任务的数据本地性,应使InputSplit大小与block大小相同。