Datanode 收发数据块流程

2022-12-19  本文已影响0人  wayyyy

数据节点最重要的功能就是管理物理存储上的数据块,并与Namenode以及DFSClient 通信以执行读写数据块的操作。这里的读写操作涉及大量数据的传输,例如DFSClient将数 据块写入数据节点中、DFSClient从数据节点中读取数据块,以及数据节点将数据块复制到其 他数据节点等,这些操作都涉及大量的IO。

在Datanode的实现中,对这些读写操作提供了基于TCP流的数据访问接口 DataTransferProtocol:

public interface DataTransferProtocol {
    Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class);

    int DATA_TRANSFER_VERSION = 28;

    void readBlock;
    void writeBlock;
    void transferBlock;
    void requestShortCircuitFds;
    void releaseShortCircuitFds;
    void requestShortCircuitShm;
    void replaceBlock;
    void copyBlock;
    void blockChecksum;
    void blockGroupChecksum;
}

DataTransferProtocol 有两个子类 Sender 和 Receiver。其中 Sender 类封装了 DataTransferProtocol的调用操作,用于发起流式接口请求;Receiver类封装了 DataTransferProtocol的执行操作,用于响应流式接口请求。

假如 DFSClient 发起了一个 DataTransferProtocol.readBlock()操作,那么 DFSClient会调用Sender类将这个请求序列化,并传输给远程的Receiver。远程的Receiver 类接收到这个请求后,会反序列化请求,然后调用执行代码执行读取操作。

readBlock

这里我们先以 readBlock 为例:

sender:

send方法将Op对象以及序列化后的参数发送到IO流中。send()方法会先往IO流中写入一个short长度的DataTransferProtocol版本号,然后再写入操作码Op, 最后写入序列化后的参数:

 private static void send(final DataOutputStream out, final Op opcode,
      final Message proto) throws IOException {
    LOG.trace("Sending DataTransferOp {}: {}",
        proto.getClass().getSimpleName(), proto);
    op(out, opcode);
    proto.writeDelimitedTo(out);
    out.flush();
  }

Op是一个枚举类型,使用一个byte类型的变量code标识 操作码。一个操作码对应DataTransferProtocol接口中的一个方法:

WRITE_BLOCK((byte)80),
READ_BLOCK((byte)81),
READ_METADATA((byte)82),
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
TRANSFER_BLOCK((byte)86),
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
BLOCK_GROUP_CHECKSUM((byte)90),
CUSTOM((byte)127);

Receiver类封装了 DataTransferProtocol 的执行操作,用于执行远程节点发起的流式接口请求。Receiver是一个抽象类,它提供了解析Sender请求操作码的readOp()方法,以及处理 Sender请求的processOp()方法。这两个方法都是在DataXceiver.run中循环调用的。

protected final Op readOp() throws IOException {
    final short version = in.readShort();
    // 先从数据流中读入DataTransferProtocol版本号,并与当前版本号进行比对
    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
      throw new IOException( "Version Mismatch (Expected: " +
          DataTransferProtocol.DATA_TRANSFER_VERSION  +
          ", Received: " +  version + " )");
    }
    return Op.read(in);
}

processOp()方法接收readOp()解析出的Op操作码作为参数,然后针对不同的操作码调用指定的方法:

protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case BLOCK_GROUP_CHECKSUM:
      opStripedBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
}

opReadBlock方法首先从IO流中解析出序列化的Receiver.readBIock()方法的参数,然后对解析出的参数进行反序列化,最后调用Receiver.readBlock()方法执行读取操作。这里的 readBlock()方法是在 Receiver 的子类 DataXceiver 中实现的。

private void opReadBlock() throws IOException {
    // 从IO流中读取序列化的readBlock ()参数
    OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
        proto.getClass().getSimpleName());
    try {
      // 反序列化参数.然后调用子类DataXceiver的readBlock()才法执行读取操作
      readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        proto.getOffset(),
        proto.getLen(),
        proto.getSendChecksums(),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()));
    } finally {
      if (traceScope != null) traceScope.close();
    }
  }

opReplaceBlock opWriteBlock 等方法的实现与opReadBlockO的实现类似。

DataXceiverServer

在DataNode的初始化代码中,会创建一个DataXceiverServer对象监听所有流式接口请求, Datanode 会调用 Datanode.initDataXceivcr()方法来完成 DataXceiverServer 对象的构造。initDataXceiver()方法会首先创建TcpPeerServer对象(对ServerSocket的封装),它能通过 accept()方法返回Peer对象(封装了 Socket对象,提供通信的输入/输出流)。

DataXceiverServer的功能都是在run()方法:DataXceiverServer.run方法的逻辑 非常简单,它会循环调用peerServer的accept方法监听,如果有新的连接请求则创建Peer对象,并构造一个DataXceiver线程服务这个流式请求 也就是DataXceiverServer只负责连接的建立以及构造并启动DataXceiver,流式接口请求则是由DataXceiver响应的,真正的操作都是由DataXceiver来进行的。

上一篇 下一篇

猜你喜欢

热点阅读