Hadoop源码分析

Hadoop源码分析-Datanode启动流程源码分析

2021-01-17  本文已影响0人  晨磊的微博

Datanode 启动流程源码分析

本文尝试讲解 Datanode 的启动流程源码分析,内容包括

  1. Datanode 类注释
  2. Datanode 启动流程分析
  3. Datanode 注册流程分析
  4. Datanode 心跳流程分析
  5. Namenode 心跳监控分析

1. Datanode 类注释

DataNode is a class (and program) that stores a set of blocks for a DFS deployment. 
 A single deployment can have one or many DataNodes.  
Each DataNode communicates regularly with a single NameNode.  
It also communicates with client code and other DataNodes from time to time

DataNode 是一个类(进程),它为 DFS 存储block集合。
单个部署可以有一个或多个 DataNode.
每个 DataNode 有规律的与单个 NameNode 通信。
它也会不定时与客户端代码和其他的 DataNode 通信

DataNodes store a series of named blocks.  The DataNode
allows client code to read these blocks, or to write new
block data.  The DataNode may also, in response to instructions
from its NameNode, delete blocks or copy blocks to/from other
DataNodes.

DataNode 存储命名叫 block 的序列。DataNode 允许客户端代码读取这些 block,或者写新的 block 数据。
dataNode 可以相应来自 DataNode 的指令,删除块,或者拷贝块 到/从 其他 DataNode 上

The DataNode maintains just one critical table:
block-> stream of bytes (of BLOCK_SIZE or less)
This info is stored on a local disk.  The DataNode
reports the table's contents to the NameNode upon startup
and every so often afterwards.

DataNode 包含一个表,block-> stream of bytes
这个信息是保存在本地磁盘,DataNode 会报告这个表的内容到 NameNode 在启动的时候,以后每隔一段时间一次

DataNodes spend their lives in an endless loop of asking
the NameNode for something to do.  A NameNode cannot connect
to a DataNode directly; a NameNode simply returns values from
functions invoked by a DataNode.

DataNode 花费一生,无穷尽的循环着,问NameNode 我要做什么。
NameNode 不能直接连接 DataNode;
NameNode 只是在 DataNode 请求的函数中,简单的回复值

DataNodes maintain an open server socket so that client code 
or other DataNodes can read/write data.  The host/port for
this server is reported to the NameNode, which then sends that
information to clients or other DataNodes that might be interested.

DataNode 维护一个开放的服务socket,因此客户端代码或其他的DataNode可以读写数据。
服务的 host/port 会报告给 NameNode,发送信息到 可能感兴趣的 客户端或其他的 DataNode
DataNode 维护开放的服务器套接字,以便客户端代码或其他 DataNode 可以读写数据。
该服务器的主机/端口报告给NameNode,然后NameNode将该信息发送给可能感兴趣的客户端或其他 DataNode。

2. Datanode 启动流程分析

  1. 找到 main 方法,可以看到就一行有用的代码 secureMain(args, null);,直接跟进

    image
  2. 跟进后,发现这个行代码 DataNode datanode = createDataNode(args, null, resources); ,这个应就是创建一个Datanode,直接跟进去看看

    image
  3. 可以看到 DataNode dn = instantiateDataNode(args, conf, resources); ,这个看着像是 实例化 datanode,再跟进

    image
  4. 本方法的前几行都是些判断,直接看最后一行makeInstance(dataLocations, conf, resources);,再跟进

    image
  5. 前面几个都是各种校验,先记录下,然后看最后 new DataNode(conf, locations, resources);

    image
  6. 进入 Datanode 构造方法后,会发现代码比较多,不过前面都是赋值和判断,直接往下看,会看到一句代码就是 startDataNode(conf, dataDirs, resources);,我们进入此方法看看;

    image
  7. startDataNode方法又是很多代码,我们忽略前面的赋值和判断,直接往后看,然后我们发现这里有几行代码storage = new DataStorage(); initDataXceiver(conf);``startInfoServer(conf);,这几行代码虽然看着不像 RPC,但是在这里出现这比较可疑,先简单标记下。然后在往后看就发现一行代码initIpcServer(conf);,这个应就是RPC服务了,进去看看

    image
  8. 果然在里面发现了 RPC.Builder

    image
  9. 我们还发现,从始至终,我们跟踪代码就没离开 Datanode.java,所以 Datanode.java本身应就是RPC服务。我们可以证明下:我们发现 Datanode 实现了很3个接口

    image
    在看接口里又都有 versionID 字段,这就证明Datanode就是RPC服务了
    image
  10. 虽然证明 Datanode 是RPC服务,也找到了其构建的代码,但是 Datanode 在哪里启动的呢?我们可以搜索 ipcServer.start(),因为 RPC服务就是通过 start 方法启动的。

    image
  11. 我们发现是runDatanodeDaemon() 调用了 ipcServer.start() ,我们在找下是哪个方法runDatanodeDaemon(),然后就发现,原来是 createDataNode 方法调用了它,而createDataNode 方法不就是 第 3 步的代码吗

    image

至此 Datanode 启动流程分析完成
简单记录下,Datanode启动过程中遇到的一些可疑的对象

3. Datanode 注册流程分析

第7步哪里是初始化 RPC 服务,后面还有很多代码,我们继续看看有没有什么重要的代码

  1. 我们在 RPC 服务初始化后面看到,又创建了一个 blockPollManager,看下这个对象的注释,然后又调用了blockPoolManager.refreshNamenodes(config)这个代码,这个代码看着不像什么重要代码,但其实这里是非常重要的,Datanode 的注册和心跳都在这里

    代码1
  2. 跟进refreshNamenodes代码看看,发现重要的代码就是最后一行doRefreshNamenodes(newAddressMap);

    代码2
  3. 跟进doRefreshNamenodes(newAddressMap);后发现这个代码不少,这个只能慢慢看了。还好有几个注释。看步骤1,步骤2说是检查和判断类的,就先过了。步骤3 是启动一个 nameservices 服务,这个看着比较重要那就仔细看看。

    代码3
  4. 仔细看看步骤3的代码,发现重要的也就是 for 循环了,这里应该是拿到每一个nameservice的地址,然后去创建了一个 BPOfferService 对象。那我们看看这个createBPOS(addrs); 代码。

    代码4
  5. 这里就一行代码,继续跟进


    代码5
  6. 这里又一个for循环,这个for循环的意思就是循环创建 BPServiceActor对象然后添加到 bpServices 列表中,然后就结束了。也就是说每个 BPOfferService 对象有一个 BPServiceActor 类型的列表;

    代码6
  7. 那么 BPOfferServiceBPServiceActor 到底是什么呢
    这里简单说下,我们都知道 Namenode 有HA 和 联邦,Namenode 联邦中对应有多个HA。其实这里的每个BPServiceActor 就对应一个 Namenode,而BPOfferService对应的就是一个HA集群。在代码 4 中,最后把每个 BPOfferService 又添加到了 offerServices 列表中。然后又来了一个startAll()方法。

    代码4
  8. startAll() 中又是一个循环,可以看出,这里其实就是调用了每个 BPOfferService对象的start方法

    代码8
  9. 然后跟进后,可以看到,最终是调用了BPServiceActorstart()方法

    代码9
  10. 继续跟进后,发现在BPServiceActorstart方法中,new了一个线程,然后又调用了线程的start方法。 既然是调用了线程的start,那么本质上就是调用了线程的run方法。那么我们跟进run方法。

    代码10
  11. 搜索发现run方法,此方法就是个 while循环,不断执行connectToNNAndHandshake();直到成功。 那么继续跟进connectToNNAndHandshake();

    代码11
  12. 先看代码注释,这里首先是获得一个NN代理,然后又获得 namespace 信息,最后进行注册


    代码12
  13. 跟进注册代码,然后可以看到又一个while循环,里面就是注册了。


    代码13
  14. 再次跟进后看到一个RPC代理。那么既然是RPC代理,那我们就该找到它实际的代理协议是哪个了。


    代码14
  15. 其实到这里直接看 NameNodeRpcServer 即可。为什么呢?我们看下此类的注释,大概意思就是该类会把对DatanodeProtocol的请求发送到RPC服务器。而DatanodeProtocol所对应的RPC服务器当然是NameNodeRpcServer 了。

    image
  16. 进入 NameNodeRpcServer 后,找到registerDatanode方法,可以猜到 核心代码就是namesystem.registerDatanode(nodeReg);

    image
  17. 没什么可说的,继续跟进


    image
  18. 进入registerDatanode后,发现代码很多。前面都是跟注册不太相关的,直接到后面看到addDatanode(nodeDescr);。这里既是实际的注册了。

    image
  19. 跟进addDatanode(nodeDescr);代码看看。可以看到,这里其实就是对各种集合赋值。然后就结束了。

    image
    到这里整个Datanode 注册的流程就结束了。我们总结下
    image

4. Datanode 心跳流程分析

  1. 读Datanode心跳代码我们需要回到Datanode的BPServiceActor类的run方法。我们知道了connectToNNAndHandshake();方法是注册,那么在接下来的代码中有一个offerService(),这个方法就是Datanode的心跳。
    image
  2. 进入offerService() 方法后,一下就看到注释说每隔一段时间,会发送心跳和阻塞报告。先看下if判断if (startTime - lastHeartbeat >= dnConf.heartBeatInterval)这里就是判断时间间隔的,那么具体的时间间隔是多少呢?其实是3秒,具体的可以自己在此类中搜索 heartBeatInterval的赋值就可以看到。在往下看几行,就找到了HeartbeatResponse resp = sendHeartBeat(); 这个应该就是发送心跳了。(HeartbeatResponse里还包含Namenode返回的让Datanode执行的命令,这个最后再看)
    image
  3. 好,我们再次进入查看。可以看到前面的几行代码就是获得一些报告信息和概述等,这些信息都被最后一行发送出去了,那就直接跟进最后一行。(这就是为什么我们能在Namenode的50070服务上能看到很多Datanode的信息,心跳的时候带了很多Datanode的信息过去)


    image
  4. 这里可以看到刚开始获得一个builder,然后把这个 builder 传给 resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); 这个代码使用了。这里使用到了 rpcProxy。又是DatanodeProtocolClientSideTranslatorPB这个类,这个类会把请求都转发给RPC服务端。在这里也就是NameNodeRpcServer
    image
  5. 进入NameNodeRpcServer找到sendHeartbeat方法。可以看看最后一行namesystem.handleHeartbeat就是心跳的核心代码
    image
  6. 进入namesystem.handleHeartbeat后,只有blockManager.getDatanodeManager().handleHeartbeat像是处理心跳的代码。直接跟进。
    image
  7. 在这里简单看下,第一个if应该是获得Datanode信息,第二个应该是检测Datanode是否死亡,第三个就是判断如果没有Datanode,或者Datanode已死那么直接返回个其他的命令。在这三个if 之后,终于看到了 heartbeatManager.updateHeartbeat,很明显,这就是跟新心跳吧。
    image
  8. 直接进入 heartbeatManager.updateHeartbeat的代码。又发现一个Update,直接跟进
    image
  9. 还是一个update,在跟


    image
  10. 虽然这里的代码比较多,但是仔细一看,前面大部分都是一些检查。直到出现了几个setXXX的方法。可以看到有一个setLastUpdate(Time.now());方法。看着就像是把当前时间设置为最后更新时间一样。
    image
  11. 跟进一下,就看到这里其实就是一个普通的setter方法,这个是给 lastUpdate 属性赋值。
    image
  12. 后面的代码其实就是更新Datanode报上来的报告


    image

    总结


    image

5. Namenode 监控 Datanode 心跳流程分析

  1. 之前在讲 Namenode 安全模式的代码时,其实后面还有一个重要的代码就是,Namenode启动了几个重要的线程。我们跟进去看看。


    image
  2. 可以看到有 datanodeManager.activate(conf);
    image
  3. 我们跟进heartbeatManager.activate(conf)看看。
    image
  4. 看到调用了线程的start方法,肯定有要看Run方法了


    image
  5. 可以看到每5秒进行一次heartbeatCheck();(时间间隔在配置文件里)
    image
  6. heartbeatCheck(); 每个5秒遍历Datanode,判断最后心跳时间是否超时,(超时时间是10分30秒,具体的看配置),如果超时了,那么下面就会进行一些移除datanode信息的操作,如果没有超时的,等待5秒再检查一次。
    image
上一篇下一篇

猜你喜欢

热点阅读