hadoop

HDFS架构师3.1-元数据管理流程2(日志同步及fsimage

2021-01-11  本文已影响0人  fat32jin

19} 20} 21} 22}

元数据源码剖析.png

1、内存里面的元数据刷盘

image.png

19}
接上期

——1.1 .1.1》 FSEditLog.logSync()
——1.1 .1.1.1》 EditLogOutputStream.flush()

//第一次:FileJouranlManager -> EditLogFileOutputStream
//第二次:QuorumJounalManager -> QuorumOutputStream

        flushAndSync(durable);
                         ↓ 先看这个

——1.1 .1.1.1》EditLogFileOutputStream# flushAndSync(durable)
//TODO 涮写磁盘
doubleBuf.flushTo(fp);
↓ 再看这个
——1.1 .1.1.2》QuorumOutputStream#flushAndSync
——1.1 .1.1.2.1》AsyncLoggerSet#sendEdits()
//往journalnode去发送日志。
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);

2、内存里面的元数据刷盘和journalnode 19},20} 10分钟

3、standby的 namenode 同步 元数据 by journalnode

21} 40分钟 22} 18 分钟

元数据管理.png

/* EditLogTailer是一个后台线程,启动了以后会周期性的去journalnode集群上面去

//TODO 去Journalnode加载日志
editsLoaded = image.loadEdits(streams, namesystem);

——1.1》FSImage.loadEdits()
                                         ▼

//TODO 加载日志
//1000
//1001
//2000
loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);

//TODO 记录最后的一个事务ID
//1000 -> 2000
lastAppliedTxId = loader.getLastAppliedTxId();

   ——1.1.1》FSEditLogLoader.loadFSEdits()

//TODO 重要代码
long numEdits = loadEditRecords(edits, false,

       ——1.1.1》FSEditLogLoader.loadEditRecords()
                                                         ▼

//TODO 把获取到的元数据作用到自己的内存元数据里
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(true), lastInodeId);

         ——1.1.1.1》FSEditLogLoader.applyEditLogOp()
                                                         ▼

//TODO 创建目录的日志
case OP_MKDIR: {
//根据匹配规则我们这次的日志
//应该是一个创建目录的日志。
MkdirOp mkdirOp = (MkdirOp)op;
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
//TODO 把数据作用于自己的元数据里面。
FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
break;

              ——1.1.1.1》FSDirMkdirOp.mkdirForEditLog()
                                  //TODO 重要代码
unprotectedMkdir(fsd, inodeId, existing, localName, permissions, 

                   ——1.1.1.1》 FSDirMkdirOp.unprotectedMkdir()
                                                                ▼

//TODO 封装成一个目录
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
//TODO 往文件目录树 该添加目录的地方添加节点
INodesInPath iip = fsd.addLastINode(parent, dir, true);
▲回到 ——1.1.1》FSEditLogLoader.loadEditRecords

——1.1.1》FSEditLogLoader.loadEditRecords()

try {
/**
* 读取元数据日志(到了journalnode)
* 至于是如何读取的,我们等一下。
* 2.7.0
*/
op = in.readOp();

——1.1.1.1》EditLogInputStream.readOp()
——1.1.1.1》EditLogInputStream. nextOp()

——1.1.1.1.1》EditLogFileInputStream. nextOp()
★——1.1.1.1.1》EditLogFileInputStream.nextOpImpl()

//TODO 核心方法
init(true);
——1.1.1.1.1》EditLogFileInputStream.init()
/**
* TODO 这儿使用了装饰模式
*/
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
▲回到 ——1.1.1.1.1》FSEditLogLoader.nextOpImpl
//TODO 通过reader读取日志
op = reader.readOp(skipBrokenEdits); 21} 0:28:

注释: reader 在 init里面初始化 fStream = log.getInputStream();//log是URLLog
回到 ——1.1.1.1.1》EditLogFileInputStream.init()
//所以找URLlog的getInpustream()的方法
fStream = log.getInputStream();

——1.1.1.1.1.1》EditLogFileInputStream.URLLog.getInputStream();

//创建了HttpURLConnetcion
//如果我们这儿发送的是HTTP的请求,读取的Journalndoe那儿的日志
//说明journalndoe启动起来的时候肯定会有一个JournalnodeHttpServer
//NameNode: NameNodeRpcServer NameNodeHttperServer
//DataNode: RpcServer Httpserver
//JournalNode: JournalnodeRpcServer JournalnodeHttpserver
//TODO 真相大白,我们创建了一个HttpURLConnection对象
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
//通过这个对象获取到了输入流
return connection.getInputStream(); 21} 0:30

                                       ↓    JournalNode服务器接受读取editlog请求 

       ——1.1.1.1.1.2》 JournalNodeHttpServer.start()

//TODO 绑定了一个servlet /getJournal
httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true);
//TODO 启动服务
httpServer.start();

——1.1.1.1.1.2.1》 GetJournalEditServlet.doGet()

//TODO journalndoe读取数据流
//就是我们平常普通的操作
editFileIn = new FileInputStream(editFile);

              //TODO 流对烤
              //editFileIn 这个输入流读取的是journalnode这儿的日志
              //response.getOutputStream() 把数据写到这个输出流里面             
        TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,  editFileIn, throttler);

                                ↓  //StandbyCheckpointer类做checkpoint

——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.run()

——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.dowork()

//TODO 每隔60检查以下是否需要做checkpoint
Thread.sleep(checkPeriod);
//TODO checkpoint条件一 数量 10000
//这儿是计算以下,我们上次checkpoint 现在最新的数据差了多少?
//或者说大概的意思就是说我们现在有多少条日志没有checkpoint了。
final long uncheckpointed = countUncheckpointedTxns();
//TODO checkpoint条件二
//当前时间 - 上一次checkpoint的时间。
//说白了这个变量代表的意思就是 已经有多久没有做checkpoint了。
final long secsSinceLast = (now - lastCheckpointTime) / 1000;

//TODO 执行checkpoint
doCheckpoint();

——1.1.1.1.1.2.1.1》StandbyCheckpointer.doCheckpoint()

//TODO 把元数据持久化到磁盘上面
img.saveNamespace(namesystem, imageType, canceler);
//开启了一个异步的线程
ExecutorService executor =
Executors.newSingleThreadExecutor(uploadThreadFactory);

//这个操作就要把刚刚从内存里面的元数据持持久化到磁盘上面的 那个份数据 上传到 active的namenode上面去。
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);

4、standby的 namenode 发送 fsimage 到 主namenode 22} 0:13

流程图

image.png

——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImageFromStorage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.writeFileToPutRequest()

//通过http方式获取的流
OutputStream output = connection.getOutputStream();
//输入流肯定是自己这儿的,不断读自己的数据
FileInputStream input = new FileInputStream(imageFile);
try {
//这儿没有什么特别的,就是一个流对烤
//然后把数据网output 输出流里面去写。
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf), canceler);

                                ↓  //NameNodeHttpServer类做上传   22}

——1.1.1.1.1.2.1.1.2》NameNodeHttpServer.start()
——1.1.1.1.1.2.1.1.2》NameNodeHttpServer. setupServlets()

//TODO 上传元数据的请求
//SecondaryNameNode/StandByNamenode合并出来的FSImage需要替换Active NameNode的fsimage
//发送的就是http的请求,请求就会转发给这个servlet
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
Image'Servlet.class, true);

          ——1.1.1.1.1.2.1.1.3》ImageServlet.doPut()
                                                              ▼   
           //TODO 步骤一:
            // 针对请求获取到一个输入流,不断的把数据读取过来
            InputStream stream = request.getInputStream();
            try {
              long start = monotonicNow();
              //TODO 步骤二:
              MD5Hash downloadImageDigest = TransferFsImage
                  .handleUploadImageRequest(request, txid,
                      nnImage.getStorage(), stream,
                      parsedParams.getFileSize(), getThrottler(conf));
              //TODO 步骤三:
              // 会把接收过来的元数据 替换 现在已有的fsimage文件。
              //对文件进行重命名
              nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                  downloadImageDigest);
上一篇下一篇

猜你喜欢

热点阅读