切片逻辑之CombineFileInputFormat --
2019-03-08 本文已影响0人
苏坡闷
将文件按照maxSplitSize进行逻辑切片,划分为若干部分!(在minSplitSizeNode(同一节点的数据块)=0&minSplitSizeRack(同一机架的数据块)=0的情况下)
判断待切部分是否 ≤ maxSplitSize,如果是,则整个作为 1 part
maxSplitSize <待切部分 < maxSplitSize * 2, 将待切部分均分为2 part
待切部分 ≥ maxSplitSize * 2, 先切出 maxSplitSize作为一部分,再循环判断!
将多个part进行组合,只要累计大小达到或超过maxSplitSize,这些part就作为1片!
如果minSplitSizeNode&minSplitSizeRack不为零,则还要另行做判断
1.源码:getSplits()
/**
 * Computes the combined input splits for every input file of the job.
 *
 * <p>Each size limit is taken from the corresponding instance field
 * ({@code minSplitSizeNode}, {@code minSplitSizeRack}, {@code maxSplitSize})
 * when that field is non-zero; otherwise it is read from the job
 * configuration with a default of 0. The files accepted by each pool are
 * handed to {@code getMoreSplits} as a separate batch, and the files that
 * matched no pool are handed over as one final batch.
 *
 * @param job the job context providing the configuration and the input listing
 * @return the generated splits; an empty list when there are no input files
 * @throws IOException if a per-node or per-rack minimum exceeds the maximum
 *     split size, or the per-node minimum exceeds the per-rack minimum; also
 *     declared by {@code getMoreSplits}, which this method calls
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  Configuration conf = job.getConfiguration();

  // A value set through the setXxxSplitSize() methods overrides the value
  // read from the configuration.
  long perNodeMin = (minSplitSizeNode != 0)
      ? minSplitSizeNode
      : conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  long perRackMin = (minSplitSizeRack != 0)
      ? minSplitSizeRack
      : conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  // If the maximum is not configured it stays 0, in which case a single
  // split is generated per node.
  long splitCap = (maxSplitSize != 0)
      ? maxSplitSize
      : conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);

  // Sanity-check the three limits against each other (0 means "not set").
  boolean capSet = splitCap != 0;
  if (perNodeMin != 0 && capSet && perNodeMin > splitCap) {
    throw new IOException("Minimum split size pernode " + perNodeMin +
        " cannot be larger than maximum split size " +
        splitCap);
  }
  if (perRackMin != 0 && capSet && perRackMin > splitCap) {
    throw new IOException("Minimum split size per rack " + perRackMin +
        " cannot be larger than maximum split size " +
        splitCap);
  }
  if (perRackMin != 0 && perNodeMin > perRackMin) {
    throw new IOException("Minimum split size per node " + perNodeMin +
        " cannot be larger than minimum split " +
        "size per rack " + perRackMin);
  }

  // All files in the input set.
  List<FileStatus> remaining = listStatus(job);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  if (remaining.isEmpty()) {
    return splits;
  }

  // Handle one pool at a time: the files a pool accepts are pulled out of
  // the remaining list and turned into splits as their own batch.
  for (MultiPathFilter pool : pools) {
    List<FileStatus> matched = new ArrayList<FileStatus>();
    Iterator<FileStatus> cursor = remaining.iterator();
    while (cursor.hasNext()) {
      FileStatus candidate = cursor.next();
      if (!pool.accept(candidate.getPath())) {
        continue;
      }
      matched.add(candidate);
      // Claimed by this pool; later pools and the final pass must not see it.
      cursor.remove();
    }
    getMoreSplits(job, matched, splitCap, perNodeMin, perRackMin, splits);
  }

  // Whatever is left belongs to no pool; create its splits in one batch.
  getMoreSplits(job, remaining, splitCap, perNodeMin, perRackMin, splits);

  // Free up the rackToNodes map.
  rackToNodes.clear();
  return splits;
}
2.源码:getMoreSplits()
无论是满足某过滤池实例 onePool 条件的文件,还是不属于任何过滤池的文件,可以笼统地理解为 "一批文件",getMoreSplits()就是为这一批文件生成切片的。
/**
 * Creates the splits for one batch of files and appends them to
 * {@code splits}.
 *
 * <p>An {@code OneFileInfo} is constructed for every file in the batch; the
 * constructor is handed the rack/block/node maps so that, once the loop is
 * done, they describe the whole batch. {@code createSplits} then turns those
 * maps into splits.
 *
 * @param job the job context providing the configuration
 * @param stats the batch of files to split; nothing happens when it is empty
 * @param maxSize maximum split size, 0 meaning "not set"
 * @param minSizeNode minimum size of a node-local split, 0 meaning "not set"
 * @param minSizeRack minimum size of a rack-local split, 0 meaning "not set"
 * @param splits output list the new splits are appended to
 * @throws IOException declared for the file inspection done while building
 *     the per-file information
 */
private void getMoreSplits(JobContext job, List<FileStatus> stats,
    long maxSize, long minSizeNode, long minSizeRack,
    List<InputSplit> splits)
    throws IOException {
  Configuration conf = job.getConfiguration();
  if (stats.isEmpty()) {
    return;
  }

  // rackToBlocks: which blocks live on each rack.
  HashMap<String, List<OneBlockInfo>> rackToBlocks =
      new HashMap<String, List<OneBlockInfo>>();
  // blockToNodes: which nodes hold a replica of each block.
  HashMap<OneBlockInfo, String[]> blockToNodes =
      new HashMap<OneBlockInfo, String[]>();
  // nodeToBlocks: which blocks live on each node.
  HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
      new HashMap<String, Set<OneBlockInfo>>();

  // Build the per-file information. Only the total length is needed from
  // each OneFileInfo afterwards, so the objects are not retained (the
  // previous array was only ever written at index 0).
  long totLength = 0;
  for (FileStatus stat : stats) {
    OneFileInfo fileInfo = new OneFileInfo(stat, conf,
        isSplitable(job, stat.getPath()),
        rackToBlocks, blockToNodes, nodeToBlocks,
        rackToNodes, maxSize);
    totLength += fileInfo.getLength();
  }

  // Form the splits from the collected block/node/rack relationships.
  createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
      maxSize, minSizeNode, minSizeRack, splits);
}
3.源码:createSplits()
/**
 * Groups blocks into input splits and appends them to {@code splits}.
 *
 * <p>The work happens in three phases:
 * <ol>
 * <li>Node-local: nodes are walked repeatedly, creating at most one
 * max-size split per node per pass. Leftover blocks of a node become a
 * split only if {@code minSizeNode} is set, they reach it, and the node has
 * no split yet; otherwise they are put back into {@code blockToNodes}.</li>
 * <li>Rack-local: racks are walked repeatedly while {@code blockToNodes}
 * still has entries, creating one split per rack per pass. Leftovers below
 * {@code minSizeRack} (or all leftovers when it is 0) go to an overflow
 * list.</li>
 * <li>Overflow: the overflow blocks are combined into splits of at least
 * {@code maxSize}, with whatever remains forming one last split.</li>
 * </ol>
 *
 * <p>{@code blockToNodes} is used as the pool of still-unassigned blocks and
 * is emptied by this method; the block sets inside {@code nodeToBlocks} are
 * pruned as blocks are assigned.
 *
 * @param nodeToBlocks node name to the blocks on that node; value sets are mutated
 * @param blockToNodes unassigned blocks mapped to their hosts; consumed by this method
 * @param rackToBlocks rack name to the blocks on that rack; only read here
 * @param totLength total length to assign; reaching 0 ends the node-local phase
 * @param maxSize size at which accumulated blocks are emitted as a split; 0 disables the limit
 * @param minSizeNode minimum size for a leftover node-local split; 0 disables such splits
 * @param minSizeRack minimum size for a leftover rack-local split; 0 sends leftovers to overflow
 * @param splits output list the created splits are appended to
 */
@VisibleForTesting
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<OneBlockInfo, String[]> blockToNodes,
Map<String, List<OneBlockInfo>> rackToBlocks,
long totLength,
long maxSize,
long minSizeNode,
long minSizeRack,
List<InputSplit> splits
) {
// Blocks collected for the split currently being built.
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
// Accumulated size of the split currently being built.
long curSplitSize = 0;
int totalNodes = nodeToBlocks.size();
// Length not yet placed into a node-local split.
long totalLength = totLength;
// Number of node-local splits created so far on each node.
Multiset<String> splitsPerNode = HashMultiset.create();
// Nodes that need no further node-local processing.
Set<String> completedNodes = new HashSet<String>();
while(true) {
// it is allowed for maxSize to be 0. Disable smoothing load for such cases
// Phase 1: form splits node by node.
// process all nodes and create splits that are local to a node. Generate
// one split per node iteration, and walk over nodes multiple times to
// distribute the splits across nodes.
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
String node = one.getKey();
// Skip the node if it has previously been marked as completed.
if (completedNodes.contains(node)) {
continue;
}
Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
while (oneBlockIter.hasNext()) {
OneBlockInfo oneblock = oneBlockIter.next();
// Remove all blocks which may already have been assigned to other
// splits.
if(!blockToNodes.containsKey(oneblock)) {
oneBlockIter.remove();
continue;
}
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size reaches or exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
curSplitSize = 0;
splitsPerNode.add(node);
// Remove entries from blocksInNode so that we don't walk these
// again.
blocksInCurrentNode.removeAll(validBlocks);
validBlocks.clear();
// Done creating a single split for this node. Move on to the next
// node so that splits are distributed across nodes.
break;
}
}
if (validBlocks.size() != 0) {
// This implies that the last few blocks (or all in case maxSize=0)
// were not part of a split. The node is complete.
// if there were any blocks left over and their combined size is
// larger than minSplitNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
// This condition also kicks in when max split size is not set. All
// blocks on a node will be grouped together into a single split.
// In short: leftovers of at least minSizeNode become a split (only if
// this node has no split yet); otherwise they are returned to
// blockToNodes for the rack-level phase.
if (minSizeNode != 0 && curSplitSize >= minSizeNode
&& splitsPerNode.count(node) == 0) {
// haven't created any split on this machine. so its ok to add a
// smaller one for parallelism. Otherwise group it in the rack for
// balanced size create an input split and add it to the splits
// array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
splitsPerNode.add(node);
// Remove entries from blocksInNode so that we don't walk this again.
blocksInCurrentNode.removeAll(validBlocks);
// The node is done. This was the last set of blocks for this node.
} else {
// Put the unplaced blocks back into the pool for later rack-allocation.
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
}
}
validBlocks.clear();
curSplitSize = 0;
completedNodes.add(node);
} else { // No in-flight blocks.
if (blocksInCurrentNode.size() == 0) {
// Node is done. All blocks were fit into node-local splits.
completedNodes.add(node);
} // else Run through the node again.
}
}
// Check if node-local assignments are complete.
if (completedNodes.size() == totalNodes || totalLength == 0) {
// All nodes have been walked over and marked as completed or all blocks
// have been assigned. The rest should be handled via rack-local assignment.
LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+ completedNodes.size() + ", size left: " + totalLength);
break;
}
}
// Phase 2: form splits rack by rack.
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
// overflowBlocks holds the blocks still unassigned after rack-level processing.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
// NOTE(review): termination presumes every block left in blockToNodes is
// listed under some rack in rackToBlocks - confirm against the map builder.
while (blockToNodes.size() > 0) {
// Create one split for this rack before moving over to the next rack.
// Come back to this rack after creating a single split for each of the
// remaining racks.
// Process one rack location at a time, Combine all possible blocks that
// reside on this rack as one split. (constrained by minimum and maximum
// split size).
// Handle each rack in turn.
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
rackToBlocks.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
racks.add(one.getKey());
List<OneBlockInfo> blocks = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
boolean createdSplit = false;
// Handle each block of this rack in turn.
for (OneBlockInfo oneblock : blocks) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size reaches or exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
createdSplit = true;
break;
}
}
}
// if we created a split, then just go to the next rack
if (createdSplit) {
curSplitSize = 0;
validBlocks.clear();
racks.clear();
continue;
}
if (!validBlocks.isEmpty()) {
// Leftovers of at least minSizeRack become a split of their own.
if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
// if there is a minimum size specified, then create a single split
// otherwise, store these blocks into overflow data structure
addCreatedSplit(splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
// (Leftovers smaller than minSizeRack, or any leftovers when
// minSizeRack is 0, end up here.)
overflowBlocks.addAll(validBlocks);
}
}
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
assert blockToNodes.isEmpty();
assert curSplitSize == 0;
assert validBlocks.isEmpty();
assert racks.isEmpty();
// Phase 3: walk the overflow blocks and accumulate them into splits.
for (OneBlockInfo oneblock : overflowBlocks) {
validBlocks.add(oneblock);
curSplitSize += oneblock.length;
// This might cause an existing rack location to be re-added,
// but it should be ok.
for (int i = 0; i < oneblock.racks.length; i++) {
racks.add(oneblock.racks[i]);
}
// if the accumulated split size reaches or exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
// Whatever is still pending forms one final split.
if (!validBlocks.isEmpty()) {
addCreatedSplit(splits, getHosts(racks), validBlocks);
}
}