HDFS架构师3.0-元数据管理流程1(主节点双缓冲写)
1、以创建目录的场景 为例贯穿整个流程
自编客户端 FileTest 代码
_>▲★▼
FileSystem fileSystem=FileSystem.newInstance(configuration);
//场景驱动的方式(元数据的更新流程)
fileSystem.mkdirs(new Path("/usr/hive/warehouse/test/mydata"));
mkdirs 是抽象方法
- The local implementation is {@link LocalFileSystem}
- and distributed implementation is DistributedFileSystem.
FileSystem#mkdirs()
↓
——》DistributedFileSystem#mkdirs()
——》DistributedFileSystem#mkdirsInternal()
//TODO 重要代码
return dfs.mkdirs(getPathName(p), permission, createParent);
——》DFSClient#mkdirs()
—— DFSClient#primitiveMkdir()
//TODO 走的Hadoop的RPC,调用服务端的代码
return namenode.mkdirs(src, absPermission, createParent); 18}0:15
↓
——》 DFSClient#mkdirs()
//TODO 调用FSNameSystem创建目录的方法
return namesystem.mkdirs(src,
——》 FSNamesystem.mkdirs()
——★ 》 FSDirMkdirOp.mkdirs() 0:27
▼
//TODO 解析要创建目录的路径 /user/hive/warehouse/data/mytable
src = fsd.resolvePath(pc, src, pathComponents);
/**
* 比如我们现在已经存在的目录是 /user/hive/warehouse
* 我们需要创建的目录是:/user/hive/warehouse/data/mytable
* 首先找到最后一个INode,其实就是warehouse 这个INode
/
final INode lastINode = iip.getLastINode();
/*
* 已存在: /user/hive/warehouse
* 要创建: /user/hive/warehouse/data/mytable
* 需要创建的目录 /data/mytable
*/
List<String> nonExisting = iip.getPath(existing.length(),
//TODO 需要创建多级目录走这儿
if (length > 1) {
List<String> ancestors = nonExisting.subList(0, length - 1);
//TODO 如果只需要创建一个目录就走这儿
if ((existing = createChildrenDirectories(fsd, existing,
// u+wx permission to all ancestor directories
existing = createChildrenDirectories(fsd, existing, ancestors,
▲
——》 FSDirMkdirOp.createChildrenDirectories() 0:39
//TODO 一个目录一个目录去创建
//如果我们只创建的目录只有一个那么这个循环就只运行一次。
existing = createSingleDirectory(fsd, existing, component, perm);
——1 》 FSDirMkdirOp.createSingleDirectory()
▼
//TODO 更新文件目录树,这棵目录树是存在于内存中的,有FSNameSystem管理的
//更新内存里面的数据
existing = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), existing,
//TODO 把元数据信息记录到磁盘上(但是一开始先写到内存)
//往磁盘上面记录元数据日志
fsd.getEditLog().logMkDir(cur, newNode);
——1.1 》 FSEditLog.logMkDir()
▼
//TODO 创建日志对象 【构建者模式】 注意积累
MkdirOp op = MkdirOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
.setTimestamp(newNode.getModificationTime())
.setPermissionStatus(permissions);
//TODO 记录日志
logEdit(op);
——1.1 .1》 FSEditLog.logEdit()
▼
//TODO 步骤一:获取当前的独一无二的事务ID
long start = beginTransaction();
op.setTransactionId(txid);
//TODO 步骤二:把元数据写入到内存缓冲
//这儿的代码写得很晦涩。
editLogStream.write(op);
// check if it is time to schedule an automatic sync
// 看当前的内存大小是否 >= 512kb = true
// !true = false
//这个条件决定了,两个内存是否交换数据
//如果当前的内存写满了,512kb >= 512 kb 我们这儿就会返回true
// !ture = false
// !false =true
if (!shouldForceSync()) {
//TODO 说明这个条件就是进行元数据持久化的一个关键条件
return;
}
//TODO 如果到这儿就说明 当前的那个缓冲区存满了
isAutoSyncScheduled = true;
} //释放锁
//TODO 把数据持久化到磁盘
logSync();
//交换内存,持久化
▲18} 1:30:00
——1.1 .1.1》 FSEditLog.logSync()