hdfs读之block读取解析<一>
2019-08-15 本文已影响37人
古语1
一、hdfs读取流程
- 先获取文件流
FSDataInputStream fsIn = FileSystem.open("path")
- 然后读取文件内容
fsIn.read(buf, off, toRead)
流程图如下
image.png
二、hdfs客户端打开文件流过程
1. 打开文件流FileSystem.open
/**
 * Opens an FSDataInputStream at the indicated Path, using the buffer size
 * read from the "io.file.buffer.size" configuration key (4096 when unset).
 *
 * @param f the file to open
 * @return the stream returned by the two-argument {@code open} overload
 * @throws IOException if the delegated open fails
 */
public FSDataInputStream open(Path f) throws IOException {
  // The two-argument overload called here is the one implemented by
  // DistributedFileSystem.open.
  final int bufferSize = getConf().getInt("io.file.buffer.size", 4096);
  return open(f, bufferSize);
}
2.继承了FileSystem的 DistributedFileSystem.open
/**
 * Opens {@code f} for reading. The read-op statistic is bumped first, the
 * path is made absolute, and the actual open is run through a
 * FileSystemLinkResolver so that a symlink crossing into another file system
 * is re-opened on that file system.
 *
 * @param f the file to open
 * @param bufferSize buffer size forwarded to the underlying open calls
 * @return the wrapped input stream
 * @throws IOException if the open fails
 */
@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  final Path absolutePath = fixRelativePart(f);
  final FileSystemLinkResolver<FSDataInputStream> linkResolver =
      new FileSystemLinkResolver<FSDataInputStream>() {
        @Override
        public FSDataInputStream doCall(final Path p)
            throws IOException, UnresolvedLinkException {
          // Delegate to DFSClient.open, then wrap the raw DFS stream.
          return dfs.createWrappedInputStream(
              dfs.open(getPathName(p), bufferSize, verifyChecksum));
        }

        @Override
        public FSDataInputStream next(final FileSystem fs, final Path p)
            throws IOException {
          return fs.open(p, bufferSize);
        }
      };
  return linkResolver.resolve(this, absolutePath);
}
3. DFSClient.open如下
/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places. Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 *
 * @param src path of the file to read
 * @param buffersize not used by this method body
 * @param verifyChecksum forwarded to the DFSInputStream constructor
 * @return the newly constructed stream
 */
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  // Check that this client has not been closed.
  checkOpen();
  // The block info is fetched from the namenode inside the constructor;
  // the trace scope spans that construction and is always closed.
  final TraceScope scope = getPathTraceScope("newDFSInputStream", src);
  final DFSInputStream stream;
  try {
    stream = new DFSInputStream(this, src, verifyChecksum);
  } finally {
    scope.close();
  }
  return stream;
}
4. DFSInputStream 构造方法
/**
 * Builds an input stream for {@code src}: stores the constructor arguments,
 * picks up the client's default read caching strategy under
 * {@code infoLock}, and then loads the block information via
 * {@link #openInfo()}.
 */
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  this.src = src;
  this.verifyChecksum = verifyChecksum;
  this.dfsClient = dfsClient;
  synchronized (infoLock) {
    /*
     * Read caching strategy (translated from the original author's note).
     * Two settings, readDropBehind and readahead, control it. Reads are
     * usually disk operations; each read loads a page of data (512 bytes or
     * more) into memory and ships it to the client.
     *
     * readDropBehind means "drop after read": cached data is discarded right
     * after it is read. That saves memory when many users read files
     * concurrently, at the cost of more frequent disk operations. With it
     * disabled, data stays cached after a read, which speeds up repeated
     * reads of the same file but uses more memory.
     *
     * readahead, when enabled, makes the Datanode read some extra bytes
     * ahead in one disk operation, which lowers IO latency for sequential
     * reads. It requires native libraries to be enabled on the Datanode;
     * otherwise it has no effect.
     */
    this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
  }
  // Read the block information.
  openInfo();
}
5. DFSInputStream.openInfo()
/**
 * Fetches the located blocks for the file and records the length of the last
 * block being written, retrying while that length is reported as -1.
 *
 * <p>A length of -1 is a special case: when the cluster restarts, DNs may not
 * report immediately, so partial block locations are not yet available on
 * the NN. The fetch is retried up to
 * {@code retryTimesForGetLastBlockLength} times (key
 * dfs.client.retry.times.get-last-block-length, default 3 per the original
 * note), waiting {@code retryIntervalForGetLastBlockLength} between attempts
 * (key dfs.client.retry.interval-ms.get-last-block-length, default 4000 ms
 * per the original note).
 *
 * @throws IOException if the length is still unavailable (-1) once the
 *     retries are used up, or if a fetch itself fails
 */
void openInfo() throws IOException, UnresolvedLinkException {
  synchronized (infoLock) {
    // Fetch the block info and the length of the last block.
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    int retriesForLastBlockLength =
        dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (lastBlockBeingWrittenLength == -1 && retriesForLastBlockLength > 0) {
      DFSClient.LOG.warn("Last block locations not available. "
          + "Datanodes might not have reported blocks completely."
          + " Will retry for " + retriesForLastBlockLength + " times");
      waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      retriesForLastBlockLength--;
    }
    // Fail only when the length is STILL unknown after the retries ran out.
    // Checking the counter alone would also reject the case where the very
    // last retry succeeded (the counter hits 0 on that iteration too), or
    // where zero retries are configured and the first fetch was fine.
    if (lastBlockBeingWrittenLength == -1 && retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
6. DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength()
/**
 * Asks the namenode (through the client) for the file's located blocks,
 * checks them against any previously fetched block list, stores them, and
 * works out the length of the last block when that block is not complete.
 *
 * @return 0 when the last block is complete, absent, or has no locations and
 *     a block size of 0; -1 when the last block has no locations but a
 *     non-zero block size; otherwise the length read via
 *     {@code readBlockLength}
 * @throws IOException if no block info is returned or the block list differs
 *     from the one fetched earlier
 */
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
  // The client requests the block info from the namenode.
  final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
  if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("newInfo = " + newInfo);
  }
  if (newInfo == null) {
    throw new IOException("Cannot open filename " + src);
  }

  // Compare pairwise, over the common prefix, with what we already hold.
  if (locatedBlocks != null) {
    Iterator<LocatedBlock> previous = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> current = newInfo.getLocatedBlocks().iterator();
    while (previous.hasNext() && current.hasNext()) {
      if (!previous.next().getBlock().equals(current.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;

  long lengthOfLastBlock = 0;
  final LocatedBlock last = locatedBlocks.isLastBlockComplete()
      ? null
      : locatedBlocks.getLastLocatedBlock();
  if (last != null) {
    if (last.getLocations().length == 0) {
      // if the length is zero, then no data has been written to
      // datanode. So no need to wait for the locations.
      // Otherwise -1 tells the caller the length is not available yet.
      return last.getBlockSize() == 0 ? 0 : -1;
    }
    lengthOfLastBlock = readBlockLength(last);
    last.getBlock().setNumBytes(lengthOfLastBlock);
  }

  // Not reached on the early returns above.
  fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
  return lengthOfLastBlock;
}
7. 获取block信息DFSClient.getLocatedBlocks()
/**
 * Fetches located blocks for {@code src} beginning at {@code start}, using
 * the configured prefetch size as the requested length.
 *
 * @param src path of the file
 * @param start starting offset
 * @return the result of the three-argument overload
 * @throws IOException if the delegated call fails
 */
public LocatedBlocks getLocatedBlocks(String src, long start)
    throws IOException {
  // The three-argument overload ends up in DFSClient.callGetBlockLocations.
  final long prefetchLength = dfsClientConf.prefetchSize;
  return getLocatedBlocks(src, start, prefetchLength);
}
/*
 * This is just a wrapper around callGetBlockLocations, but non-static so that
 * we can stub it out for tests. The call runs inside a "getBlockLocations"
 * trace scope, which is closed whether or not the call succeeds.
 */
@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
    throws IOException {
  final TraceScope scope = getPathTraceScope("getBlockLocations", src);
  final LocatedBlocks located;
  try {
    // Remote RPC call to the namenode server.
    located = callGetBlockLocations(namenode, src, start, length);
  } finally {
    scope.close();
  }
  return located;
}
8. 远程RPC调用DFSClient.callGetBlockLocations()
/**
 * Requests block locations from the given namenode proxy. A RemoteException
 * is unwrapped into AccessControlException, FileNotFoundException or
 * UnresolvedPathException where it carries one of those.
 *
 * @see ClientProtocol#getBlockLocations(String, long, long)
 */
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length)
    throws IOException {
  final LocatedBlocks located;
  try {
    // Goes to the namenode over ClientProtocol
    // (ClientNamenodeProtocolTranslatorPB).
    located = namenode.getBlockLocations(src, start, length);
  } catch (RemoteException remote) {
    throw remote.unwrapRemoteException(
        AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
  return located;
}
9. RPC NN ClientNamenodeProtocolTranslatorPB.getBlockLocations()
/**
 * Builds a GetBlockLocationsRequestProto from the arguments, sends it through
 * {@code rpcProxy}, and converts the response.
 *
 * @return the converted locations, or {@code null} when the response carries
 *     no locations
 */
@Override
public LocatedBlocks getBlockLocations(String src, long offset, long length)
    throws AccessControlException, FileNotFoundException,
    UnresolvedLinkException, IOException {
  GetBlockLocationsRequestProto req =
      GetBlockLocationsRequestProto.newBuilder()
          .setSrc(src)
          .setOffset(offset)
          .setLength(length)
          .build();
  try {
    // The RPC machinery itself is not analysed here; the call lands in
    // NameNodeRpcServer.getBlockLocations.
    // Example proxy from the original note:
    // rpcProxy: localhost/127.0.0.1:51397, ProtobufRpcEngine, ClientNamenodeProtocolPB
    GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null, req);
    if (!resp.hasLocations()) {
      return null;
    }
    return PBHelper.convert(resp.getLocations());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
三、hdfs namenode服务端获取block信息
1. namenode获取block信息NameNodeRpcServer.getBlockLocations()
/**
 * Server-side entry point: checks namenode startup, bumps the
 * getBlockLocations metric, and delegates to the namesystem together with
 * the calling client's machine.
 */
public LocatedBlocks getBlockLocations(String src, long offset, long length)
    throws IOException {
  checkNNStartup();
  metrics.incrGetBlockLocations();
  // Delegates to FSNamesystem.getBlockLocations().
  final String clientMachine = getClientMachine();
  return namesystem.getBlockLocations(clientMachine, src, offset, length);
}
2. 获取排序后block FSNamesystem.getBlockLocations()
/**
 * Get block locations within the specified range.
 *
 * Runs in three phases: (1) under the read lock, look up the blocks for the
 * requested range; (2) if the lookup result asks for it, take the write lock
 * and update the file's access time on a best-effort basis; (3) sort the
 * datanode locations of each returned block relative to the client machine.
 *
 * @param clientMachine the client machine, passed to the location sort
 * @param srcArg path of the file as supplied by the caller
 * @param offset start of the requested range
 * @param length length of the requested range
 * @return the located blocks after sorting (res.blocks, which may be null)
 * @throws IOException if the lookup fails; an AccessControlException is
 *         audit-logged as a failed "open" before being rethrown
 * @see ClientProtocol#getBlockLocations(String, long, long)
 */
LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
long offset, long length) throws IOException {
checkOperation(OperationCategory.READ);
GetBlockLocationsResult res = null;
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
// Checked again now that the read lock is held.
checkOperation(OperationCategory.READ);
// Fetch the blocks for the requested range.
res = getBlockLocations(pc, srcArg, offset, length, true, true);
} catch (AccessControlException e) {
logAuditEvent(false, "open", srcArg);
throw e;
} finally {
readUnlock();
}
logAuditEvent(true, "open", srcArg);
if (res.updateAccessTime()) {
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
srcArg);
String src = srcArg;
writeLock();
final long now = now();
try {
checkOperation(OperationCategory.WRITE);
/**
* Resolve the path again and update the atime only when the file
* exists.
*
* XXX: Races can still occur even after resolving the path again.
* For example:
*
* <ul>
* <li>Get the block location for "/a/b"</li>
* <li>Rename "/a/b" to "/c/b"</li>
* <li>The second resolution still points to "/a/b", which is
* wrong.</li>
* </ul>
*
* The behavior is incorrect but consistent with the one before
* HDFS-7463. A better fix is to change the edit log of SetTime to
* use inode id instead of a path.
*/
src = dir.resolvePath(pc, srcArg, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
INode inode = iip.getLastINode();
boolean updateAccessTime = inode != null &&
now > inode.getAccessTime() + getAccessTimePrecision();
if (!isInSafeMode() && updateAccessTime) {
boolean changed = FSDirAttrOp.setTimes(dir,
inode, -1, now, false, iip.getLatestSnapshotId());
if (changed) {
getEditLog().logTimes(src, -1, now);
}
}
} catch (Throwable e) {
// Best effort: a failed access-time update is logged and does not fail
// the read.
LOG.warn("Failed to update the access time of " + src, e);
} finally {
writeUnlock();
}
}
LocatedBlocks blocks = res.blocks;
// Sort the datanodes holding each block's replicas by network-topology
// distance to the client (per the original author's note).
if (blocks != null) {
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
if (lastBlock != null) {
ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, lastBlockList);
}
}
// Return the sorted blocks to the client.
return blocks;
}
3. 获取blocks FSNamesystem.getBlockLocations()
/**
 * Get block locations within the specified range.
 *
 * Validates the range, delegates the lookup to
 * {@code getBlockLocationsInt}, and, when {@code checkSafeMode} is set and
 * the namesystem is in safe mode, rejects a result that contains a block
 * with no locations.
 *
 * @see ClientProtocol#getBlockLocations(String, long, long)
 * @throws HadoopIllegalArgumentException if offset or length is negative
 * @throws IOException on lookup failure, or a SafeModeException (wrapped in
 *         a RetriableException when HA is enabled and this node is ACTIVE)
 *         for a block without locations during safe mode
 */
GetBlockLocationsResult getBlockLocations(
    FSPermissionChecker pc, String src, long offset, long length,
    boolean needBlockToken, boolean checkSafeMode) throws IOException {
  if (offset < 0) {
    throw new HadoopIllegalArgumentException(
        "Negative offset is not supported. File: " + src);
  }
  if (length < 0) {
    throw new HadoopIllegalArgumentException(
        "Negative length is not supported. File: " + src);
  }

  final GetBlockLocationsResult result =
      getBlockLocationsInt(pc, src, offset, length, needBlockToken);
  if (!checkSafeMode || !isInSafeMode()) {
    return result;
  }

  for (LocatedBlock located : result.blocks.getLocatedBlocks()) {
    final boolean hasLocations =
        located.getLocations() != null && located.getLocations().length != 0;
    if (hasLocations) {
      continue;
    }
    // if safemode & no block locations yet then throw safemodeException
    SafeModeException se = new SafeModeException(
        "Zero blocklocations for " + src, safeMode);
    final boolean activeUnderHa = haEnabled && haContext != null
        && haContext.getState().getServiceState() == HAServiceState.ACTIVE;
    if (activeUnderHa) {
      throw new RetriableException(se);
    }
    throw se;
  }
  return result;
}
4. 最后调用FSNamesystem.getBlockLocationsInt()
/**
 * Resolves {@code srcArg} to a file inode, checks read permission when
 * permissions are enabled, builds the LocatedBlocks for the requested range
 * through the block manager, attaches cached-location info to each block,
 * and decides whether the caller should update the access time.
 *
 * @return the located blocks paired with the update-access-time flag
 * @throws IOException if path resolution, the permission check or block
 *         lookup fails
 */
private GetBlockLocationsResult getBlockLocationsInt(
    FSPermissionChecker pc, final String srcArg, long offset, long length,
    boolean needBlockToken)
    throws IOException {
  final byte[][] pathComponents =
      FSDirectory.getPathComponentsForReservedPath(srcArg);
  final String src = dir.resolvePath(pc, srcArg, pathComponents);
  final INodesInPath iip = dir.getINodesInPath(src, true);
  // Turn the resolved path into an INodeFile.
  final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
  if (isPermissionEnabled) {
    dir.checkPathAccess(pc, iip, FsAction.READ);
    checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
  }

  // Compute the file length.
  final long fileSize;
  if (iip.isSnapshot()) {
    fileSize = inode.computeFileSize(iip.getPathSnapshotId());
  } else {
    fileSize = inode.computeFileSizeNotIncludingLastUcBlock();
  }

  boolean isUc = inode.isUnderConstruction();
  if (iip.isSnapshot()) {
    // if src indicates a snapshot file, we need to make sure the returned
    // blocks do not exceed the size of the snapshot file.
    length = Math.min(length, fileSize - offset);
    isUc = false;
  }

  final FileEncryptionInfo feInfo;
  if (FSDirectory.isReservedRawName(srcArg)) {
    feInfo = null;
  } else {
    feInfo = dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
  }

  // Build the LocatedBlocks through the blockManager.
  final LocatedBlocks blocks = blockManager.createLocatedBlocks(
      inode.getBlocks(iip.getPathSnapshotId()), fileSize,
      isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);

  // Set caching information for the located blocks.
  for (LocatedBlock located : blocks.getLocatedBlocks()) {
    cacheManager.setCachedLocations(located);
  }

  final long now = now();
  boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
      && !iip.isSnapshot()
      && now > inode.getAccessTime() + getAccessTimePrecision();
  return new GetBlockLocationsResult(updateAccessTime, blocks);
}
四、 BlockManager创建LocatedBlocks
1. 获取LocatedBlocks的方法BlockManager.createLocatedBlocks()
/**
 * Create a LocatedBlocks.
 *
 * Asserts that the namesystem read lock is held. Returns {@code null} for a
 * null block array and an empty LocatedBlocks for an empty one. Otherwise
 * builds the located-block list for the requested range plus a separate
 * LocatedBlock for the last block.
 *
 * @return the assembled LocatedBlocks, or {@code null} if {@code blocks} is
 *         null
 */
public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
    final long fileSizeExcludeBlocksUnderConstruction,
    final boolean isFileUnderConstruction, final long offset,
    final long length, final boolean needBlockToken,
    final boolean inSnapshot, FileEncryptionInfo feInfo)
    throws IOException {
  assert namesystem.hasReadLock();
  if (blocks == null) {
    return null;
  }
  if (blocks.length == 0) {
    return new LocatedBlocks(0, isFileUnderConstruction,
        Collections.<LocatedBlock>emptyList(), null, false, feInfo);
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
  }
  final AccessMode mode = needBlockToken ? AccessMode.READ : null;
  // Build the LocatedBlock collection for the range; createLocatedBlockList
  // also goes through createLocatedBlock.
  final List<LocatedBlock> locatedblocks = createLocatedBlockList(
      blocks, offset, length, Integer.MAX_VALUE, mode);

  final LocatedBlock lastlb;
  final boolean isComplete;
  if (inSnapshot) {
    lastlb = createLocatedBlock(blocks,
        fileSizeExcludeBlocksUnderConstruction, mode);
    isComplete = true;
  } else {
    final BlockInfoContiguous last = blocks[blocks.length - 1];
    // A complete last block is already counted in the file size, so its
    // start position is the size minus its own bytes.
    long lastPos = fileSizeExcludeBlocksUnderConstruction;
    if (last.isComplete()) {
      lastPos -= last.getNumBytes();
    }
    lastlb = createLocatedBlock(last, lastPos, mode);
    isComplete = last.isComplete();
  }
  return new LocatedBlocks(
      fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
      locatedblocks, lastlb, isComplete, feInfo);
}
2. 最终获取block信息BlockManager.createLocatedBlock()
/**
 * Builds the LocatedBlock for a single block.
 *
 * For a block under construction the expected storage locations are used and
 * the block is never flagged corrupt. For any other block the storages
 * listed in blocksMap are returned: only those whose replica is not corrupt,
 * unless every replica is corrupt, in which case all storages are returned
 * and the block is flagged corrupt.
 *
 * @param blk the block to locate
 * @param pos position passed through to the LocatedBlock constructor
 * @return a LocatedBlock for the given block
 * @throws IOException if blk is an under-construction instance that reports
 *         itself as complete
 */
private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
) throws IOException {
if (blk instanceof BlockInfoContiguousUnderConstruction) {
if (blk.isComplete()) {
throw new IOException(
"blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
+ ", blk=" + blk);
}
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, storages, pos, false);
}
// get block locations
// Corrupt-replica count for blk as reported by countNodes().
final int numCorruptNodes = countNodes(blk).corruptReplicas();
// Corrupt-replica count for blk according to the corruptReplicas map
// (per the original note, that map records for each corrupt block the
// datanodes holding it and the reason). Normally the two counts agree; a
// mismatch is only logged.
final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
if (numCorruptNodes != numCorruptReplicas) {
LOG.warn("Inconsistent number of corrupt replicas for "
+ blk + " blockMap has " + numCorruptNodes
+ " but corrupt replicas map has " + numCorruptReplicas);
}
// Number of nodes blocksMap records for blk.
final int numNodes = blocksMap.numNodes(blk);
// The block is treated as corrupt when the corrupt count equals the node
// count.
// NOTE(review): this is also true when both counts are 0 - confirm that a
// block with no recorded nodes is meant to be flagged corrupt.
final boolean isCorrupt = numCorruptNodes == numNodes;
// If isCorrupt, return every node; otherwise only the nodes holding a
// usable replica.
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
int j = 0;
if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!replicaCorrupt))
machines[j++] = storage;
}
}
// Every slot of machines must have been filled by the loop above.
assert j == machines.length :
"isCorrupt: " + isCorrupt +
" numMachines: " + numMachines +
" numNodes: " + numNodes +
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
// Return the LocatedBlock built from blk and its datanode storages.
return new LocatedBlock(eb, machines, pos, isCorrupt);
}