HDFS架构师 2-1 —DataNode启动流程-注册及心跳
1、 DataNode启动流程 19:00
DataNode.png
本节入口:
DataNode类main方法
/* DataNode类注释说明:
- TODO (1)
- DataNode存储hdfs上block文件块。在一个文件系统里面可以有多个dataNode
- 每个DataNode周期性的跟NameNode进行通信,客户端也可以跟DataNode进行交互
- 或者DataNode之间也可以进行相互通信。
- TODO (2)
- DataNode存储一系列block,DataNode允许客户端去读写block。
- DataNode也会去响应NameNode,响应NameNode发送过来的一些指令
- 比如:删除block,复制block等操作。
- TODO (3)
- datanode管理了一个重要的表:
- block -》 stream of bytes 看起来像是一些元数据的信息。
- TODO (4)
- 这个信息是存储在本地磁盘,DataNode启动的时候会把这些信息
- 汇报给NameNode,启动了以后也会再去不断的汇报。
- TODO(5)
- DataNode启动了以后会一直去问namenode自己需要干些什么?心跳
- NameNode是不能直接去操作DataNode的。DataNode启动了以后,会跟NameNode
- 进行心跳,NameNode接收到了心跳了以后,如果需要这个DataNode做什么事,
- 就会给DataNode一个返回值(指令),DataNode接收到这些指令以后就知道NameNode
- 想让他做什么事了
- TODO(6)
- DataNode开放了Socket服务,让客户端或者别的DataNode来进行读写数据。
- DataNode启动的时候会把自己的主机名和端口号汇报给NameNode.
- 也就是说如果Client和DataNode想要去访问某个DataNode.首先要跟NameNode进行通信
- 从NameNode那儿获取到目标DataNode的主机名和端口号。
- 这样才可以访问到对应的DataNode了。
- 客户端:hell0.txt -> namenode: block01(hadoop1:50010,hadoop2,hadoop3),block02(hadoop4,hadoopxx)
- 客户端:hadoop1:50010 hadoop4:50010
- TODO 总结:
- 1)一个集群里面可以有很多个DataNode,这些DataNode就是用来存储数据的。
- 2)DataNode启动了以后会周期性的跟NameNode进行通信(心跳,块汇报)
- 3)NameNode不能直接操作DataNode.而是通信心跳返回值指令的方式去操作的DataNode.
-
DataNode启动了以后开放了一个socket的服务(RPC),等待别人去调用他。
*/
image.png
-
DataNode#main()
——1 》DataNode#secureMain()
——1 .1》DataNode#createDataNode
——1.1.1 》DataNode#instantiateDataNode
——1.1.1.1 》DataNode#makeInstance
——1.1.1.1.1 》DataNode#DataNode构造函数
——1.1.1.1.1.1》DataNode#startDataNode
▼
//TODO 初始化DataXceiverServer
initDataXceiver(conf);
——1.1.1.1.1.1.1》DataNode#initDataXceiver
▼
//重要的代码
//TODO 实例化了一个DataXceiverServer
//这个东西就是DataNode用来接收客户端和其他DataNode传过来数据的服务。
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
//设置为后台线程
this.dataXceiverServer = new Daemon(threadGroup, xserver);
▲返回——1.1.1.1.1.1》#startDataNode
//TODO 启动HttpServer服务
startInfoServer(conf);
//TODO 初始化RPC的服务
initIpcServer(conf);
——1.1.1.1.1.1.2》DataNode#initIpcServer()
▼
//这个代码就是用来创建一个RPC的服务端 jps
ipcServer = new RPC.Builder(conf)
.setProtocol(ClientDatanodeProtocolPB.class)
.setInstance(service)
▲返回——1.1.1.1.1.1》#startDataNode
//TODO 创建了BlockPoolManager
//BlockPool,一个集群就有一个BlockPool
//如果我们是联邦机制。就会有多个namenode,也就会有多个联邦,一个联邦就是一个blockpool
//假设一个集群里面:4个NameNode: 2两个联邦
//联邦一:hadoop1(Active) hadoop2(StandBy)(blockPool是同一个)
//联邦二:hadoop3(Active) hadoop4(StandBy)(blockPool是同一个)
blockPoolManager = new BlockPoolManager(this);
//TODO 重要
//里面涉及到心跳内容
blockPoolManager.refreshNamenodes(conf);
——1.1.1.1.1.1.3》BlockPoolManager#refreshNamenodes()
▼
//TODO 通常情况下:HDFS集群的架构是HA架构
//如果是联邦架构,里面就会有多个
/*
* hadoop1,hadoop2 -> 联邦1 service1
* hadoop3,hadoop4 -> 联邦2 service2
*/
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
//TODO toAdd里面有多少有的联邦,一个联邦就是一个NameService
toAdd.add(nameserviceId);
}
}
//TODO 遍历所有的联邦,一个联邦里面会有两个NameNode(HA)
//如果是2个联邦,那么这个地方就会有两个值
//BPOfferService -》 一个联邦
for (String nsToAdd : toAdd) {
ArrayList<InetSocketAddress> addrs =
//如果里面做两个高可用,hdoop1,hadoop2
Lists.newArrayList(addrMap.get(nsToAdd).values());
//TODO 重要的关系
//一个联邦对应一个BPOfferService
//一个联邦里面的一个NameNode就是一个BPServiceActor
//也就是正常来说一个BPOfferService对应两个BPServiceActor
//hdfs-site.xml core-site.xml
BPOfferService bpos = createBPOS(addrs);
//TODO DataNode向NameNode进行注册和心跳
startAll();
-------------接下来为本节核心内容注册--------------------
——1.1.1.1.1.1.3.1》BlockPoolManager#startAll()
▼
//TODO 遍历所有的BPOfferService 遍历所有的联邦
for (BPOfferService bpos : offerServices) {
//TODO 重要
bpos.start();
——1.1.1.1.1.1.3.1》BPOfferService.start();
——1.1.1.1.1.1.3.1.1》BPServiceActor.start()
——1.1.1.1.1.1.3.1.1》BPServiceActor.run()
▼
//TODO 注册核心代码
//这个方法比较重要,我们尽量保证能完成。
connectToNNAndHandshake();
——1.1.1.1.1.1.3.1.1.1》BPServiceActor.connectToNNAndHandshake()
——1.1.1.1.1.1.3.1.1.1》BPServiceActor. register()
//TODO 创建注册信息
bpRegistration = bpos.createRegistration();
//TODO 调用服务端的registerDatanode方法
//bpNamenode RPC的客户端,服务端的代理
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
//如果执行到这儿,说明注册过程已经完成了。
bpRegistration.setNamespaceInfo(nsInfo);
↓
NameNodeRpcServer#registerDatanode()
//TODO 注册DataNode
namesystem.registerDatanode(nodeReg);
——1.1.1.1.1.1.3.1.1.1》FSNamesystem#registerDatanode()
——1.1.1.1.1.1.3.1.1.1.1》DatanodeManager#registerDatanode() 15-27:48
//TODO 注册DataNode
addDatanode(nodeDescr);
——1.1.1.1.1.1.3.1.1.1.1.1》DatanodeManager#addDatanode()
▼
//Todo 注册DataNode说白了就是往一堆数据结构里添加信息
//TODO datanodeMap里面添加数据
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
//TODO 往拓扑的数据结构里面加入一条数据
networktopology.add(node); // may throw InvalidTopologyException
//TODO 往内存里面加入一条数据
host2DatanodeMap.add(node);
//如果以上内存数据结构里面的数据添加好了以后,
//注册就完成了
checkIfClusterIsNowMultiRack(node);
▲返回 上一级——1.1.1.1.1.1.3.1.1.1.1》
//TODO 把注册上来的DataNode加入到HeartbeatManager里面
//后面进行心跳管理
heartbeatManager.addDatanode(nodeDescr);
▲返回 上一级 ——1.1.1.1.1.1.3.1.1》BPServiceActor.run 处
-------------注册步骤完成--------------------
-------------接下来为本节另外一个步骤:心跳 --------------------
2、 DataNode 心跳 16}
——1.1.1.1.1.1.3.1.1》BPServiceActor.run ()
//TODO 发送心跳
offerService();
——1.1.1.1.1.1.3.1.1.1》BPServiceActor.offerService()
//TODO 发送心跳,返回来的是NameNode给的响应指令
HeartbeatResponse resp = sendHeartBeat();
——1.1.1.1.1.1.3.1.1.1.1》BPServiceActorsendHeartBeat()
▼
//TODO 发送心跳
//获取到NameNode的代理,发送心跳
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
↓ 远程RPC代理,接收类方法
——1.1.1.1.1.1.3.1.1.1.1》NameNodeRpcServer#sendHeartbeat()
//TODO 处理DataNode发送过来的心跳
return namesystem.handleHeartbeat(nodeReg, report,
——1.1.1.1.1.1.3.1.1.1.1》FSNamesystem#handleHeartbeat()
//TODO NameNode处理DataNode发送过来的心跳
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
——1.1.1.1.1.1.3.1.1.1.1》DatanodeManager#handleHeartbeat()
▼
//TODO 从已有datanodeMap里面获取到注册过来的DataNode信息
//如果能获取到这个datanode的信息说明以前就注册过了
//但是如果是第一次是那么dataNodemap里面是没有信息的
nodeinfo = getDatanode(nodeReg);
//TODO 更新心跳的重要的信息
heartbeatManager.updateHeartbeat(nodeinfo, reports,
——1.1.1.1.1.1.3.1.1.1.1》 HeartbeatManager. updateHeartbeat()
——1.1.1.1.1.1.3.1.1.1.1.1》DatanodeDescriptor.updateHeartbeat()
——1.1.1.1.1.1.3.1.1.1.1.1》DatanodeDescriptor.updateHeartbeatState()
▼
//TODO 修改上一次的心跳时间。
/*
* datanode1 -> namenode 心跳 -> 2020:07:03:10:10:16
* datanode2 -> namenode 心跳 -> 2020:07:03:10:10:15
* 线程 遍历所有 节点的心跳时间, 用当前时间 减去 上一次的心跳时间 ,如果超过某个值(8秒),就认为这个datanode挂了
* datanode1 2020:07:03:10:10:20 - 2020:07:03:10:10:10 = 10 > 8 挂了
* 2020:07:03:10:10:20 - 2020:07:03:10:10:15 = 5
*/
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
*******现在要看当初 Namenode启动时HeartbeatManager类监控方法,
上面的 Datanode 是由此类检查心跳的*********
——1.1.1.1.1.1.3.1.1.1.1.1》HeartbeatManager.Monitor.run()
//TODO 心跳检查
heartbeatCheck();
——1.1.1.1.1.1.3.1.1.1.1.1》HeartbeatManager.Monitor. heartbeatCheck()
▼
//遍历所有的datanodes
/* 注册的时候,就是把datanode的注册的信息放到了这个数据结构里面
* 并且刚刚我们修改这个datanode上一次的心跳信息也是修改的这个数据结构里面的
* datanode的信息。
*/
for (DatanodeDescriptor d : datanodes) {
//这儿就是判断一个datanode是否dead了
//什么情况下,50070界面,或者说namenode就认为datanode dead了?
//这儿就是判断一个datanode是否dead了
if (dead == null && dm.isDatanodeDead(d)) {
//如果心跳10分钟30秒都没有更新,那么说明这个datanode dead了
------------ ▲ ▲ ▲ ▲心跳检查步骤完成 16} ▲ ▲ ▲ ▲-------------------