HDFS Read: Socket Communication Analysis (Part 3)
I. Communication Flow Diagram
[flow diagram image not reproduced]
II. The Sender Client
1. RemoteBlockReader2.newBlockReader()
Using the located block's metadata, the client sends a read request to the datanode; on the server side, DataXceiver receives the request and responds twice: first with a status value telling the client whether the block can be read, then with the block data itself. If the status received from the datanode is SUCCESS, this method creates and returns a RemoteBlockReader2; otherwise the datanode closes the stream.
public static BlockReader newBlockReader(String file,
ExtendedBlock block, // the block to read
Token<BlockTokenIdentifier> blockToken, // access token for the block
long startOffset, long len,
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID, // the datanode to read from
PeerCache peerCache,
CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
//peer : NioInetPeer(Socket[addr=/127.0.0.1,port=49621,localport=49629])
//SocketOutputStream$Writer java.nio.channels.SocketChannel[connected local=/127.0.0.1:49629 remote=/127.0.0.1:49621]
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
//read the data block from the chosen datanode:
//the client sends the READ_BLOCK request; on the datanode, DataXceiver (a Receiver subclass) picks it up
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);
// Get bytes in block
//SocketInputStream$Reader java.nio.channels.SocketChannel[connected local=/127.0.0.1:49629 remote=/127.0.0.1:49621]
//open an input stream from the datanode and wait for its reply (a status value plus data; the data itself is consumed later in readNextPacket)
//wait for the DataXceiver's reply (a BlockOpResponseProto)
DataInputStream in = new DataInputStream(peer.getInputStream());
//status: SUCCESS
//readOpChecksumInfo {
// checksum {
// type: CHECKSUM_CRC32C
// bytesPerChecksum: 512
// }
// chunkOffset: 0
//}
//parse the response and verify it reports success
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
checkSuccess(status, peer, block, file);
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
//build the DataChecksum from the response
DataChecksum checksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
long firstChunkOffset = checksumInfo.getChunkOffset();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
datanodeID, peerCache);
}
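The final range check deserves a worked example. The datanode always starts sending from a checksum-chunk boundary, so the first chunk offset it reports must sit at or below startOffset, but less than one full chunk below it. A minimal sketch of that alignment, using illustrative values (512-byte chunks, an arbitrary startOffset), not data from a real block:
public class ChunkAlignment {
  public static void main(String[] args) {
    int bytesPerChecksum = 512; // typical dfs.bytes-per-checksum
    long startOffset = 1300;    // the byte the client actually asked for

    // The datanode rewinds to the chunk boundary at or below startOffset,
    // because checksums can only be verified on whole chunks.
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);

    // The same range check newBlockReader() applies to the response.
    boolean valid = firstChunkOffset >= 0
        && firstChunkOffset <= startOffset
        && firstChunkOffset > (startOffset - bytesPerChecksum);
    System.out.println(firstChunkOffset + " valid=" + valid); // 1024 valid=true
  }
}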
2. Sender.readBlock()
Serializes the request parameters into an OpReadBlockProto; the operation code is READ_BLOCK.
@Override
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
//header: DataTransferProtos.ClientOperationHeaderProto (block, token, client name)
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.build();
send(out, Op.READ_BLOCK, proto);
}
3. Sender.send()
Writes the protocol version number and opcode, then streams the serialized request to the datanode; once flushed, the client's read request is on the wire.
private static void send(final DataOutputStream out, final Op opcode,
final Message proto) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
+ ": " + proto);
}
//writes DATA_TRANSFER_VERSION followed by the opcode
op(out, opcode);
proto.writeDelimitedTo(out);
out.flush();
}
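Putting Sender.op() and writeDelimitedTo() together, every operation on the wire is framed as: a 2-byte DATA_TRANSFER_VERSION, a 1-byte opcode, a protobuf varint length prefix, then the serialized request message. A hedged sketch of that framing with plain java.io; the version and opcode constants below are the Hadoop 2.x values as I recall them, and the payload is an opaque byte[] standing in for the serialized OpReadBlockProto:
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OpFraming {
  static final short DATA_TRANSFER_VERSION = 28; // Hadoop 2.x wire version
  static final byte OP_READ_BLOCK = 81;          // Op.READ_BLOCK code (0x51)

  // Frame one op the way Sender.op() + Message.writeDelimitedTo() do:
  // 2-byte version, 1-byte opcode, varint length prefix, protobuf body.
  static byte[] frame(byte opcode, byte[] protoBytes) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeShort(DATA_TRANSFER_VERSION);
    out.writeByte(opcode);
    writeVarint(out, protoBytes.length);
    out.write(protoBytes);
    out.flush();
    return bos.toByteArray();
  }

  // Protobuf's base-128 varint: 7 payload bits per byte, high bit = "more".
  static void writeVarint(DataOutputStream out, int v) throws IOException {
    while ((v & ~0x7F) != 0) {
      out.writeByte((v & 0x7F) | 0x80);
      v >>>= 7;
    }
    out.writeByte(v);
  }
}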
III. The DataXceiver Server
1. DataXceiverServer.run()
The DataXceiverServer thread loops on accept(), listening for incoming client connections. For each accepted connection it spawns a DataXceiver daemon thread to process the request and send back the result.
@Override
public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
peer = peerServer.accept();
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > maxXceiverCount) {
throw new IOException("Xceiver count " + curXceiverCount
+ " exceeds the limit of concurrent xcievers: "
+ maxXceiverCount);
}
//spawn a daemon thread to handle this peer's requests
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
// another thread closed our listener socket - that's expected during shutdown,
// but not in other circumstances
if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
IOUtils.cleanup(null, peer);
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
} catch (OutOfMemoryError ie) {
IOUtils.cleanup(null, peer);
// DataNode can run out of memory if there is too many transfers.
// Log the event, Sleep for 30 seconds, other transfers may complete by
// then.
LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
// ignore
}
} catch (Throwable te) {
LOG.error(datanode.getDisplayName()
+ ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
// Close the server to stop reception of more requests.
try {
peerServer.close();
closed = true;
} catch (IOException ie) {
LOG.warn(datanode.getDisplayName()
+ " :DataXceiverServer: close exception", ie);
}
// if in restart prep stage, notify peers before closing them.
if (datanode.shutdownForUpgrade) {
restartNotifyPeers();
// Each thread needs some time to process it. If a thread needs
// to send an OOB message to the client, but blocked on network for
// long time, we need to force its termination.
LOG.info("Shutting down DataXceiverServer before restart");
// Allow roughly up to 2 seconds.
for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// ignore
}
}
}
// Close all peers.
closeAllPeers();
}
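Structurally this is a classic accept-and-dispatch server with a concurrency cap. A simplified, self-contained sketch of the same pattern; the port and cap mirror common DataNode defaults but are illustrative here, and unlike the real server (which throws and logs) this sketch silently drops connections over the cap:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniXceiverServer {
  static final int MAX_XCEIVERS = 4096; // cf. dfs.datanode.max.transfer.threads
  static final AtomicInteger active = new AtomicInteger();

  public static void main(String[] args) throws IOException {
    try (ServerSocket server = new ServerSocket(50010)) { // default xfer port
      while (true) {
        Socket peer = server.accept();
        if (active.get() >= MAX_XCEIVERS) { // same guard as DataXceiverServer
          peer.close();                     // (the real server throws and logs)
          continue;
        }
        Thread t = new Thread(() -> handle(peer));
        t.setDaemon(true); // the DataNode wraps each handler in a Daemon
        t.start();
      }
    }
  }

  static void handle(Socket peer) {
    active.incrementAndGet();
    try {
      // read the op, dispatch, respond -- see DataXceiver.run()
    } finally {
      active.decrementAndGet();
      try { peer.close(); } catch (IOException ignored) { }
    }
  }
}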
2. DataXceiver.run()
readOp() parses the opcode of the client request, READ_BLOCK in this case.
processOp() then dispatches the operation, here a block read.
/**
* Read/write data from/to the DataXceiverServer.
*/
@Override
public void run() {
int opsProcessed = 0;
Op op = null;
try {
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
try {
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in,
HdfsConstants.SMALL_BUFFER_SIZE);
socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) {
if (imne.isHandshake4Encryption()) {
LOG.info("Failed to read expected encryption handshake from client " +
"at " + peer.getRemoteAddressString() + ". Perhaps the client " +
"is running an older version of Hadoop which does not support " +
"encryption");
} else {
LOG.info("Failed to read expected SASL data transfer protection " +
"handshake from client at " + peer.getRemoteAddressString() +
". Perhaps the client is running an older version of Hadoop " +
"which does not support SASL data transfer protection");
}
return;
}
super.initialize(new DataInputStream(input));
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else {
peer.setReadTimeout(dnConf.socketTimeout);
}
//read the next opcode from the client
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
// Since we optimistically expect the next op, it's quite normal to get EOF here.
if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
incrDatanodeNetworkErrors();
throw err;
}
break;
}
// restore normal timeout
if (opsProcessed != 0) {
peer.setReadTimeout(dnConf.socketTimeout);
}
opStartTime = monotonicNow();
//dispatch the requested operation
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
String s = datanode.getDisplayName() + ":DataXceiver error processing "
+ ((op == null) ? "unknown" : op.name()) + " operation "
+ " src: " + remoteAddress + " dst: " + localAddress;
if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
// For WRITE_BLOCK, it is okay if the replica already exists since
// client and replication may write the same block to the same datanode
// at the same time.
if (LOG.isTraceEnabled()) {
LOG.trace(s, t);
} else {
LOG.info(s + "; " + t);
}
} else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
String s1 =
"Likely the client has stopped reading, disconnecting it";
s1 += " (" + s + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(s1, t);
} else {
LOG.info(s1 + "; " + t);
}
} else {
LOG.error(s, t);
}
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
}
updateCurrentThreadName("Cleaning up");
if (peer != null) {
dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in);
}
}
}
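The timeout juggling inside the do/while loop is what makes connection reuse work: the first op on a connection waits the full socket timeout, while every subsequent op gets only the short keepalive timeout, so cached idle connections are shed quickly. A minimal sketch of the pattern; the timeout values are illustrative stand-ins for dfs.client.socket-timeout and dfs.datanode.socket.reuse.keepalive, and processOp() here is a hypothetical stub:
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class KeepaliveLoop {
  // Illustrative stand-ins for dfs.client.socket-timeout and
  // dfs.datanode.socket.reuse.keepalive (assumed values, not read from config).
  static final int SOCKET_TIMEOUT_MS = 60_000;
  static final int KEEPALIVE_TIMEOUT_MS = 4_000;

  static void serve(Socket peer) throws IOException {
    DataInputStream in = new DataInputStream(peer.getInputStream());
    int opsProcessed = 0;
    while (true) {
      // First op waits the full timeout; later ops get only the short
      // keepalive window, so idle cached connections are shed quickly.
      peer.setSoTimeout(opsProcessed == 0 ? SOCKET_TIMEOUT_MS
                                          : KEEPALIVE_TIMEOUT_MS);
      byte opcode;
      try {
        in.readShort();          // 2-byte wire version
        opcode = in.readByte();  // 1-byte operation code
      } catch (SocketTimeoutException | EOFException e) {
        break;                   // peer idle or gone; normal after >= 1 op
      }
      peer.setSoTimeout(SOCKET_TIMEOUT_MS); // restore for the op body
      processOp(opcode, in);                // hypothetical dispatch
      opsProcessed++;
    }
  }

  static void processOp(byte opcode, DataInputStream in) {
    // parse the varint-delimited request proto and serve it
  }
}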
3. Receiver.processOp()
This request dispatches to opReadBlock().
/** Process op by the corresponding method. */
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
//read a block
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
On the datanode, opReadBlock() deserializes the request and hands it to readBlock():
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
//this is where the client's request is actually handled
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen(),
proto.getSendChecksums(),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
} finally {
if (traceScope != null) traceScope.close();
}
}
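PBHelper.vintPrefixed() hides a small but important wire detail: every protobuf message in this protocol is length-delimited by a varint prefix. A sketch of what reading one delimited message amounts to; readDelimited and readVarint are hypothetical helpers, not Hadoop APIs:
import java.io.DataInputStream;
import java.io.IOException;

public class VintPrefixed {
  // What vintPrefixed() + parseFrom() amount to on the wire: a protobuf
  // varint length, then exactly that many message bytes.
  static byte[] readDelimited(DataInputStream in) throws IOException {
    int len = readVarint(in);
    byte[] body = new byte[len];
    in.readFully(body);
    return body; // feed to e.g. OpReadBlockProto.parseFrom(body)
  }

  static int readVarint(DataInputStream in) throws IOException {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      int b = in.readUnsignedByte();
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) return result;
    }
    throw new IOException("varint too long");
  }
}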
4. DataXceiver.readBlock()
This method obtains the output stream back to the client and checks that the block access token grants read permission.
writeSuccessWithChecksumInfo() tells the client that the request was accepted and the read can proceed, and carries the datanode's checksum parameters, serialized as a BlockOpResponseProto.
blockSender is what actually reads the block data from disk and streams it to the client.
@Override
public void readBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
long read = 0;
//the datanode's output stream back to the client
OutputStream baseStream = getOutputStream();
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
//verify read permission; on failure the client is told via an error BlockOpResponseProto and the stream is closed
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
final String clientTraceFmt =
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
"%d", "HDFS_READ", clientName, "%d",
dnR.getDatanodeUuid(), block, "%d")
: dnR + " Served block " + block + " to " +
remoteAddress;
updateCurrentThreadName("Sending block " + block);
try {
try {
//build the sender for the requested byte range of the block
blockSender = new BlockSender(block, blockOffset, length,
true, false, sendChecksum, datanode, clientTraceFmt,
cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(ERROR, msg);
throw e;
}
// send op status
//reply with a SUCCESS BlockOpResponseProto telling the client the read can proceed, along with this datanode's checksum info
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
long beginRead = Time.monotonicNow();
//stream the block data to the client
read = blockSender.sendBlock(out, baseStream, null); // send data
long duration = Time.monotonicNow() - beginRead;
if (blockSender.didSendEntireByteRange()) {
// If we sent the entire range, then we should expect the client
// to respond with a Status enum.
try {
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
PBHelper.vintPrefixed(in));
if (!stat.hasStatus()) {
LOG.warn("Client " + peer.getRemoteAddressString() +
" did not send a valid status code after reading. " +
"Will close connection.");
IOUtils.closeStream(out);
}
} catch (IOException ioe) {
LOG.debug("Error reading client status response. Will close connection.", ioe);
IOUtils.closeStream(out);
incrDatanodeNetworkErrors();
}
} else {
IOUtils.closeStream(out);
}
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
} catch ( SocketException ignored ) {
if (LOG.isTraceEnabled()) {
LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
remoteAddress, ignored);
}
// Its ok for remote side to close the connection anytime.
datanode.metrics.incrBlocksRead();
//on a socket error just close the stream
IOUtils.closeStream(out);
} catch ( IOException ioe ) {
/* What exactly should we do here?
* Earlier version shutdown() datanode if there is disk error.
*/
if (!(ioe instanceof SocketTimeoutException)) {
LOG.warn(dnR + ":Got exception while serving " + block + " to "
+ remoteAddress, ioe);
incrDatanodeNetworkErrors();
}
throw ioe;
} finally {
//always close the blockSender, success or failure
IOUtils.closeStream(blockSender);
}
//update metrics
datanode.metrics.addReadBlockOp(elapsed());
datanode.metrics.incrReadsFromClient(peer.isLocal(), read);
}
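From the client's point of view the full exchange is therefore: one varint-delimited BlockOpResponseProto, a stream of data packets, and finally a ClientReadStatusProto sent back by the client. Each packet is framed as a 4-byte payload length PLEN (which counts itself plus checksums plus data, matching packetLen in sendPacket below), a 2-byte header length HLEN, the packet header, then checksums and data; an empty packet marks the end of the block. A frame-level sketch of draining that stream; drainPackets is a hypothetical helper and the header is skipped, not parsed:
import java.io.DataInputStream;
import java.io.IOException;

public class PacketFrames {
  static long drainPackets(DataInputStream in) throws IOException {
    long payloadBytes = 0;
    while (true) {
      int plen = in.readInt();           // PLEN: itself + checksums + data
      int hlen = in.readUnsignedShort(); // HLEN: packet header length
      in.readFully(new byte[hlen]);      // header bytes (not parsed here)
      byte[] payload = new byte[plen - 4];
      in.readFully(payload);             // checksum bytes, then data bytes
      payloadBytes += payload.length;
      if (payload.length == 0) {         // empty packet marks end of block
        return payloadBytes;
      }
    }
  }
}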
5. BlockSender.sendBlock()
doSendBlock() does the actual reading and sending.
/**
* sendBlock() is used to read block and its metadata and stream the data to
* either a client or to another datanode.
*
* @param out stream to which the block is written to
* @param baseStream optional. if non-null, <code>out</code> is assumed to
* be a wrapper over this stream. This enables optimizations for
* sending the data, e.g.
* {@link SocketOutputStream#transferToFully(FileChannel,
* long, int)}.
* @param throttler for sending data.
* @return total bytes read, including checksum data.
*/
long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
TraceScope scope =
Trace.startSpan("sendBlock_" + block.getBlockId(), Sampler.NEVER);
try {
return doSendBlock(out, baseStream, throttler);
} finally {
scope.close();
}
}
6. BlockSender.doSendBlock()
Before reading, manageOsCache() issues readahead so that data is staged into the OS page cache ahead of use, and drops cached pages behind the current offset that are no longer needed. The method then sizes and allocates the packet buffer and loops, sending one packet at a time.
private long doSendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
if (out == null) {
throw new IOException( "out stream is null" );
}
initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
lastCacheDropOffset = initialOffset;
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), blockInFd, 0, 0,
NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}
// Trigger readahead of beginning of file if configured.
manageOsCache();
final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
try {
int maxChunksPerPacket;
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// Smaller packet size to only hold checksum when doing transferTo
pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
// If this thread was interrupted, then it did not send the full block.
if (!Thread.currentThread().isInterrupted()) {
try {
// send an empty packet to mark the end of the block
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
throttler);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
sentEntireByteRange = true;
}
} finally {
if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
final long endTime = System.nanoTime();
ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
//always release streams and file descriptors
close();
}
return totalRead;
}
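The two buffer-sizing branches are worth making concrete. On the normal path the packet buffer must hold the header, the checksums, and the data; on the transferTo path the data bytes never enter user space, so the buffer holds only the header and checksums. A small worked example with common defaults (512-byte chunks, 4-byte CRC32C checksums); the header length constant is illustrative, and numberOfChunks() is simplified to ignore offset alignment:
public class PacketSizing {
  public static void main(String[] args) {
    int chunkSize = 512;          // dfs.bytes-per-checksum default
    int checksumSize = 4;         // CRC32C: 4 bytes per 512-byte chunk
    int pktMaxHeaderLen = 33;     // illustrative; Hadoop computes this constant

    // Normal path: the packet buffer carries both checksums and data.
    int ioFileBufferSize = 4096;  // io.file.buffer.size default
    int maxChunks = Math.max(1, ioFileBufferSize / chunkSize);          // 8
    int normalBuf = pktMaxHeaderLen + (chunkSize + checksumSize) * maxChunks;
    System.out.println("normal pktBuf = " + normalBuf);                 // 4161

    // transferTo path: data bypasses the buffer, only checksums are staged.
    int transferToBufferSize = 64 * 1024;
    int ttChunks = transferToBufferSize / chunkSize;                    // 128
    int ttBuf = pktMaxHeaderLen + checksumSize * ttChunks;
    System.out.println("transferTo pktBuf = " + ttBuf);                 // 545
  }
}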
7. BlockSender.sendPacket()
Sends one packet of data. On the transferTo path the data bytes are moved to the socket with zero-copy transferTo(), avoiding a user-space copy and improving read throughput; an optional throttler caps the send rate.
/**
* Sends a packet with up to maxChunks chunks of data.
*
* @param pkt buffer used for writing packet data
* @param maxChunks maximum number of chunks to send
* @param out stream to send data to
* @param transferTo use transferTo to send data
* @param throttler used for throttling data transfer bandwidth
*/
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler) throws IOException {
int dataLen = (int) Math.min(endOffset - offset,
(chunkSize * (long) maxChunks));
int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
int checksumDataLen = numChunks * checksumSize;
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
// The packet buffer is organized as follows:
// _______HHHHCCCCD?D?D?D?
// ^ ^
// | \ checksumOff
// \ headerOff
// _ padding, since the header is variable-length
// H = header and length prefixes
// C = checksums
// D? = data, if transferTo is false.
int headerLen = writePacketHeader(pkt, dataLen, packetLen);
// Per above, the header doesn't start at the beginning of the
// buffer
int headerOff = pkt.position() - headerLen;
int checksumOff = pkt.position();
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
}
}
}
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
// normal transfer
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException) {
/*
* writing to client timed out. This happens if the client reads
* part of a block and then decides not to read the rest (but leaves
* the socket open).
*
* Reporting of this case is done in DataXceiver#run
*/
} else {
/* Exception while writing to the client. Connection closure from
* the other end is mostly the case and we do not care much about
* it. But other things can go wrong, especially in transferTo(),
* which we do not want to ignore.
*
* The message parsing below should not be considered as a good
* coding example. NEVER do it to drive a program logic. NEVER.
* It was done here because the NIO throws an IOException for EPIPE.
*/
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(),
block);
}
throw ioeToSocketException(e);
}
if (throttler != null) { // rebalancing so throttle
//throttle to cap the transfer rate (used when rebalancing)
throttler.throttle(packetLen);
}
return dataLen;
}
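The transferTo branch above is the zero-copy path: SocketOutputStream.transferToFully() is built on FileChannel.transferTo(), which moves bytes from the block file to the socket inside the kernel, skipping the copy through a user-space buffer that a read()/write() loop would pay for. A standalone sketch of the underlying JDK mechanism; sendFile and its arguments are illustrative:
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySend {
  static void sendFile(String path, String host, int port) throws IOException {
    try (FileChannel file = new FileInputStream(path).getChannel();
         SocketChannel sock = SocketChannel.open(
             new InetSocketAddress(host, port))) {
      long pos = 0, remaining = file.size();
      while (remaining > 0) {
        // Kernel-to-kernel copy; may transfer fewer bytes than requested,
        // hence the loop (transferToFully does the same).
        long sent = file.transferTo(pos, remaining, sock);
        pos += sent;
        remaining -= sent;
      }
    }
  }
}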