DFSOutputStream 实现(pipeline)
DFSOutputStream 扩展自抽象类FSOutputSummer,FSOutputSummer 在 Outputstream 的基础上提供了写数据并计算校验和的功能,DFSOutputStream.write()
方法的实现就继承自FSOutputStream类。
packet
再讲DFSOutputStream之前,我们先要说说packet这个东西,DFSOutputStream中使用Packet类来封装一个数据包。每个数据包中都包含若干个校验块,以及校验块对应的校验和。一个完整的数据包结构如图所示。首先是数据包包头, 记录了数据包的概要属性信息,然后是校验和数据,最后是校验块数据。Packet类提供了writeData()以及writeChecksum()方法向数据块中写入校验块数据以及校验和。
DFSOutputStream构造方法
DFSOutputStream构造方法比较简单,它首先调用私有的构造方法初始化一些属性,并且 对shouldSyncBIock
(是否在关闭时将数据块持久化到磁盘)属性赋值。然后调用 computePacketChunkSize()
方法确定数据包(packet)大小,同时确定一个数据包当中包含多少个校验块(chunk)。接下来会构造streamer线程,这个streamer线程负责建立数据流管道(pipeline),并将数据包发送到数据流管道中的一个Datanode,最后设置了 favoredNodes字段,确认客户端想要在哪些 Datanode上写入数据块。
write 方法
DFSOutputStream.write()
方法可以将指定大小的数据写入数据流内部的一个缓冲区中,写入的数据会被切分成多个数据包,每个数据包又由一组校验块和这组校验块对应的校验和组成,默认数据包大小为65536字节,校验块大小为512字节,每个校验和都是校验块的512 字节数据对应的校验值。这里的数据包大小、校验块大小是在computePacketChunkSize()
方法中定义的。
当Client写入的字节流数据达到一个数据包的长度时,DFSOutputStream会构造一个 Packet对象保存这个要发送的数据包。如果当前数据块中的所有数据包都发送完毕了, DFSOutputStream会发送一个空的数据包标识数据块发送完毕。新构造的Packet对象会被放到 DFSOutputStream.dataQueue 队列中,由 DFSOutputStream 的内部线程类 DataStreamer 处理。
DataStreamer 线程会从 dataQueue 中取出Packet对象,发送到数据流管道中的第一个Datanode,发送完毕后,将Packet从dataQueue中移除,放入ackQueue中等待下游节点的确认消息。确认消息是由DataStreamer的内部线程类ResponseProcessor 处理的。ResponseProcessor线程等待下游节点的响应ack,判断ack状态码,如果是失败状态,则记录出错Datanode的索引(errorlndex & restartindex),并设置错误状态位(hasError)。如果 ack状态是成功,则将数据包从ack队列中移除,整个数据包发送过程完成。
image.png如果在数据块发送过程中出现错误,那所有ackQueue队列中等待确认的Packet都会被重新放回dataQueue队列中重新发送。客户端会执行错误处理流程,将出现错误的Datanode从 数据流管道中删除,然后向Namenode申请新的Datanode重建数据流管道。接着DataStreamer 线程会从dataQueue队列中取出Packet重新发送。
DataStreamer
DataStreamer 首先向Namenode申请一个新的数据块,然后建立写这个数据块的数据流管道(pipeline),最后DataStreamer从dataQueue队列中取岀待发送数据包并通过数据流管道发送给Datanode。每个数据包(packet)都有一个与之相关的序列号, 当一个数据块中所有的数据包都发送完毕,并且获得了 ACK消息后,DataStreamer线程就会将当前数据块的数据流管道关闭。
如果DFSOutputStream中还有数据需要发送,则DataStreamer 会再次向Namenode申请分配新的数据块,并且提交上一个数据块。获取了新分配数据块的位置信息后,DataStreamer会再次建立到新分配数据块的数据流管道,然后发送数据。
数据流管道的状态:
DataStreamer类定义了 nodes、storageTypes、storagcIDs以及stage等字段,用于保存当前数据流管道的状态。同时, DataStreamer还定义了 setPipeline()
方法用于更新上述字段。
DataStreamer使用stage字段记录了当前数据流管道的状态,数据流管道的状态定义在BlockConstructionStage
类中,有如下几种:
- PIPELINE_SETUP_CREATE:写新文件时,数据流管道的初始状态。
- PIPELINE_SETUP_APPEND:追加写已有文件时,数据流管道的初始状态。
- DATA_STREAMING:数据流管道已经建立好,可以传输数据了。
- PIPELINE_CLOSE:数据块已经写满,数据流管道关闭。
状态转换如图:
pipeline.png
当DFSCIient执行写新文件操作时,数据流管道的初始状态为PIPELINE_SETUP_CREATE;当DFSCIient执行追加写文件操作时,数据流管道的初始状态为 PIPELINE_SETUP_APPEND。
对于写新文件操作,DataStreamer会调用nextBlockOutputStream()
方法向Namenode申请 分配新的数据块,然后构造这个新数据块的数据流管道。
对于追加写文件操作,DataStreamer 会调用setupPipelineForAppendOrRecovery()
方法打开已有的HDFS文件并返回这个文件最后 一个数据块的位置信息,然后根据最后一个数据块的位置信息初始化数据流管道。
成功构造数据流管道后,DataStreamer会调用initDataStreaming()
方法将数据流管道状态改为DATA_STREAMING,并调用setPipeline()记录数据流管道状态,然后就可以通过数据流 管道发送数驱包了。
当数据流管道将当前数据块写满后,会将数据流管道状态设置为PIPELINE_CLOSE,然 后向数据流管道发送一个空的数据包标识数据块已经写完。当DataStreamer确认收到这个空数据包的响应消息后,也就是数据流管道中的所有Datanode都成功地写入了数据块时,会调
用endBk)ck()
方法关闭数据流管道,并将数据流管道状态设置为PIPELINE_SETUP_CREATE 初始状态。然后DataStreamer会申请新的数据块,建立数据流管道,并写入数据,直到 DataStreamer线程被关闭。
数据流管道的建立
DataStreamer在将数据包发送到(Namespace)中分配数据块,建立写数据块的数据流管道。这些操作都是在DataStreamer.run() 方法中触发的。
首先调用nextBlockOutputStream
。方法向Namenode申请分配新的数据块,然后建立到新分配数据块的输出流。接下来调用setPipeline()
方法记录数据流管道信息(包括存 储数据的 Datanode,以及它们的 storagelDs)最后调用initDataStreaming
。启动 ResponseProcessor线程处理来自Datanode的响应信息,并将数据块构建状态(BlockConstructionStage) 设置为DATA_STREAMING
数据流管道的建立流程如图所示。
-
nextBlockOutputStream
-
setPipeline
-
initDataStreaming
错误处理
DataStreamer错误处理部分,主要是由processDatanodeError()
方法实现的。在DataStreamer发送数据包的过程中,由hasError
、errorindex
和 restartingNodelndex
这三个变量 记录错误信息,它们分别表明数据流管道是否出现错误、数据流管道中错误的Datanode索引、 数据流管道中需要重启的Datanode索引。
在发送数据包的过程中,可能出现如下错误:
-
在建立数据流管道的过程中:在createBlockOutputStream()方法中,调用DataTransfer Protocol.writeBlock()请求时,下游Datanode可能出现异常,并随着writeBlock()的响 应带回。在异常处理代码中,对hasError、errorindex和restartingNodelndex这三个 变量赋值。
-
数据包发送完成后:下游节点会对每个数据包进行ack确认。ack确认消息中就会携 带岀现故障的Datanode信息,也就是在ResponseProcessor.run()方法处理ack消息时, 异常处理代码会对hasError、errorindex和restartingNodelndex这三个变量赋值。
-
在DataStreamer.run()中,通过底层IO流发送数据包时会出现异常(这个时候,由于 没有下游节点返回的消息,所以直接将数据管道流中的第一个节点标识为错误节点)。
出现错误之后的错误处理是由processDatanodeErrorf)方法实现的,它的流程可以分为以 下三个部分。
- 关闭当前10流。
- 将ackQueue队列中的元素移动到dataQueue中重新发送。
- 重新初始化数据流管道。
-
setupPipelineForAppendOrRecovery()
主要应用在两种情况下:- append操作时用于创建数据流管道;
- 数据流管道写数据出现异常时,进行恢复操作。
恢复操作有三个目的:- 等待重启的Datanode启动;
- 将错误的Datanode从数据流管道中删除,将新的Datanode添加到数据流管道中;
- 最后更新Namenode命名空间中数据块 的时间戳,这样异常Datanode ±的过期数据块就可以被删除了;
setupPipelineForAppendOrRecovery()
方法的执行流程分为如下几步:- 处理重启的Datanode,这里的处理方式是线程睡眠一段时间(默认为4秒),等待 Datanode重启。如果睡眠时间过长,超过restartDeadline,那么将这个重启的Datanode 标志为错误节点。
2.处理异常的Datanode,处理方式是从数据流管道中移除异常的Datanodeo
-
如果满足了替换异常Datanode的条件,则调用addDatanode2ExistingPipeline()方法在 数据流管道中添加一个新的Datanodeo
-
调用 ClientProtocoLupdatcBlockForPipeline()更新数据块的时间戳,这样异常 Datanode 上的时间戳错误的过期数据块就可以被删除了。
-
如果数据流管道恢复成功,则更新Namenode命名空间中数据块的时间戳,同时更 新当前Client侧缓存的数据块信息的时间戳。
在满足什么样的条件下,会用新的Datanode替换数据流管道中异常的Datanode呢?
根据配置,当当前数据节点的数目小于所需要的副本数目除2,以及在appencVhflushed操作下, 当前数据节点的数目小于所需要的副本数目时,调用addDatanode2ExistingPipeline()
方法将新 的数据节点添加到当前的数据流管道中。
ResponseProcessor线程
ResponseProcessor线程的处理逻辑比较简单,它从数据流管道下游节点的输入流中读入响应消息。然后判断响应状态,如果下游数据节点执行写入数据包失败,则通过ack消息中的应答码记录错误节点(errorlndex ),并设置错误标志位(hashError )。最后会在
DataStreamer.run()方法中调用processDatanodeEroor()
理这个错误信息。如果下游节点写入数据包成功,则把当前数据包信息从ackQueue中移除。