Hadoop源码分析-HDFS写数据之创建packet

2021-01-21  本文已影响0人  晨磊的微博

[TOC]
契约机制深度剖析那里,我们提到过两个类,DFSOutputStreamDataStreamer 这两个类是写数据的核心类。我们先看看注释:

1. DFSOutputStream注释

 * DFSOutputStream creates files from a stream of bytes.
 *
 * The client application writes data that is cached internally by
 * this stream. Data is broken up into packets, each packet is
 * typically 64K in size. A packet comprises of chunks. Each chunk
 * is typically 512 bytes and has an associated checksum with it.
 *
 * When a client application fills up the currentPacket, it is
 * enqueued into dataQueue.  The DataStreamer thread picks up
 * packets from the dataQueue, sends it to the first datanode in
 * the pipeline and moves it from the dataQueue to the ackQueue.
 * The ResponseProcessor receives acks from the datanodes. When an
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the
 * ackQueue.
 *
 * In case of error, all outstanding packets and moved from
 * ackQueue. A new pipeline is setup by eliminating the bad
 * datanode from the original pipeline. The DataStreamer now
 * starts sending packets from the dataQueue.

2. DataStreamer 类注释

  // The DataStreamer class is responsible for sending data packets to the
  // datanodes in the pipeline. It retrieves a new blockid and block locations
  // from the namenode, and starts streaming packets to the pipeline of
  // Datanodes. Every packet has a sequence number associated with
  // it. When all the packets for a block are sent out and acks for each
  // if them are received, the DataStreamer closes the current block.

3. 创建packet

根据DFSOutputStreamDataStream的注释,我们知道这两个类是HDFS写数据的关键。那么他们在哪里被使用呢?我们直接跟进fos.write("abc".getBytes());看看。

  1. 顺着fos.write("abc".getBytes());一直跟进,就跟到了OutputStream.write(int b)方法。并且这个还是个抽象方法。一路跟下来并没有发现调用HDFS的什么,那这样跟肯定是不对的。在最初的地方肯定是返回的并不是实际的对象。我们要去找到真实的对象。

    image
    image
    image
    image
  2. 我们在通过fileSystem.create方法去寻找真实返回的对象类型。当然我们之前讲过fileSystem.create实际上是调用了DistributedFileSystem.create。我们直接看DistributedFileSystem.create就可以了。这里最后就是返回的return dfs.createWrappedOutputStream(dfsos, statistics);,我们接着跟进去看看。

    image
  3. 下面都是一行代码,直接过了。


    image
    image
  4. 可以看到,这里最后返回的其实是HdfsDataOutputStream类。那么我们就直接看HdfsDataOutputStream.write()方法吧。

    image
  5. 结果在 HdfsDataOutputStream里没有找到,那么我们到他的父类里去看看。

    image
  6. HdfsDataOutputStream的父类FSDataOutputStream里,还真找到了。里面调用了out.write(b);,跟进去结果发现又到了OutputStream.write()。那么我们就想下,是不是out.write()out被赋值为其他子类了呢?

    image
  7. 仔细看看代码,发现原来这个out变量是由构造函数赋值的,并且还是调用了父类的构造函数赋值。那我们看看在构建 HdfsDataOutputStream时传入的实际对象是哪个?

    image
  8. 可以看到其实是DFSOutputStream。那么我们看看DFSOutputStream.write()方法吧。

    image
  9. 结果又是没有找到,那么我们再看看他的父类FSOutputStream

    image
  10. 终于找到了,这里调用了flushBuffer();。我们在跟进下。

    image
  11. FSOutputSummer.flushBuffer(),这里除了writeChecksumChunks(buf, 0, lenToFlush);之外,其他都是定义变量,或者判断之类的。

    image
    image
  12. FSOutputSummer.writeChecksumChunks()1:这里是计算校验和;2:按照chunk的大小来遍历字节;3:把每个chunk发送出去;FSOutputSummer.writeChunk()这个方法是个抽象方法,需要看他的实现DFSOutputStream.writeChunk()

    image
  13. DFSOutputStream.writeChunk(),代码writeChunkImpl()就是写Chunk的实现。跟进。

    image
  14. DFSOutputStream.writeChunkImpl(),这里代码比较多,分开看看
    14.1 这块没什么可说的, 都是写检查

    image
    14.2 这里是把 chunk 写入 packet,有校验和,数据,和计数
    image
    14.3 1:这里就是判断是不是写够一个packet了;2:这里就是个debug日志;3:这里这开始写数据了;
    image
  15. DFSOutputStream.waitAndQueueCurrentPacket().
    15.1 这里没什么可看的。就是有个while循环。就是当dataQueue+ackQueue超过配置的大小时,就进行等待。

    image
    15.2 DFSOutputStream.queueCurrentPacket(),这个才是我们要找的代码。
    image
  16. DFSOutputStream.queueCurrentPacket(),这里就是把packet添加到dataQueue队列了,后面还有个notifyAll(),因为前面判断如何dataQueue满了,会wait。

    image

以上就是创建packet并把packet写入 dataQueue 的过程了。这都发生在客户端。以下是个简单的总结

image
上一篇 下一篇

猜你喜欢

热点阅读