【Hadoop Selected】HDFS Client Usage and Source Code Analysis
2023-09-26
熊本极客
1. HDFS Overall Architecture
![](https://img.haomeiwen.com/i21744606/93c7a3f9a6eba01b.png)
2. Java Example of HDFS Client Reads and Writes
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000"); // HDFS NameNode address
        try {
            // Create the HDFS file system object
            FileSystem fs = FileSystem.get(conf);

            // Write a file
            Path filePath = new Path("/path/to/file.txt");
            FSDataOutputStream outputStream = fs.create(filePath);
            String content = "Hello, HDFS!";
            outputStream.writeBytes(content);
            outputStream.close();

            // Read the file back
            FSDataInputStream inputStream = fs.open(filePath);
            byte[] buffer = new byte[1024];
            int bytesRead = inputStream.read(buffer);
            String fileContent = new String(buffer, 0, bytesRead);
            System.out.println("File content: " + fileContent);
            inputStream.close();

            // Delete the file
            fs.delete(filePath, false);

            // Close the HDFS file system object
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
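The demo above closes each stream manually, which makes it easy to leak streams when an exception is thrown midway. Below is a minimal try-with-resources variant of the same write/read/delete sequence; the NameNode address and file path are the same placeholder values as in the demo, not values mandated by HDFS.

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder NameNode address
Path filePath = new Path("/path/to/file.txt");     // placeholder path
try (FileSystem fs = FileSystem.get(conf)) {
    try (FSDataOutputStream out = fs.create(filePath)) {
        out.writeBytes("Hello, HDFS!"); // write
    }
    try (FSDataInputStream in = fs.open(filePath)) {
        byte[] buffer = new byte[1024];
        int bytesRead = in.read(buffer); // read back
        System.out.println("File content: " + new String(buffer, 0, bytesRead));
    }
    fs.delete(filePath, false); // clean up
}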
3. HDFS Client Source Code Analysis
3.1 Client Initialization
Key code in FileSystem:
private FileSystem getInternal(URI uri, Configuration conf, Key key)
    throws IOException{
  FileSystem fs;
  synchronized (this) {
    // Look up fs in Cache.map
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }
  // Create a new fs
  fs = createFileSystem(uri, conf);
  synchronized (this) { // refetch the lock again
    FileSystem oldfs = map.get(key);
    if (oldfs != null) { // a file system is created while lock is releasing
      fs.close(); // close the new file system
      return oldfs; // return the old file system
    }
    // omitted...
    // Save it to Cache.map
    fs.key = key;
    map.put(key, fs);
    // omitted...
    return fs;
  }
}
private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
  Tracer tracer = FsTracer.get(conf);
  try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
    scope.addKVAnnotation("scheme", uri.getScheme());
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    // Create the concrete FileSystem (DistributedFileSystem for hdfs://) via reflection
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    // DistributedFileSystem.initialize
    fs.initialize(uri, conf);
    return fs;
  }
}
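The effect of Cache.map can be observed from user code: calling FileSystem.get() twice with the same scheme, authority and user returns the same cached object, while FileSystem.newInstance() always goes through createFileSystem(). A small sketch, assuming the same placeholder NameNode address as in section 2:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder address

FileSystem fs1 = FileSystem.get(conf);
FileSystem fs2 = FileSystem.get(conf);
// Same Cache.Key (scheme + authority + UGI), so the cached instance is returned
System.out.println(fs1 == fs2);                // expected: true
System.out.println(fs1.getClass().getName());  // org.apache.hadoop.hdfs.DistributedFileSystem

// newInstance() bypasses the cache and always creates a new file system
FileSystem fs3 = FileSystem.newInstance(conf);
System.out.println(fs1 == fs3);                // expected: false

fs3.close();
fs1.close(); // fs2 is the same object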
Key code in DFSClient:
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats) throws IOException {
  // omitted...
  ProxyAndInfo<ClientProtocol> proxyInfo = null;
  // omitted...
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  } else if (rpcNamenode != null) {
    // This case is used for testing.
    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null,
        "null URI");
    // Create the ProxyAndInfo via NameNodeProxiesClient
    proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
        nameNodeUri, nnFallbackToSimpleAuth);
    this.dtService = proxyInfo.getDelegationTokenService();
    // namenode is a ClientProtocol proxy to the NameNode
    this.namenode = proxyInfo.getProxy();
  }
  // omitted...
  // Initialize the SASL client for secure data transfer
  this.saslClient = new SaslDataTransferClient(
      conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
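After the constructor returns, every metadata operation issued by the client travels through this namenode proxy (a ClientProtocol implementation) to NameNodeRpcServer. As a rough illustration, a plain status query from user code, sketched below with a placeholder path, ends up as a getFileInfo RPC on that proxy (it also needs org.apache.hadoop.fs.FileStatus in addition to the imports from section 2):

try (FileSystem fs = FileSystem.get(conf)) {
    // DistributedFileSystem -> DFSClient -> namenode proxy (ClientProtocol) -> NameNodeRpcServer
    FileStatus status = fs.getFileStatus(new Path("/path/to/file.txt")); // placeholder path
    System.out.println("length=" + status.getLen()
        + ", replication=" + status.getReplication());
}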
3.2 Reading Data
- During FileSystem initialization, the Client obtains a proxy for NameNodeRpcServer and establishes RPC communication with the NameNode.
- The Client calls FileSystem's open() method; since the concrete implementation is DistributedFileSystem, the open() method of that class is invoked.
- DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient.open(), which:
  - 1. instantiates the DFSInputStream input stream, wrapped as an HdfsDataInputStream;
  - 2. calls DFSInputStream.openInfo(), which calls DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength() to fetch the block information and the length of the last block; that in turn calls DFSClient.getLocatedBlocks() to obtain the block locations, and finally DFSClient.callGetBlockLocations() invokes NameNodeRpcServer.getBlockLocations() through the NameNode proxy object.
- The fetched block information is stored in the locatedBlocks member variable of the DFSInputStream object.
- Reading the content of the specified HDFS file is then handed to FSDataInputStream.read, the base class of HdfsDataInputStream, i.e. FSInputStream.read (see the sketch after this list).
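As a quick illustration of the call chain above, the sketch below opens a file through the public API, reads it sequentially, and then issues one positioned read; the address and path are placeholders:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder address
try (FileSystem fs = FileSystem.get(conf);
     FSDataInputStream in = fs.open(new Path("/path/to/file.txt"))) { // placeholder path
    // The returned stream wraps a DFSInputStream whose locatedBlocks is already populated
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) > 0) {
        System.out.write(buf, 0, n);
    }
    System.out.flush();
    // Positioned read: the block covering the offset is found via locatedBlocks
    byte[] head = new byte[16];
    in.readFully(0, head);
}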
Key code in DistributedFileSystem:
@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  storageStatistics.incrementOpCounter(OpType.OPEN);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p) throws IOException {
      // DFSClient.open()
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      // Ultimately returns an HdfsDataInputStream
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}
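Because open() ultimately returns an HdfsDataInputStream, the caller may cast the stream to the HDFS-specific type and inspect per-stream read statistics. A small sketch, assuming the file system really is HDFS and reusing the placeholder path (it needs org.apache.hadoop.hdfs.client.HdfsDataInputStream and org.apache.hadoop.io.IOUtils in addition to the earlier imports):

try (FileSystem fs = FileSystem.get(conf);
     FSDataInputStream in = fs.open(new Path("/path/to/file.txt"))) { // placeholder path
    // The wrapper built by dfs.createWrappedInputStream() is an HdfsDataInputStream
    HdfsDataInputStream hdfsIn = (HdfsDataInputStream) in;
    IOUtils.copyBytes(hdfsIn, System.out, 4096, false);
    System.out.println("total bytes read: "
        + hdfsIn.getReadStatistics().getTotalBytesRead());
    System.out.println("local bytes read: "
        + hdfsIn.getReadStatistics().getTotalLocalBytesRead());
}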
Key code in DFSInputStream:
// Fetch block information via DFSClient and store it in the member variable locatedBlocks
void openInfo(boolean refreshLocatedBlocks) throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  synchronized(infoLock) {
    // fetchLocatedBlocksAndGetLastBlockLength() fetches the block information and the
    // length of the last block; internally it calls DFSClient.getLocatedBlocks() and
    // saves the result in locatedBlocks
    lastBlockBeingWrittenLength =
        fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
    int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(conf.getRetryIntervalForGetLastBlockLength());
        lastBlockBeingWrittenLength =
            fetchLocatedBlocksAndGetLastBlockLength(true);
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (lastBlockBeingWrittenLength == -1
        && retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
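The retry count and interval used above are read from the client configuration. As far as I can tell they correspond to dfs.client.retry.times.get-last-block-length (default 3) and dfs.client.retry.interval-ms.get-last-block-length (default 4000 ms); treat these key names as assumptions and verify them against DfsClientConf / HdfsClientConfigKeys in your Hadoop release. They can be overridden on the client side, for example:

Configuration conf = new Configuration();
// Assumed configuration keys; check HdfsClientConfigKeys in your Hadoop version
conf.setInt("dfs.client.retry.times.get-last-block-length", 5);
conf.setLong("dfs.client.retry.interval-ms.get-last-block-length", 2000L);
FileSystem fs = FileSystem.get(conf);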
Key code of the actual read, DFSInputStream.read() (DFSInputStream extends FSInputStream):
@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
  ReaderStrategy byteBufferReader =
      new ByteBufferStrategy(buf, readStatistics, dfsClient);
  return readWithStrategy(byteBufferReader);
}
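This ByteBuffer overload is reachable from user code because FSDataInputStream implements ByteBufferReadable and forwards read(ByteBuffer) to the wrapped stream; HDFS supports it, but other file systems may throw UnsupportedOperationException. A minimal usage sketch with the placeholder path (also needs java.nio.ByteBuffer):

try (FileSystem fs = FileSystem.get(conf);
     FSDataInputStream in = fs.open(new Path("/path/to/file.txt"))) { // placeholder path
    ByteBuffer buf = ByteBuffer.allocate(4096);
    int n;
    // Each call forwards to the DFSInputStream.read(ByteBuffer) shown above
    while ((n = in.read(buf)) > 0) {
        buf.flip();
        System.out.write(buf.array(), buf.position(), buf.remaining());
        buf.clear();
    }
    System.out.flush();
}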