DFSOutputStream 实现(pipeline)

2022-12-19  本文已影响0人  wayyyy
create接口.png

DFSOutputStream 扩展自抽象类FSOutputSummer,FSOutputSummer 在 Outputstream 的基础上提供了写数据并计算校验和的功能,DFSOutputStream.write()方法的实现就继承自FSOutputStream类。

packet

再讲DFSOutputStream之前,我们先要说说packet这个东西,DFSOutputStream中使用Packet类来封装一个数据包。每个数据包中都包含若干个校验块,以及校验块对应的校验和。一个完整的数据包结构如图所示。首先是数据包包头, 记录了数据包的概要属性信息,然后是校验和数据,最后是校验块数据。Packet类提供了writeData()以及writeChecksum()方法向数据块中写入校验块数据以及校验和。

DFSOutputStream构造方法

DFSOutputStream构造方法比较简单,它首先调用私有的构造方法初始化一些属性,并且 对shouldSyncBIock (是否在关闭时将数据块持久化到磁盘)属性赋值。然后调用 computePacketChunkSize()方法确定数据包(packet)大小,同时确定一个数据包当中包含多少个校验块(chunk)。接下来会构造streamer线程,这个streamer线程负责建立数据流管道(pipeline),并将数据包发送到数据流管道中的一个Datanode,最后设置了 favoredNodes字段,确认客户端想要在哪些 Datanode上写入数据块。



write 方法

DFSOutputStream.write()方法可以将指定大小的数据写入数据流内部的一个缓冲区中,写入的数据会被切分成多个数据包,每个数据包又由一组校验块和这组校验块对应的校验和组成,默认数据包大小为65536字节,校验块大小为512字节,每个校验和都是校验块的512 字节数据对应的校验值。这里的数据包大小、校验块大小是在computePacketChunkSize()方法中定义的。

当Client写入的字节流数据达到一个数据包的长度时,DFSOutputStream会构造一个 Packet对象保存这个要发送的数据包。如果当前数据块中的所有数据包都发送完毕了, DFSOutputStream会发送一个空的数据包标识数据块发送完毕。新构造的Packet对象会被放到 DFSOutputStream.dataQueue 队列中,由 DFSOutputStream 的内部线程类 DataStreamer 处理。

DataStreamer 线程会从 dataQueue 中取出Packet对象,发送到数据流管道中的第一个Datanode,发送完毕后,将Packet从dataQueue中移除,放入ackQueue中等待下游节点的确认消息。确认消息是由DataStreamer的内部线程类ResponseProcessor 处理的。ResponseProcessor线程等待下游节点的响应ack,判断ack状态码,如果是失败状态,则记录出错Datanode的索引(errorlndex & restartindex),并设置错误状态位(hasError)。如果 ack状态是成功,则将数据包从ack队列中移除,整个数据包发送过程完成。

image.png

如果在数据块发送过程中出现错误,那所有ackQueue队列中等待确认的Packet都会被重新放回dataQueue队列中重新发送。客户端会执行错误处理流程,将出现错误的Datanode从 数据流管道中删除,然后向Namenode申请新的Datanode重建数据流管道。接着DataStreamer 线程会从dataQueue队列中取出Packet重新发送。


DataStreamer

DataStreamer 首先向Namenode申请一个新的数据块,然后建立写这个数据块的数据流管道(pipeline),最后DataStreamer从dataQueue队列中取岀待发送数据包并通过数据流管道发送给Datanode。每个数据包(packet)都有一个与之相关的序列号, 当一个数据块中所有的数据包都发送完毕,并且获得了 ACK消息后,DataStreamer线程就会将当前数据块的数据流管道关闭。

如果DFSOutputStream中还有数据需要发送,则DataStreamer 会再次向Namenode申请分配新的数据块,并且提交上一个数据块。获取了新分配数据块的位置信息后,DataStreamer会再次建立到新分配数据块的数据流管道,然后发送数据。

数据流管道的状态:

DataStreamer类定义了 nodes、storageTypes、storagcIDs以及stage等字段,用于保存当前数据流管道的状态。同时, DataStreamer还定义了 setPipeline()方法用于更新上述字段。

DataStreamer使用stage字段记录了当前数据流管道的状态,数据流管道的状态定义在BlockConstructionStage类中,有如下几种:

状态转换如图:


pipeline.png

当DFSCIient执行写新文件操作时,数据流管道的初始状态为PIPELINE_SETUP_CREATE;当DFSCIient执行追加写文件操作时,数据流管道的初始状态为 PIPELINE_SETUP_APPEND。

对于写新文件操作,DataStreamer会调用nextBlockOutputStream()方法向Namenode申请 分配新的数据块,然后构造这个新数据块的数据流管道。

对于追加写文件操作,DataStreamer 会调用setupPipelineForAppendOrRecovery()方法打开已有的HDFS文件并返回这个文件最后 一个数据块的位置信息,然后根据最后一个数据块的位置信息初始化数据流管道。

成功构造数据流管道后,DataStreamer会调用initDataStreaming()方法将数据流管道状态改为DATA_STREAMING,并调用setPipeline()记录数据流管道状态,然后就可以通过数据流 管道发送数驱包了。
当数据流管道将当前数据块写满后,会将数据流管道状态设置为PIPELINE_CLOSE,然 后向数据流管道发送一个空的数据包标识数据块已经写完。当DataStreamer确认收到这个空数据包的响应消息后,也就是数据流管道中的所有Datanode都成功地写入了数据块时,会调
endBk)ck()方法关闭数据流管道,并将数据流管道状态设置为PIPELINE_SETUP_CREATE 初始状态。然后DataStreamer会申请新的数据块,建立数据流管道,并写入数据,直到 DataStreamer线程被关闭。

数据流管道的建立

DataStreamer在将数据包发送到(Namespace)中分配数据块,建立写数据块的数据流管道。这些操作都是在DataStreamer.run() 方法中触发的。

首先调用nextBlockOutputStream。方法向Namenode申请分配新的数据块,然后建立到新分配数据块的输出流。接下来调用setPipeline()方法记录数据流管道信息(包括存 储数据的 Datanode,以及它们的 storagelDs)最后调用initDataStreaming。启动 ResponseProcessor线程处理来自Datanode的响应信息,并将数据块构建状态(BlockConstructionStage) 设置为DATA_STREAMING

数据流管道的建立流程如图所示。

错误处理

DataStreamer错误处理部分,主要是由processDatanodeError()方法实现的。在DataStreamer发送数据包的过程中,由hasErrorerrorindexrestartingNodelndex 这三个变量 记录错误信息,它们分别表明数据流管道是否出现错误、数据流管道中错误的Datanode索引、 数据流管道中需要重启的Datanode索引。

在发送数据包的过程中,可能出现如下错误:

出现错误之后的错误处理是由processDatanodeErrorf)方法实现的,它的流程可以分为以 下三个部分。

  1. 关闭当前10流。
  2. 将ackQueue队列中的元素移动到dataQueue中重新发送。
  3. 重新初始化数据流管道。

在满足什么样的条件下,会用新的Datanode替换数据流管道中异常的Datanode呢?
根据配置,当当前数据节点的数目小于所需要的副本数目除2,以及在appencVhflushed操作下, 当前数据节点的数目小于所需要的副本数目时,调用addDatanode2ExistingPipeline()方法将新 的数据节点添加到当前的数据流管道中。

ResponseProcessor线程

ResponseProcessor线程的处理逻辑比较简单,它从数据流管道下游节点的输入流中读入响应消息。然后判断响应状态,如果下游数据节点执行写入数据包失败,则通过ack消息中的应答码记录错误节点(errorlndex ),并设置错误标志位(hashError )。最后会在
DataStreamer.run()方法中调用processDatanodeEroor()理这个错误信息。如果下游节点写入数据包成功,则把当前数据包信息从ackQueue中移除。

上一篇 下一篇

猜你喜欢

热点阅读