hadoop

HDFS架构师 2-1 —DataNode启动流程-注册及心跳

2021-01-07  本文已影响0人  fat32jin

1、 DataNode启动流程 19:00

DataNode.png

本节入口:
DataNode类main方法
/* DataNode类注释说明:

DataNode#main()
——1 》DataNode#secureMain()
——1 .1》DataNode#createDataNode
——1.1.1 》DataNode#instantiateDataNode
——1.1.1.1 》DataNode#makeInstance
——1.1.1.1.1 》DataNode#DataNode构造函数
——1.1.1.1.1.1》DataNode#startDataNode

//TODO 初始化DataXceiverServer
initDataXceiver(conf);

              ——1.1.1.1.1.1.1》DataNode#initDataXceiver
                                                     ▼
              //重要的代码
            //TODO 实例化了一个DataXceiverServer
            //这个东西就是DataNode用来接收客户端和其他DataNode传过来数据的服务。
            xserver = new DataXceiverServer(tcpPeerServer, conf, this);

                 //设置为后台线程
            this.dataXceiverServer = new Daemon(threadGroup, xserver);    
                     ▲返回——1.1.1.1.1.1》#startDataNode

                    //TODO 启动HttpServer服务
                 startInfoServer(conf);
               //TODO 初始化RPC的服务
               initIpcServer(conf);

          ——1.1.1.1.1.1.2》DataNode#initIpcServer()
                                                     ▼
               //这个代码就是用来创建一个RPC的服务端  jps
              ipcServer = new RPC.Builder(conf)
                 .setProtocol(ClientDatanodeProtocolPB.class)
                  .setInstance(service)
                                ▲返回——1.1.1.1.1.1》#startDataNode

//TODO 创建了BlockPoolManager
//BlockPool,一个集群就有一个BlockPool
//如果我们是联邦机制。就会有多个namenode,也就会有多个联邦,一个联邦就是一个blockpool
//假设一个集群里面:4个NameNode: 2两个联邦
//联邦一:hadoop1(Active) hadoop2(StandBy)(blockPool是同一个)
//联邦二:hadoop3(Active) hadoop4(StandBy)(blockPool是同一个)
blockPoolManager = new BlockPoolManager(this);

//TODO 重要
//里面涉及到心跳内容
blockPoolManager.refreshNamenodes(conf);

          ——1.1.1.1.1.1.3》BlockPoolManager#refreshNamenodes()
                                                     ▼

//TODO 通常情况下:HDFS集群的架构是HA架构
//如果是联邦架构,里面就会有多个
/*
* hadoop1,hadoop2 -> 联邦1 service1
* hadoop3,hadoop4 -> 联邦2 service2
*/
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
//TODO toAdd里面有多少有的联邦,一个联邦就是一个NameService
toAdd.add(nameserviceId);
}
}

    //TODO 遍历所有的联邦,一个联邦里面会有两个NameNode(HA) 
    //如果是2个联邦,那么这个地方就会有两个值
    //BPOfferService -》 一个联邦
    for (String nsToAdd : toAdd) {
      ArrayList<InetSocketAddress> addrs =
              //如果里面做两个高可用,hdoop1,hadoop2
        Lists.newArrayList(addrMap.get(nsToAdd).values());

      //TODO 重要的关系
      //一个联邦对应一个BPOfferService
      //一个联邦里面的一个NameNode就是一个BPServiceActor
      //也就是正常来说一个BPOfferService对应两个BPServiceActor
      //hdfs-site.xml core-site.xml
      BPOfferService bpos = createBPOS(addrs);

//TODO DataNode向NameNode进行注册和心跳
  startAll();

-------------接下来为本节核心内容注册--------------------

                  ——1.1.1.1.1.1.3.1》BlockPoolManager#startAll()
                                                                ▼
//TODO 遍历所有的BPOfferService 遍历所有的联邦
          for (BPOfferService bpos : offerServices) {
            //TODO 重要
            bpos.start();
                      ——1.1.1.1.1.1.3.1》BPOfferService.start();
                         ——1.1.1.1.1.1.3.1.1》BPServiceActor.start()
                            ——1.1.1.1.1.1.3.1.1》BPServiceActor.run()
                                                                          ▼
                                             //TODO 注册核心代码
                                           //这个方法比较重要,我们尽量保证能完成。
                                               connectToNNAndHandshake();

——1.1.1.1.1.1.3.1.1.1》BPServiceActor.connectToNNAndHandshake()
——1.1.1.1.1.1.3.1.1.1》BPServiceActor. register()

//TODO 创建注册信息
bpRegistration = bpos.createRegistration();
//TODO 调用服务端的registerDatanode方法
//bpNamenode RPC的客户端,服务端的代理
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
//如果执行到这儿,说明注册过程已经完成了。
bpRegistration.setNamespaceInfo(nsInfo);

NameNodeRpcServer#registerDatanode()
//TODO 注册DataNode
namesystem.registerDatanode(nodeReg);
——1.1.1.1.1.1.3.1.1.1》FSNamesystem#registerDatanode()
——1.1.1.1.1.1.3.1.1.1.1》DatanodeManager#registerDatanode() 15-27:48

//TODO 注册DataNode
addDatanode(nodeDescr);
——1.1.1.1.1.1.3.1.1.1.1.1》DatanodeManager#addDatanode()

//Todo 注册DataNode说白了就是往一堆数据结构里添加信息
//TODO datanodeMap里面添加数据
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
//TODO 往拓扑的数据结构里面加入一条数据
networktopology.add(node); // may throw InvalidTopologyException
//TODO 往内存里面加入一条数据
host2DatanodeMap.add(node);
//如果以上内存数据结构里面的数据添加好了以后,
//注册就完成了
checkIfClusterIsNowMultiRack(node);
▲返回 上一级——1.1.1.1.1.1.3.1.1.1.1》

//TODO 把注册上来的DataNode加入到HeartbeatManager里面
//后面进行心跳管理
heartbeatManager.addDatanode(nodeDescr);
▲返回 上一级 ——1.1.1.1.1.1.3.1.1》BPServiceActor.run 处
-------------注册步骤完成--------------------

-------------接下来为本节另外一个步骤:心跳 --------------------

2、 DataNode 心跳 16}

——1.1.1.1.1.1.3.1.1》BPServiceActor.run ()
//TODO 发送心跳
offerService();
——1.1.1.1.1.1.3.1.1.1》BPServiceActor.offerService()
//TODO 发送心跳,返回来的是NameNode给的响应指令
HeartbeatResponse resp = sendHeartBeat();
——1.1.1.1.1.1.3.1.1.1.1》BPServiceActorsendHeartBeat()

//TODO 发送心跳
//获取到NameNode的代理,发送心跳
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
↓ 远程RPC代理,接收类方法
——1.1.1.1.1.1.3.1.1.1.1》NameNodeRpcServer#sendHeartbeat()
//TODO 处理DataNode发送过来的心跳
return namesystem.handleHeartbeat(nodeReg, report,
——1.1.1.1.1.1.3.1.1.1.1》FSNamesystem#handleHeartbeat()
//TODO NameNode处理DataNode发送过来的心跳
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
——1.1.1.1.1.1.3.1.1.1.1》DatanodeManager#handleHeartbeat()

//TODO 从已有datanodeMap里面获取到注册过来的DataNode信息
//如果能获取到这个datanode的信息说明以前就注册过了
//但是如果是第一次是那么dataNodemap里面是没有信息的
nodeinfo = getDatanode(nodeReg);

//TODO 更新心跳的重要的信息
heartbeatManager.updateHeartbeat(nodeinfo, reports,

   ——1.1.1.1.1.1.3.1.1.1.1》 HeartbeatManager. updateHeartbeat()
     ——1.1.1.1.1.1.3.1.1.1.1.1》DatanodeDescriptor.updateHeartbeat()
     ——1.1.1.1.1.1.3.1.1.1.1.1》DatanodeDescriptor.updateHeartbeatState()
                                                                            ▼
    //TODO 修改上一次的心跳时间。
     /*
 * datanode1  ->  namenode 心跳  -> 2020:07:03:10:10:16
 * datanode2  ->  namenode 心跳  -> 2020:07:03:10:10:15
* 线程 遍历所有 节点的心跳时间, 用当前时间 减去 上一次的心跳时间 ,如果超过某个值(8秒),就认为这个datanode挂了
 * datanode1  2020:07:03:10:10:20  - 2020:07:03:10:10:10 = 10 > 8    挂了
 * 2020:07:03:10:10:20  -  2020:07:03:10:10:15 = 5
 */
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());

*******现在要看当初 Namenode启动时HeartbeatManager类监控方法,
上面的 Datanode 是由此类检查心跳的*********

——1.1.1.1.1.1.3.1.1.1.1.1》HeartbeatManager.Monitor.run()
//TODO 心跳检查
heartbeatCheck();
——1.1.1.1.1.1.3.1.1.1.1.1》HeartbeatManager.Monitor. heartbeatCheck()

//遍历所有的datanodes
/* 注册的时候,就是把datanode的注册的信息放到了这个数据结构里面
* 并且刚刚我们修改这个datanode上一次的心跳信息也是修改的这个数据结构里面的
* datanode的信息。
*/
for (DatanodeDescriptor d : datanodes) {
//这儿就是判断一个datanode是否dead了
//什么情况下,50070界面,或者说namenode就认为datanode dead了?

//这儿就是判断一个datanode是否dead了
      if (dead == null && dm.isDatanodeDead(d)) {

//如果心跳10分钟30秒都没有更新,那么说明这个datanode dead了

------------ ▲ ▲ ▲ ▲心跳检查步骤完成 16} ▲ ▲ ▲ ▲-------------------

上一篇 下一篇

猜你喜欢

热点阅读