hadoop大数据生态圈HadoopHadoop系

hdfs写之写数据<二>

2019-08-27  本文已影响0人  古语1

一、写数据流程图

该流程主要是：客户端开始写数据，先把数据切分成多个chunk（每个chunk默认512字节，并附带对应的校验和），多个chunk组成一个packet，packet被放入数据队列dataQueue中，等待DataStreamer线程把它发送给datanode写入。

（此处原为“写数据流程图”，图片 image.png 未能保留）

二、写数据流程

1、FSDataOutputStream.write写方法

该方法调用out.write，即DFSOutputStream.write；DFSOutputStream继承自FSOutputSummer，因此实际执行的是父类FSOutputSummer.write方法。

    /**
     * Writes {@code len} bytes of {@code b}, starting at {@code off}, to the
     * wrapped stream, then advances {@code position} and, when statistics are
     * being collected, records the number of bytes written.
     */
    @Override
    public void write(byte b[], int off, int len) throws IOException {
      // Forward to the wrapped stream. In the HDFS write path described here
      // it is a DFSOutputStream, whose write() is inherited from FSOutputSummer.
      out.write(b, off, len);
      // Only advance the logical position once the underlying write returned.
      position += len;
      if (statistics == null) {
        return;
      }
      statistics.incrementBytesWritten(len);
    }

FSOutputSummer.write方法先检查输出流是否已关闭（checkClosed）并校验off、len参数范围，然后循环调用write1写入数据，直到len个字节全部写完。

 /**
   * Writes {@code len} bytes from {@code b}, starting at offset {@code off},
   * and generates a checksum for each data chunk.
   *
   * <p>Bytes are staged in this stream's internal buffer until a complete
   * checksum unit is available. The buffer is then checksummed and flushed to
   * the underlying output stream. If the buffer is empty and the caller
   * supplies at least a full buffer's worth of data, that data is checksummed
   * and written directly, which avoids an unnecessary copy.
   *
   * @param      b     the data.
   * @param      off   the start offset in the data.
   * @param      len   the number of bytes to write (any length; not tied to
   *                   the HDFS block size).
   * @exception  IOException  if an I/O error occurs.
   * @throws ArrayIndexOutOfBoundsException if {@code off} or {@code len} is
   *     negative, or {@code off + len} exceeds {@code b.length}.
   */
  @Override
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    checkClosed();

    // Written as "off > b.length - len" so that off + len cannot overflow.
    final boolean outOfRange = off < 0 || len < 0 || off > b.length - len;
    if (outOfRange) {
      throw new ArrayIndexOutOfBoundsException();
    }

    // write1 consumes as much as it can per call (possibly less than asked),
    // so keep calling it until every requested byte has been taken.
    int written = 0;
    while (written < len) {
      written += write1(b, off + written, len - written);
    }
  }

write1方法最终会调用writeChecksumChunks（缓冲区写满时调用的flushBuffer内部同样是调用writeChecksumChunks），把chunk数据写入packet中。

/**
   * Writes a portion of {@code b} through the internal checksum buffer,
   * flushing to the underlying stream at most once.
   *
   * <p>If the internal buffer is empty and {@code len >= buf.length}, exactly
   * {@code buf.length} bytes are checksummed and written straight from
   * {@code b}, bypassing the copy. Otherwise as many bytes as still fit are
   * copied into the buffer, and the buffer is flushed once it becomes full.
   *
   * @param b   the data
   * @param off start offset in {@code b}
   * @param len number of bytes the caller still wants to write
   * @return the number of bytes consumed from {@code b} by this call; the
   *     caller keeps looping until all of its bytes are consumed
   * @throws IOException if checksumming or writing to the underlying stream fails
   */
  private int write1(byte b[], int off, int len) throws IOException {
    // Capacity of the staging buffer. It is read before any flush, exactly
    // where the original computations read buf.length.
    final int capacity = buf.length;

    // Fast path: nothing is staged and the caller has at least a whole
    // buffer's worth, so checksum and ship one buffer-length directly.
    if (count == 0 && len >= capacity) {
      writeChecksumChunks(b, off, capacity);
      return capacity;
    }

    // Slow path: stage as much as fits in the remaining buffer space.
    final int bytesToCopy = Math.min(len, capacity - count);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == capacity) {
      // Buffer is full: checksum it and push it downstream.
      flushBuffer();
    }
    return bytesToCopy;
  }

writeChecksumChunks先为整段数据计算校验和，再按校验块大小（bytesPerChecksum）把数据切分成多个chunk，逐个交给writeChunk写入。

  /**
   * Generates checksums for the bytes {@code b[off, off + len)} and writes
   * them to the underlying stream one checksum chunk at a time, each chunk
   * paired with its own checksum.
   *
   * <p>Every chunk is {@code sum.getBytesPerChecksum()} bytes long, except
   * possibly the last one, which holds whatever remains.
   *
   * @param b   the data
   * @param off start offset in {@code b}
   * @param len number of bytes to checksum and write
   * @throws IOException if writing a chunk fails
   */
  private void writeChecksumChunks(byte b[], int off, int len)
      throws IOException {
    // Compute all chunk checksums into the shared checksum array
    // (presumably written from offset 0, per the last argument).
    sum.calculateChunkedSums(b, off, len, checksum, 0);

    int pos = 0;
    while (pos < len) {
      final int chunkLen = Math.min(sum.getBytesPerChecksum(), len - pos);
      // The k-th chunk's checksum sits at k * checksumSize in the array.
      final int ckOffset = (pos / sum.getBytesPerChecksum()) * getChecksumSize();
      writeChunk(b, off + pos, chunkLen, checksum, ckOffset, getChecksumSize());
      pos += sum.getBytesPerChecksum();
    }
  }

writeChunk在一个trace scope中调用writeChunkImpl进行实际处理。

  /**
   * Writes one checksum chunk (data plus its checksum) by delegating to
   * {@code writeChunkImpl}, wrapped in a trace scope named
   * {@code "DFSOutputStream#writeChunk"} for this file's path.
   *
   * @param b        buffer holding the chunk data
   * @param offset   start of the chunk data in {@code b}
   * @param len      number of data bytes in the chunk
   * @param checksum buffer holding the checksum bytes
   * @param ckoff    start of this chunk's checksum in {@code checksum}
   * @param cklen    number of checksum bytes
   * @throws IOException if {@code writeChunkImpl} fails
   */
  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    final TraceScope traceScope =
        dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
    try {
      writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
    } finally {
      // Always end the trace span, even when the write throws.
      traceScope.close();
    }
  }

writeChunkImpl主要将数据和校验和写入当前packet中。如果packet中的chunk数达到上限，或者当前block已写满（bytesCurBlock == blockSize），就把整个packet放入dataQueue队列，等待DataStreamer线程发送。如果block正好写满，还会再发送一个空的packet（lastPacketInBlock=true），通知DataStreamer当前block已经写完，并把bytesCurBlock重置为0。

  /**
   * Appends one checksum chunk (data plus its checksum) to the current packet.
   *
   * <p>A new packet is allocated if none is open. The packet is enqueued for
   * transmission once it holds its maximum number of chunks or once the
   * current block is full. When the block is full, an extra empty packet
   * flagged as the last packet of the block is enqueued as well, and the
   * per-block counters are reset.
   *
   * @param b        buffer holding the chunk data
   * @param offset   start of the chunk data in {@code b}
   * @param len      number of data bytes; must not exceed {@code bytesPerChecksum}
   * @param checksum buffer holding the checksum bytes
   * @param ckoff    start of this chunk's checksum in {@code checksum}
   * @param cklen    checksum length; must be 0 or {@code getChecksumSize()}
   * @throws IOException if {@code dfsClient.checkOpen()} or {@code checkClosed()}
   *     fails (presumably because the client or stream is closed), if
   *     {@code len} or {@code cklen} is invalid, or if enqueuing a packet fails
   */
  private synchronized void writeChunkImpl(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    // A chunk can never carry more data than one checksum covers. The
    // FSOutputSummer caller passes min(bytesPerChecksum, remaining), so
    // this is a sanity check.
    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    // cklen == 0 is accepted; any other value must match the checksum size.
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

    // Lazily open a packet that starts at the current offset within the block.
    if (currentPacket == null) {
      // chunksPerPacket is recomputed by computePacketChunkSize() after each
      // full packet (see below), so packets near the end of a block may be
      // sized to hold fewer chunks than the first one.
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++, false);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.getSeqno() +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
    // Append this chunk's checksum and data, then account for it.
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission. Chunks are added one at
    // a time, so the chunk count reaches getMaxChunks() exactly, never past it.
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.getSeqno() +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      // Hand the packet over for transmission. Per the article, it goes to
      // dataQueue and is sent by the DataStreamer thread.
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      // (appendChunk is presumably set when an appended file's last chunk was
      // partial; that setup is outside this method.)
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        // Size the next packet so it neither exceeds the configured packet
        // size (writePacketSize) nor crosses the block boundary. Take the
        // minimum before narrowing to int: casting blockSize - bytesCurBlock
        // first would wrap (possibly negative) when over 2 GiB remain.
        int psize = (int) Math.min(blockSize - bytesCurBlock,
            dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }

      // If encountering a block boundary, send an empty packet to
      // indicate the end of block and reset bytesCurBlock.
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
        currentPacket.setSyncBlock(shouldSyncBlock);
        waitAndQueueCurrentPacket();
        // The next chunk starts a new block.
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }
上一篇下一篇

猜你喜欢

热点阅读