HDFS架构师3.1-元数据管理流程2(日志同步及fsimage
19} 20} 21} 22}
元数据源码剖析.png1、内存里面的元数据刷盘
image.png19}
接上期
——1.1 .1.1》 FSEditLog.logSync()
——1.1 .1.1.1》 EditLogOutputStream.flush()
▼
//第一次:FileJouranlManager -> EditLogFileOutputStream
//第二次:QuorumJounalManager -> QuorumOutputStream
flushAndSync(durable);
↓ 先看这个
——1.1 .1.1.1》EditLogFileOutputStream# flushAndSync(durable)
//TODO 涮写磁盘
doubleBuf.flushTo(fp);
↓ 再看这个
——1.1 .1.1.2》QuorumOutputStream#flushAndSync
——1.1 .1.1.2.1》AsyncLoggerSet#sendEdits()
//往journalnode去发送日志。
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
2、内存里面的元数据刷盘和journalnode 19},20} 10分钟
3、standby的 namenode 同步 元数据 by journalnode
21} 40分钟 22} 18 分钟
元数据管理.png/* EditLogTailer是一个后台线程,启动了以后会周期性的去journalnode集群上面去
-
读取元数据日志,然后再把这些元数据日志应用到自己的元数据里面(内存+磁盘)
*/
EditLogTailer类
EditLogTailer.EditLogTailerThread.run()方法
——1》EditLogTailer.EditLogTailerThread.doWork()
//TODO 重要的代码
doTailEdits();
//TODO 每隔60秒 StandByNameNode 去Journalnode获取一下日志
Thread.sleep(sleepTimeMs);——1》EditLogTailer.doTailEdits()
▼
//TODO 加载当前自己的元数据日志
FSImage image = namesystem.getFSImage();
//TODO StandByNamenoe 获取当前的元数据日志的最后一条日志的事务ID是多少
long lastTxnId = image.getLastAppliedTxId();
//这个地方是重要的代码
//需要去journlanode上面去读取元数据
//现在的事务id 1000,所以我去journlanode上面去读取
//日志的时候,只需要去读取 1001后面的日志就可以。
//TODO 设置获取Journalnode获取日志的流
streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
//TODO 去Journalnode加载日志
editsLoaded = image.loadEdits(streams, namesystem);
——1.1》FSImage.loadEdits()
▼
//TODO 加载日志
//1000
//1001
//2000
loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
//TODO 记录最后的一个事务ID
//1000 -> 2000
lastAppliedTxId = loader.getLastAppliedTxId();
——1.1.1》FSEditLogLoader.loadFSEdits()
//TODO 重要代码
long numEdits = loadEditRecords(edits, false,
——1.1.1》FSEditLogLoader.loadEditRecords()
▼
//TODO 把获取到的元数据作用到自己的内存元数据里
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(true), lastInodeId);
——1.1.1.1》FSEditLogLoader.applyEditLogOp()
▼
//TODO 创建目录的日志
case OP_MKDIR: {
//根据匹配规则我们这次的日志
//应该是一个创建目录的日志。
MkdirOp mkdirOp = (MkdirOp)op;
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
//TODO 把数据作用于自己的元数据里面。
FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
break;
——1.1.1.1》FSDirMkdirOp.mkdirForEditLog()
//TODO 重要代码
unprotectedMkdir(fsd, inodeId, existing, localName, permissions,
——1.1.1.1》 FSDirMkdirOp.unprotectedMkdir()
▼
//TODO 封装成一个目录
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
//TODO 往文件目录树 该添加目录的地方添加节点
INodesInPath iip = fsd.addLastINode(parent, dir, true);
▲回到 ——1.1.1》FSEditLogLoader.loadEditRecords
——1.1.1》FSEditLogLoader.loadEditRecords()
▼
try {
/**
* 读取元数据日志(到了journalnode)
* 至于是如何读取的,我们等一下。
* 2.7.0
*/
op = in.readOp();
——1.1.1.1》EditLogInputStream.readOp()
——1.1.1.1》EditLogInputStream. nextOp()
↓
——1.1.1.1.1》EditLogFileInputStream. nextOp()
★——1.1.1.1.1》EditLogFileInputStream.nextOpImpl()
▼
//TODO 核心方法
init(true);
——1.1.1.1.1》EditLogFileInputStream.init()
/**
* TODO 这儿使用了装饰模式
*/
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
▲回到 ——1.1.1.1.1》FSEditLogLoader.nextOpImpl
//TODO 通过reader读取日志
op = reader.readOp(skipBrokenEdits); 21} 0:28:
注释: reader 在 init里面初始化 fStream = log.getInputStream();//log是URLLog
回到 ——1.1.1.1.1》EditLogFileInputStream.init()
//所以找URLlog的getInpustream()的方法
fStream = log.getInputStream();
↓
——1.1.1.1.1.1》EditLogFileInputStream.URLLog.getInputStream();
▼
//创建了HttpURLConnetcion
//如果我们这儿发送的是HTTP的请求,读取的Journalndoe那儿的日志
//说明journalndoe启动起来的时候肯定会有一个JournalnodeHttpServer
//NameNode: NameNodeRpcServer NameNodeHttperServer
//DataNode: RpcServer Httpserver
//JournalNode: JournalnodeRpcServer JournalnodeHttpserver
//TODO 真相大白,我们创建了一个HttpURLConnection对象
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
//通过这个对象获取到了输入流
return connection.getInputStream(); 21} 0:30
↓ JournalNode服务器接受读取editlog请求
——1.1.1.1.1.2》 JournalNodeHttpServer.start()
//TODO 绑定了一个servlet /getJournal
httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true);
//TODO 启动服务
httpServer.start();
——1.1.1.1.1.2.1》 GetJournalEditServlet.doGet()
▼
//TODO journalndoe读取数据流
//就是我们平常普通的操作
editFileIn = new FileInputStream(editFile);
//TODO 流对烤
//editFileIn 这个输入流读取的是journalnode这儿的日志
//response.getOutputStream() 把数据写到这个输出流里面
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile, editFileIn, throttler);
↓ //StandbyCheckpointer类做checkpoint
——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.run()
- 命名空间 = 元数据信息 = 目录树 = fsimage *
- StandbyCheckpointer 是一个运行在standBynamenode上的一个线程。
- 他会周期性的对命名空间做checkpoint的操作(说白了就是把 内存里面目录树的信息持久化到磁盘上面)
- 并且会把这个份数据上传到active namenode(用来替换 active namednoe上面的fsimage)
——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.dowork()
▼
//TODO 每隔60检查以下是否需要做checkpoint
Thread.sleep(checkPeriod);
//TODO checkpoint条件一 数量 10000
//这儿是计算以下,我们上次checkpoint 现在最新的数据差了多少?
//或者说大概的意思就是说我们现在有多少条日志没有checkpoint了。
final long uncheckpointed = countUncheckpointedTxns();
//TODO checkpoint条件二
//当前时间 - 上一次checkpoint的时间。
//说白了这个变量代表的意思就是 已经有多久没有做checkpoint了。
final long secsSinceLast = (now - lastCheckpointTime) / 1000;
//TODO 执行checkpoint
doCheckpoint();
——1.1.1.1.1.2.1.1》StandbyCheckpointer.doCheckpoint()
▼
//TODO 把元数据持久化到磁盘上面
img.saveNamespace(namesystem, imageType, canceler);
//开启了一个异步的线程
ExecutorService executor =
Executors.newSingleThreadExecutor(uploadThreadFactory);
//这个操作就要把刚刚从内存里面的元数据持持久化到磁盘上面的 那个份数据 上传到 active的namenode上面去。
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);
4、standby的 namenode 发送 fsimage 到 主namenode 22} 0:13
流程图
——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImageFromStorage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.writeFileToPutRequest()
▼
//通过http方式获取的流
OutputStream output = connection.getOutputStream();
//输入流肯定是自己这儿的,不断读自己的数据
FileInputStream input = new FileInputStream(imageFile);
try {
//这儿没有什么特别的,就是一个流对烤
//然后把数据网output 输出流里面去写。
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf), canceler);
↓ //NameNodeHttpServer类做上传 22}
——1.1.1.1.1.2.1.1.2》NameNodeHttpServer.start()
——1.1.1.1.1.2.1.1.2》NameNodeHttpServer. setupServlets()
▼
//TODO 上传元数据的请求
//SecondaryNameNode/StandByNamenode合并出来的FSImage需要替换Active NameNode的fsimage
//发送的就是http的请求,请求就会转发给这个servlet
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
Image'Servlet.class, true);
——1.1.1.1.1.2.1.1.3》ImageServlet.doPut()
▼
//TODO 步骤一:
// 针对请求获取到一个输入流,不断的把数据读取过来
InputStream stream = request.getInputStream();
try {
long start = monotonicNow();
//TODO 步骤二:
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
//TODO 步骤三:
// 会把接收过来的元数据 替换 现在已有的fsimage文件。
//对文件进行重命名
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);