Datanode 收发数据块流程
数据节点最重要的功能就是管理物理存储上的数据块,并与Namenode以及DFSClient 通信以执行读写数据块的操作。这里的读写操作涉及大量数据的传输,例如DFSClient将数 据块写入数据节点中、DFSClient从数据节点中读取数据块,以及数据节点将数据块复制到其 他数据节点等,这些操作都涉及大量的IO。
在Datanode的实现中,对这些读写操作提供了基于TCP流的数据访问接口 DataTransferProtocol:
public interface DataTransferProtocol {
Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class);
int DATA_TRANSFER_VERSION = 28;
void readBlock;
void writeBlock;
void transferBlock;
void requestShortCircuitFds;
void releaseShortCircuitFds;
void requestShortCircuitShm;
void replaceBlock;
void copyBlock;
void blockChecksum;
void blockGroupChecksum;
}
DataTransferProtocol 有两个子类 Sender 和 Receiver。其中 Sender 类封装了 DataTransferProtocol的调用操作,用于发起流式接口请求;Receiver类封装了 DataTransferProtocol的执行操作,用于响应流式接口请求。
假如 DFSClient 发起了一个 DataTransferProtocol.readBlock()操作,那么 DFSClient会调用Sender类将这个请求序列化,并传输给远程的Receiver。远程的Receiver 类接收到这个请求后,会反序列化请求,然后调用执行代码执行读取操作。
readBlock
这里我们先以 readBlock 为例:
sender:
send方法将Op对象以及序列化后的参数发送到IO流中。send()
方法会先往IO流中写入一个short长度的DataTransferProtocol版本号,然后再写入操作码Op, 最后写入序列化后的参数:
private static void send(final DataOutputStream out, final Op opcode,
final Message proto) throws IOException {
LOG.trace("Sending DataTransferOp {}: {}",
proto.getClass().getSimpleName(), proto);
op(out, opcode);
proto.writeDelimitedTo(out);
out.flush();
}
Op是一个枚举类型,使用一个byte类型的变量code标识 操作码。一个操作码对应DataTransferProtocol接口中的一个方法:
WRITE_BLOCK((byte)80),
READ_BLOCK((byte)81),
READ_METADATA((byte)82),
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
TRANSFER_BLOCK((byte)86),
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
BLOCK_GROUP_CHECKSUM((byte)90),
CUSTOM((byte)127);
Receiver类封装了 DataTransferProtocol 的执行操作,用于执行远程节点发起的流式接口请求。Receiver是一个抽象类,它提供了解析Sender请求操作码的readOp()
方法,以及处理 Sender请求的processOp()
方法。这两个方法都是在DataXceiver.run
中循环调用的。
protected final Op readOp() throws IOException {
final short version = in.readShort();
// 先从数据流中读入DataTransferProtocol版本号,并与当前版本号进行比对
if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch (Expected: " +
DataTransferProtocol.DATA_TRANSFER_VERSION +
", Received: " + version + " )");
}
return Op.read(in);
}
processOp()
方法接收readOp()
解析出的Op操作码作为参数,然后针对不同的操作码调用指定的方法:
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case BLOCK_GROUP_CHECKSUM:
opStripedBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
opReadBlock
方法首先从IO流中解析出序列化的Receiver.readBIock()
方法的参数,然后对解析出的参数进行反序列化,最后调用Receiver.readBlock()
方法执行读取操作。这里的 readBlock()方法是在 Receiver 的子类 DataXceiver 中实现的。
private void opReadBlock() throws IOException {
// 从IO流中读取序列化的readBlock ()参数
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
// 反序列化参数.然后调用子类DataXceiver的readBlock()才法执行读取操作
readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen(),
proto.getSendChecksums(),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
} finally {
if (traceScope != null) traceScope.close();
}
}
opReplaceBlock opWriteBlock 等方法的实现与opReadBlockO的实现类似。
DataXceiverServer
在DataNode的初始化代码中,会创建一个DataXceiverServer
对象监听所有流式接口请求, Datanode 会调用 Datanode.initDataXceivcr()
方法来完成 DataXceiverServer
对象的构造。initDataXceiver()
方法会首先创建TcpPeerServer对象(对ServerSocket的封装),它能通过 accept()
方法返回Peer对象(封装了 Socket对象,提供通信的输入/输出流)。
DataXceiverServer的功能都是在run()
方法:DataXceiverServer.run
方法的逻辑 非常简单,它会循环调用peerServer的accept
方法监听,如果有新的连接请求则创建Peer对象,并构造一个DataXceiver线程服务这个流式请求 也就是DataXceiverServer只负责连接的建立以及构造并启动DataXceiver,流式接口请求则是由DataXceiver响应的,真正的操作都是由DataXceiver来进行的。