
HDFS Reads: Socket Communication Walkthrough (Part 3)

2019-08-22  古语1

I. Communication flow diagram

[Figure: communication flow diagram; image not available]

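II. Client side: Sender

1. RemoteBlockReader2.newBlockReader()

Using the block information, this method sends the datanode a request to read the block. On the datanode, DataXceiver receives the request and responds in two parts: first a status value telling the client whether it may read from this datanode, then the block data itself. If the status returned by the datanode is SUCCESS, the method creates and returns a RemoteBlockReader2; otherwise the datanode closes the stream.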

   public static BlockReader newBlockReader(String file,
                                     ExtendedBlock block, // the block to read
                                     Token<BlockTokenIdentifier> blockToken, // block access token, verified by the datanode
                                     long startOffset, long len,
                                     boolean verifyChecksum,
                                     String clientName,
                                     Peer peer, DatanodeID datanodeID,  // the datanode being asked
                                     PeerCache peerCache,
                                     CachingStrategy cachingStrategy) throws IOException {
    // in and out will be closed when sock is closed (by the caller)

    //peer : NioInetPeer(Socket[addr=/127.0.0.1,port=49621,localport=49629])
    //SocketOutputStream$Writer java.nio.channels.SocketChannel[connected local=/127.0.0.1:49629 remote=/127.0.0.1:49621]
    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
          peer.getOutputStream()));
    // read the block from the chosen datanode:
    // the client sends a READ_BLOCK request, which DataXceiver (a subclass of Receiver) picks up
    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);
    // Get bytes in block
    //SocketInputStream$Reader java.nio.channels.SocketChannel[connected local=/127.0.0.1:49629 remote=/127.0.0.1:49621]
    // open an input stream from the datanode and wait for its reply (a status value, then data; the data is received later in readNextPacket)
    // wait for DataXceiver's reply (DataXceiver responds with a BlockOpResponseProto)
    DataInputStream in = new DataInputStream(peer.getInputStream());
 
    //status: SUCCESS
    //readOpChecksumInfo {
    //  checksum {
    //    type: CHECKSUM_CRC32C
    //    bytesPerChecksum: 512
    //  }
    //  chunkOffset: 0
    //}
    // the sample response above reports SUCCESS: the datanode accepted the read request
    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
        PBHelper.vintPrefixed(in));
    checkSuccess(status, peer, block, file);
    ReadOpChecksumInfoProto checksumInfo =
      status.getReadOpChecksumInfo();
    // build the DataChecksum described in the response
    DataChecksum checksum = DataTransferProtoUtil.fromProto(
        checksumInfo.getChecksum());
    //Warning when we get CHECKSUM_NULL?

    // Read the first chunk offset.
    long firstChunkOffset = checksumInfo.getChunkOffset();

    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
      throw new IOException("BlockReader: error in first chunk offset (" +
                            firstChunkOffset + ") startOffset is " +
                            startOffset + " for file " + file);
    }

    return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
        checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
        datanodeID, peerCache);
  }
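
The offset check near the end of newBlockReader is easier to follow with numbers. Checksums cover fixed-size chunks, so the data returned starts at a chunk boundary at or before startOffset, and the check requires startOffset - bytesPerChecksum < firstChunkOffset <= startOffset. The following is a minimal sketch of that range check only; the class and method names are hypothetical, the align-down helper is an assumption about how a sender would pick the starting chunk, and 512 is the bytesPerChecksum value from the sample response above.

```java
// Hypothetical helper, not part of Hadoop: mirrors the range check in newBlockReader.
class FirstChunkOffsetCheck {

    /** True when firstChunkOffset is acceptable for a read that starts at startOffset. */
    static boolean isValid(long firstChunkOffset, long startOffset, int bytesPerChecksum) {
        return firstChunkOffset >= 0
            && firstChunkOffset <= startOffset
            && firstChunkOffset > startOffset - bytesPerChecksum;
    }

    /** Assumption for illustration: the sender starts at the chunk boundary at or before startOffset. */
    static long alignDown(long startOffset, int bytesPerChecksum) {
        return startOffset - (startOffset % bytesPerChecksum);
    }

    public static void main(String[] args) {
        int bytesPerChecksum = 512;
        long startOffset = 1300;
        long aligned = alignDown(startOffset, bytesPerChecksum);
        System.out.println(aligned);                                        // 1024
        System.out.println(isValid(aligned, startOffset, bytesPerChecksum)); // true
        System.out.println(isValid(512, startOffset, bytesPerChecksum));     // false, a whole chunk too early
        System.out.println(isValid(1536, startOffset, bytesPerChecksum));    // false, past startOffset
    }
}
```

The client later skips the bytes between firstChunkOffset and startOffset, which is why the reader is constructed with both values.
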
2. Sender.readBlock()

Serializes all request parameters into an OpReadBlockProto and sends it with the operation code READ_BLOCK.

  @Override
  public void readBlock(final ExtendedBlock blk,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException {

    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
            // the header is a ClientOperationHeaderProto: a BaseHeaderProto (block and token) plus the client name
      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
      .setOffset(blockOffset)
      .setLen(length)
      .setSendChecksums(sendChecksum)
      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .build();

    send(out, Op.READ_BLOCK, proto);
  }
  
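3. Sender.send()

Writes the protocol version (DATA_TRANSFER_VERSION) and the opcode, then the length-delimited protobuf message, and flushes the output stream to the datanode. This completes the client's read request.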

private static void send(final DataOutputStream out, final Op opcode,
      final Message proto) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
          + ": " + proto);
    }
    // op() writes DATA_TRANSFER_VERSION followed by the opcode
    op(out, opcode);
    proto.writeDelimitedTo(out);
    out.flush();
  }
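
To make the byte layout of the request concrete, here is a rough sketch of the framing that send() produces: a 2-byte version, a 1-byte opcode, then a varint length followed by the message bytes (which is what protobuf's writeDelimitedTo emits). It avoids the protobuf library and treats the message as an opaque byte array. The class name is hypothetical, and the constants 28 and 81 are assumed values for DATA_TRANSFER_VERSION and READ_BLOCK in Hadoop 2.x; check them against the version you are reading.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the request framing; not Hadoop code.
class RequestFraming {
    static final short VERSION = 28;       // assumed DATA_TRANSFER_VERSION
    static final byte OP_READ_BLOCK = 81;  // assumed READ_BLOCK opcode

    /** Writes a non-negative int as a base-128 varint, low 7 bits first. */
    static void writeVarint(DataOutputStream out, int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.writeByte((value & 0x7F) | 0x80); // more bytes follow
            value >>>= 7;
        }
        out.writeByte(value);                     // last byte, high bit clear
    }

    /** Returns version + opcode + varint(length) + payload as one byte array. */
    static byte[] frame(byte opcode, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeShort(VERSION);
            out.writeByte(opcode);
            writeVarint(out, payload.length);
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame(OP_READ_BLOCK, new byte[300]);
        // 2 (version) + 1 (opcode) + 2 (varint for 300) + 300 (payload)
        System.out.println(framed.length); // 305
    }
}
```
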

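III. Server side: DataXceiver

1. DataXceiverServer.run() thread

The DataXceiverServer thread loops, listening for client connections that request data from the datanode. For each accepted connection it creates a DataXceiver, running in its own daemon thread, to handle the client's requests and send back the results.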

  @Override
  public void run() {
    Peer peer = null;
    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
        peer = peerServer.accept();

        // Make sure the xceiver count is not exceeded
        int curXceiverCount = datanode.getXceiverCount();
        if (curXceiverCount > maxXceiverCount) {
          throw new IOException("Xceiver count " + curXceiverCount
              + " exceeds the limit of concurrent xcievers: "
              + maxXceiverCount);
        }
        // start a daemon thread running a DataXceiver to handle this connection
        new Daemon(datanode.threadGroup,
            DataXceiver.create(peer, datanode, this))
            .start();
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
      } catch (AsynchronousCloseException ace) {
        // another thread closed our listener socket - that's expected during shutdown,
        // but not in other circumstances
        if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
          LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
        }
      } catch (IOException ie) {
        IOUtils.cleanup(null, peer);
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
      } catch (OutOfMemoryError ie) {
        IOUtils.cleanup(null, peer);
        // DataNode can run out of memory if there is too many transfers.
        // Log the event, Sleep for 30 seconds, other transfers may complete by
        // then.
        LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
        try {
          Thread.sleep(30 * 1000);
        } catch (InterruptedException e) {
          // ignore
        }
      } catch (Throwable te) {
        LOG.error(datanode.getDisplayName()
            + ":DataXceiverServer: Exiting due to: ", te);
        datanode.shouldRun = false;
      }
    }

    // Close the server to stop reception of more requests.
    try {
      peerServer.close();
      closed = true;
    } catch (IOException ie) {
      LOG.warn(datanode.getDisplayName()
          + " :DataXceiverServer: close exception", ie);
    }

    // if in restart prep stage, notify peers before closing them.
    if (datanode.shutdownForUpgrade) {
      restartNotifyPeers();
      // Each thread needs some time to process it. If a thread needs
      // to send an OOB message to the client, but blocked on network for
      // long time, we need to force its termination.
      LOG.info("Shutting down DataXceiverServer before restart");
      // Allow roughly up to 2 seconds.
      for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // ignore
        }
      }
    }
    // Close all peers.
    closeAllPeers();
  }
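
The accept loop above boils down to two ideas: refuse new work once too many handlers are running, and otherwise hand each connection to its own daemon thread. A simplified sketch of just that pattern follows, with sockets left out. All names are hypothetical, and the sketch keeps its own counter and compares with >=, whereas the source compares datanode.getXceiverCount() with > against maxXceiverCount.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Hadoop code: cap concurrent handlers, one daemon thread each.
class BoundedDispatcher {
    private final int maxHandlers;
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger rejected = new AtomicInteger();

    BoundedDispatcher(int maxHandlers) {
        this.maxHandlers = maxHandlers;
    }

    /**
     * Call once per accepted connection, from the single accept thread.
     * Returns false (and counts a rejection) when the limit is already reached.
     */
    boolean dispatch(Runnable handler) {
        if (active.get() >= maxHandlers) {
            rejected.incrementAndGet();
            return false;
        }
        active.incrementAndGet();
        Thread worker = new Thread(() -> {
            try {
                handler.run();
            } finally {
                active.decrementAndGet(); // free the slot even if the handler throws
            }
        });
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();
        return true;
    }

    int activeCount() { return active.get(); }

    int rejectedCount() { return rejected.get(); }

    public static void main(String[] args) {
        BoundedDispatcher dispatcher = new BoundedDispatcher(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable slow = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(dispatcher.dispatch(slow)); // true
        System.out.println(dispatcher.dispatch(slow)); // true
        System.out.println(dispatcher.dispatch(slow)); // false, limit of 2 reached
        release.countDown();
    }
}
```
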
2. DataXceiver.run() thread

readOp() parses the operation type requested by the client; in this walkthrough it is READ_BLOCK.
processOp() then dispatches the operation; here, the block read request.

/**
   * Read/write data from/to the DataXceiverServer.
   */
  @Override
  public void run() {
    int opsProcessed = 0;
    Op op = null;

    try {
      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
      InputStream input = socketIn;
      try {
        IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
        input = new BufferedInputStream(saslStreams.in,
          HdfsConstants.SMALL_BUFFER_SIZE);
        socketOut = saslStreams.out;
      } catch (InvalidMagicNumberException imne) {
        if (imne.isHandshake4Encryption()) {
          LOG.info("Failed to read expected encryption handshake from client " +
              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
              "is running an older version of Hadoop which does not support " +
              "encryption");
        } else {
          LOG.info("Failed to read expected SASL data transfer protection " +
              "handshake from client at " + peer.getRemoteAddressString() + 
              ". Perhaps the client is running an older version of Hadoop " +
              "which does not support SASL data transfer protection");
        }
        return;
      }
      
      super.initialize(new DataInputStream(input));
      
      // We process requests in a loop, and stay around for a short timeout.
      // This optimistic behaviour allows the other end to reuse connections.
      // Setting keepalive timeout to 0 disable this behavior.
      do {
        updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
        try {
          if (opsProcessed != 0) {
            assert dnConf.socketKeepaliveTimeout > 0;
            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
          } else {
            peer.setReadTimeout(dnConf.socketTimeout);
          }
          // read the operation requested by the client
          op = readOp();
        } catch (InterruptedIOException ignored) {
          // Time out while we wait for client rpc
          break;
        } catch (IOException err) {
          // Since we optimistically expect the next op, it's quite normal to get EOF here.
          if (opsProcessed > 0 &&
              (err instanceof EOFException || err instanceof ClosedChannelException)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
            }
          } else {
            incrDatanodeNetworkErrors();
            throw err;
          }
          break;
        }
        // restore normal timeout
        if (opsProcessed != 0) {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        opStartTime = monotonicNow();
        // handle the client's operation
        processOp(op);
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
    } catch (Throwable t) {
      String s = datanode.getDisplayName() + ":DataXceiver error processing "
          + ((op == null) ? "unknown" : op.name()) + " operation "
          + " src: " + remoteAddress + " dst: " + localAddress;
      if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
        // For WRITE_BLOCK, it is okay if the replica already exists since
        // client and replication may write the same block to the same datanode
        // at the same time.
        if (LOG.isTraceEnabled()) {
          LOG.trace(s, t);
        } else {
          LOG.info(s + "; " + t);
        }
      } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
        String s1 =
            "Likely the client has stopped reading, disconnecting it";
        s1 += " (" + s + ")";
        if (LOG.isTraceEnabled()) {
          LOG.trace(s1, t);
        } else {
          LOG.info(s1 + "; " + t);          
        }
      } else {
        LOG.error(s, t);
      }
    } finally {
      if (LOG.isDebugEnabled()) {
        LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
            + datanode.getXceiverCount());
      }
      updateCurrentThreadName("Cleaning up");
      if (peer != null) {
        dataXceiverServer.closePeer(peer);
        IOUtils.closeStream(in);
      }
    }
  }
3. Receiver.processOp()

For this request, processOp() calls opReadBlock().

/** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      // read a block
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }
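
readOp() and processOp() together are the server-side mirror of Sender.send(): read the version, read the opcode byte, map it to an Op, and dispatch. The sketch below shows that flow in isolation. It is not the Hadoop implementation: the class name is hypothetical, only two operations are modelled, the handlers just return the name of the method that would be called, and the version 28 and opcodes 80 and 81 are assumed Hadoop 2.x values.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of readOp() + processOp(); not Hadoop code.
class OpDecoder {
    static final short VERSION = 28; // assumed DATA_TRANSFER_VERSION

    enum Op {
        WRITE_BLOCK((byte) 80), // assumed opcode
        READ_BLOCK((byte) 81);  // assumed opcode

        final byte code;

        Op(byte code) { this.code = code; }

        static Op fromCode(byte code) throws IOException {
            for (Op op : values()) {
                if (op.code == code) return op;
            }
            throw new IOException("Unknown op code " + code);
        }
    }

    /** Reads the 2-byte version and 1-byte opcode that start every request. */
    static Op readOp(DataInputStream in) throws IOException {
        short version = in.readShort();
        if (version != VERSION) {
            throw new IOException("Version mismatch: expected " + VERSION + ", got " + version);
        }
        return Op.fromCode(in.readByte());
    }

    /** Dispatches on the op; returns the name of the handler that would run. */
    static String processOp(Op op) throws IOException {
        switch (op) {
            case READ_BLOCK:  return "opReadBlock";
            case WRITE_BLOCK: return "opWriteBlock";
            default: throw new IOException("Unknown op " + op + " in data stream");
        }
    }

    /** Convenience wrapper that decodes a request header held in memory. */
    static String handle(byte[] requestHeader) {
        try {
            return processOp(readOp(new DataInputStream(new ByteArrayInputStream(requestHeader))));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(new byte[] {0, 28, 81})); // opReadBlock
    }
}
```
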

The datanode handles the readBlock operation:

  /** Receive OP_READ_BLOCK */
  private void opReadBlock() throws IOException {
    OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
        proto.getClass().getSimpleName());
    try {
      // this is where the client's request is actually handled
      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()));
    } finally {
      if (traceScope != null) traceScope.close();
    }
  }
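
opReadBlock() parses the request with vintPrefixed(in), that is, it reads a varint length and then exactly that many message bytes. A minimal sketch of this length-prefixed read is shown below. It is an illustration only: the class and method names are hypothetical, and it copies the message into a byte array instead of returning a size-limited stream.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of reading one varint-prefixed message; not Hadoop code.
class VarintPrefixed {

    /** Reads a base-128 varint of at most 5 bytes (enough for a 32-bit value). */
    static int readVarint(InputStream in) throws IOException {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            int b = in.read();
            if (b < 0) throw new EOFException("stream ended inside varint");
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result; // high bit clear: last byte
        }
        throw new IOException("malformed varint");
    }

    /** Reads the length prefix, then exactly that many bytes, leaving the rest of the stream untouched. */
    static byte[] readMessage(InputStream in) throws IOException {
        int size = readVarint(in);
        if (size < 0) throw new IOException("negative message size");
        byte[] body = new byte[size];
        int off = 0;
        while (off < size) {
            int n = in.read(body, off, size - off);
            if (n < 0) throw new EOFException("stream ended inside message");
            off += n;
        }
        return body;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = new byte[2 + 300 + 1];
        wire[0] = (byte) 0xAC; // varint encoding of 300 is AC 02
        wire[1] = 0x02;
        wire[302] = 42;        // first byte of whatever follows the message
        InputStream in = new ByteArrayInputStream(wire);
        System.out.println(readMessage(in).length); // 300
        System.out.println(in.read());              // 42
    }
}
```

Because only the prefixed bytes are consumed, the same connection can carry the next operation afterwards, which is what the keepalive loop in DataXceiver.run() relies on.
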
  
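4. DataXceiver.readBlock()

This method obtains the output stream to the client, then checks whether the client is allowed to read the requested block.
writeSuccessWithChecksumInfo() tells the client that the request was accepted and that it may start reading, and passes along the checksum information for the block on this datanode; the response is serialized as a BlockOpResponseProto.
blockSender is what actually reads the data on the datanode and sends it to the client.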

  @Override
  public void readBlock(final ExtendedBlock block,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final long blockOffset,
      final long length,
      final boolean sendChecksum,
      final CachingStrategy cachingStrategy) throws IOException {
    previousOpClientName = clientName;
    long read = 0;
    // get the output stream from the datanode to the client
    OutputStream baseStream = getOutputStream();
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
    // check read permission; on failure, report the error to the client through a BlockOpResponseProto and abort
    checkAccess(out, true, block, blockToken,
        Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
  
    // send the block
    BlockSender blockSender = null;
    DatanodeRegistration dnR = 
      datanode.getDNRegistrationForBP(block.getBlockPoolId());
    final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
            "%d", "HDFS_READ", clientName, "%d",
            dnR.getDatanodeUuid(), block, "%d")
        : dnR + " Served block " + block + " to " +
            remoteAddress;

    updateCurrentThreadName("Sending block " + block);
    try {
      try {
        // create the BlockSender for the requested byte range of the block
        blockSender = new BlockSender(block, blockOffset, length,
            true, false, sendChecksum, datanode, clientTraceFmt,
            cachingStrategy);
      } catch(IOException e) {
        String msg = "opReadBlock " + block + " received exception " + e; 
        LOG.info(msg);
        sendResponse(ERROR, msg);
        throw e;
      }
      
      // send op status
      // reply with a BlockOpResponseProto: tells the client the request was accepted and it may read, and carries this datanode's checksum information
      writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));

      long beginRead = Time.monotonicNow();
      // stream the block data to the client
      read = blockSender.sendBlock(out, baseStream, null); // send data
      long duration = Time.monotonicNow() - beginRead;
      if (blockSender.didSendEntireByteRange()) {
        // If we sent the entire range, then we should expect the client
        // to respond with a Status enum.
        try {
          ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
              PBHelper.vintPrefixed(in));
          if (!stat.hasStatus()) {
            LOG.warn("Client " + peer.getRemoteAddressString() +
                " did not send a valid status code after reading. " +
                "Will close connection.");
            IOUtils.closeStream(out);
          }
        } catch (IOException ioe) {
          LOG.debug("Error reading client status response. Will close connection.", ioe);
          IOUtils.closeStream(out);
          incrDatanodeNetworkErrors();
        }
      } else {
        IOUtils.closeStream(out);
      }
      datanode.metrics.incrBytesRead((int) read);
      datanode.metrics.incrBlocksRead();
      datanode.metrics.incrTotalReadTime(duration);
    } catch ( SocketException ignored ) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
            remoteAddress, ignored);
      }
      // Its ok for remote side to close the connection anytime.
      datanode.metrics.incrBlocksRead();
      // on a socket exception, close the stream
      IOUtils.closeStream(out);
    } catch ( IOException ioe ) {
      /* What exactly should we do here?
       * Earlier version shutdown() datanode if there is disk error.
       */
      if (!(ioe instanceof SocketTimeoutException)) {
        LOG.warn(dnR + ":Got exception while serving " + block + " to "
          + remoteAddress, ioe);
        incrDatanodeNetworkErrors();
      }
      throw ioe;
    } finally {
      // always close blockSender, whether or not an exception occurred
      IOUtils.closeStream(blockSender);
    }

    //update metrics
    datanode.metrics.addReadBlockOp(elapsed());
    datanode.metrics.incrReadsFromClient(peer.isLocal(), read);
  }

5. BlockSender.sendBlock()

The actual work of reading and sending the data is delegated to doSendBlock().

  /**
   * sendBlock() is used to read block and its metadata and stream the data to
   * either a client or to another datanode. 
   * 
   * @param out  stream to which the block is written to
   * @param baseStream optional. if non-null, <code>out</code> is assumed to 
   *        be a wrapper over this stream. This enables optimizations for
   *        sending the data, e.g. 
   *        {@link SocketOutputStream#transferToFully(FileChannel, 
   *        long, int)}.
   * @param throttler for sending data.
   * @return total bytes read, including checksum data.
   */
  long sendBlock(DataOutputStream out, OutputStream baseStream, 
                 DataTransferThrottler throttler) throws IOException {
    TraceScope scope =
        Trace.startSpan("sendBlock_" + block.getBlockId(), Sampler.NEVER);
    try {
      return doSendBlock(out, baseStream, throttler);
    } finally {
      scope.close();
    }
  }
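6. BlockSender.doSendBlock()

Before reading data, doSendBlock() calls manageOsCache() to trigger readahead, which loads upcoming data into the OS cache ahead of time and improves read performance; the same method also drops cached data that is no longer needed. doSendBlock() then allocates the buffer that holds each packet.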

private long doSendBlock(DataOutputStream out, OutputStream baseStream,
        DataTransferThrottler throttler) throws IOException {
    if (out == null) {
      throw new IOException( "out stream is null" );
    }
    initialOffset = offset;
    long totalRead = 0;
    OutputStream streamForSendChunks = out;
    
    lastCacheDropOffset = initialOffset;

    if (isLongRead() && blockInFd != null) {
      // Advise that this file descriptor will be accessed sequentially.
      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
          block.getBlockName(), blockInFd, 0, 0,
          NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
    }
    
    // Trigger readahead of beginning of file if configured.
    manageOsCache();

    final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
    try {
      int maxChunksPerPacket;
      int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
      boolean transferTo = transferToAllowed && !verifyChecksum
          && baseStream instanceof SocketOutputStream
          && blockIn instanceof FileInputStream;
      if (transferTo) {
        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
        blockInPosition = fileChannel.position();
        streamForSendChunks = baseStream;
        maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
        
        // Smaller packet size to only hold checksum when doing transferTo
        pktBufSize += checksumSize * maxChunksPerPacket;
      } else {
        maxChunksPerPacket = Math.max(1,
            numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
        // Packet size includes both checksum and data
        pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
      }

      ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

      while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
        manageOsCache();
        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
            transferTo, throttler);
        offset += len;
        totalRead += len + (numberOfChunks(len) * checksumSize);
        seqno++;
      }
      // If this thread was interrupted, then it did not send the full block.
      if (!Thread.currentThread().isInterrupted()) {
        try {
          // send an empty packet to mark the end of the block
          sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
              throttler);
          out.flush();
        } catch (IOException e) { //socket error
          throw ioeToSocketException(e);
        }

        sentEntireByteRange = true;
      }
    } finally {
      if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
        final long endTime = System.nanoTime();
        ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
            initialOffset, endTime - startTime));
      }
      // always close the block streams, whether or not an exception occurred
      close();
    }
    return totalRead;
  }
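
The buffer sizing and byte counting in doSendBlock() can be checked by hand. The sketch below reproduces the arithmetic under stated assumptions: a 512-byte chunk and a 4-byte CRC32C checksum (as in the sample response earlier), a 4096-byte I/O buffer for the normal path, a 65536-byte buffer for the transferTo path, and a 33-byte maximum header. The buffer and header sizes are placeholders, not values confirmed by the listing, and the class name is hypothetical.

```java
// Hypothetical sketch of the sizing arithmetic in doSendBlock(); not Hadoop code.
class PacketSizing {

    /** Number of chunks needed to cover dataLen bytes, rounding up. */
    static int numberOfChunks(long dataLen, int chunkSize) {
        return (int) ((dataLen + chunkSize - 1) / chunkSize);
    }

    /**
     * Size of the packet buffer. With transferTo the buffer holds only the header and
     * checksums, because the data goes from the file to the socket directly.
     */
    static int packetBufferSize(int maxHeaderLen, int chunkSize, int checksumSize,
                                int bufferSize, boolean transferTo) {
        int maxChunksPerPacket = Math.max(1, numberOfChunks(bufferSize, chunkSize));
        int bytesPerChunk = transferTo ? checksumSize : chunkSize + checksumSize;
        return maxHeaderLen + bytesPerChunk * maxChunksPerPacket;
    }

    /** Bytes counted per packet in totalRead: data plus one checksum per chunk. */
    static long bytesCounted(long dataLen, int chunkSize, int checksumSize) {
        return dataLen + (long) numberOfChunks(dataLen, chunkSize) * checksumSize;
    }

    public static void main(String[] args) {
        int header = 33, chunk = 512, checksum = 4; // assumed values
        System.out.println(packetBufferSize(header, chunk, checksum, 4096, false)); // 4161
        System.out.println(packetBufferSize(header, chunk, checksum, 65536, true)); // 545
        System.out.println(bytesCounted(1300, chunk, checksum));                    // 1312
    }
}
```

With these numbers a normal packet carries 8 chunks (8 * 516 bytes plus the header), while the transferTo buffer covers 128 chunks but stores only their 4-byte checksums.
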
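7. BlockSender.sendPacket()

Sends one packet. When transferTo is enabled, the block data is written to the socket with transferTo, a zero-copy path that avoids copying the data through user space and so improves throughput; otherwise the data is read into the packet buffer and written normally. If a throttler is supplied, it limits the sending rate.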

  /**
   * Sends a packet with up to maxChunks chunks of data.
   * 
   * @param pkt buffer used for writing packet data
   * @param maxChunks maximum number of chunks to send
   * @param out stream to send data to
   * @param transferTo use transferTo to send data
   * @param throttler used for throttling data transfer bandwidth
   */
  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
      boolean transferTo, DataTransferThrottler throttler) throws IOException {
    int dataLen = (int) Math.min(endOffset - offset,
                             (chunkSize * (long) maxChunks));
    
    int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
    int checksumDataLen = numChunks * checksumSize;
    int packetLen = dataLen + checksumDataLen + 4;
    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;

    // The packet buffer is organized as follows:
    // _______HHHHCCCCD?D?D?D?
    //        ^   ^
    //        |   \ checksumOff
    //        \ headerOff
    // _ padding, since the header is variable-length
    // H = header and length prefixes
    // C = checksums
    // D? = data, if transferTo is false.
    
    int headerLen = writePacketHeader(pkt, dataLen, packetLen);
    
    // Per above, the header doesn't start at the beginning of the
    // buffer
    int headerOff = pkt.position() - headerLen;
    
    int checksumOff = pkt.position();
    byte[] buf = pkt.array();
    
    if (checksumSize > 0 && checksumIn != null) {
      readChecksum(buf, checksumOff, checksumDataLen);

      // write in progress that we need to use to get last checksum
      if (lastDataPacket && lastChunkChecksum != null) {
        int start = checksumOff + checksumDataLen - checksumSize;
        byte[] updatedChecksum = lastChunkChecksum.getChecksum();
        
        if (updatedChecksum != null) {
          System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
        }
      }
    }
    
    int dataOff = checksumOff + checksumDataLen;
    if (!transferTo) { // normal transfer
      IOUtils.readFully(blockIn, buf, dataOff, dataLen);

      if (verifyChecksum) {
        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
      }
    }
    
    try {
      if (transferTo) {
        SocketOutputStream sockOut = (SocketOutputStream)out;
        // First write header and checksums
        sockOut.write(buf, headerOff, dataOff - headerOff);
        
        // no need to flush since we know out is not a buffered stream
        FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
        LongWritable waitTime = new LongWritable();
        LongWritable transferTime = new LongWritable();
        sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
            waitTime, transferTime);
        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
        blockInPosition += dataLen;
      } else {
        // normal transfer
        out.write(buf, headerOff, dataOff + dataLen - headerOff);
      }
    } catch (IOException e) {
      if (e instanceof SocketTimeoutException) {
        /*
         * writing to client timed out.  This happens if the client reads
         * part of a block and then decides not to read the rest (but leaves
         * the socket open).
         * 
         * Reporting of this case is done in DataXceiver#run
         */
      } else {
        /* Exception while writing to the client. Connection closure from
         * the other end is mostly the case and we do not care much about
         * it. But other things can go wrong, especially in transferTo(),
         * which we do not want to ignore.
         *
         * The message parsing below should not be considered as a good
         * coding example. NEVER do it to drive a program logic. NEVER.
         * It was done here because the NIO throws an IOException for EPIPE.
         */
        String ioem = e.getMessage();
        if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
          LOG.error("BlockSender.sendChunks() exception: ", e);
        }
        datanode.getBlockScanner().markSuspectBlock(
              volumeRef.getVolume().getStorageID(),
              block);
      }
      throw ioeToSocketException(e);
    }

    if (throttler != null) { // rebalancing so throttle
      // the throttler limits the sending rate
      throttler.throttle(packetLen);
    }

    return dataLen;
  }
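
The transferTo branch builds on java.nio's FileChannel.transferTo, which may move fewer bytes than requested and therefore has to be called in a loop. Below is a small, self-contained sketch of that loop. It is a stand-in for SocketOutputStream.transferToFully and not its implementation: the class and method names are hypothetical, and the destination is an in-memory channel so the example runs without a network, which also means no real zero-copy takes place here. With a socket channel as the destination, the JVM can let the operating system move the bytes directly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of a "transfer fully" loop; not Hadoop code.
class TransferToDemo {

    /** Transfers exactly count bytes starting at position, looping on partial transfers. */
    static void transferFully(FileChannel src, long position, long count,
                              WritableByteChannel dst) throws IOException {
        while (count > 0) {
            long n = src.transferTo(position, count, dst);
            if (n <= 0) {
                throw new IOException("no bytes transferred at position " + position);
            }
            position += n;
            count -= n;
        }
    }

    /** Writes content to a temp file, then copies the given range out through transferTo. */
    static byte[] copyRange(byte[] content, long position, int count) {
        try {
            Path tmp = Files.createTempFile("block", ".data");
            try {
                Files.write(tmp, content);
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                try (FileChannel file = FileChannel.open(tmp, StandardOpenOption.READ);
                     WritableByteChannel out = Channels.newChannel(sink)) {
                    transferFully(file, position, count, out);
                }
                return sink.toByteArray();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] content = new byte[1000];
        for (int i = 0; i < content.length; i++) content[i] = (byte) i;
        byte[] range = copyRange(content, 100, 50);
        System.out.println(range.length); // 50
        System.out.println(range[0]);     // 100
    }
}
```

Passing an explicit position, as sendPacket does with blockInPosition, leaves the channel's own position unchanged, so the sender has to advance its offset itself after each packet.
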