HDFS Write, Part 2: Writing Data
2019-08-27 · 古语1
I. Write data flow diagram
In this flow, the client starts writing data; the data is split into multiple chunks, multiple chunks are packed into one packet, and the packet is placed on the dataQueue, where it waits to be sent to the DataNodes for writing.
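To make the chunk/packet arithmetic concrete, here is a minimal standalone sketch (the constants 512/4/65536 mirror this walkthrough; the 33-byte packet header budget is an assumption, not actual DFSClient state):

// Minimal sketch of the chunk/packet arithmetic, in the spirit of
// computePacketChunkSize (assumed constants, not the real DFSClient code).
public class PacketMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;   // one chunk of user data
    int checksumSize = 4;         // CRC per chunk
    int writePacketSize = 65536;  // dfs.client-write-packet-size
    int pktMaxHeaderLen = 33;     // assumed packet header budget

    int chunkSize = bytesPerChecksum + checksumSize;  // 516 bytes on the wire
    int bodySize = writePacketSize - pktMaxHeaderLen; // room left for chunks
    int chunksPerPacket = Math.max(bodySize / chunkSize, 1);

    System.out.println("chunksPerPacket = " + chunksPerPacket);  // 126
    System.out.println("data bytes per packet = "
        + chunksPerPacket * bytesPerChecksum);                   // 64512
  }
}

The output (126 chunks, 64512 data bytes per packet) matches the values observed later in this walkthrough.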
[Figure: write data flow diagram]
II. Write data flow
1. The FSDataOutputStream.write method
This delegates to the wrapped stream, whose write is the parent-class method FSOutputSummer.write:
@Override
public void write(byte b[], int off, int len) throws IOException {
  // 2. out is a DFSOutputStream, which extends FSOutputSummer
  out.write(b, off, len);
  position += len; // update position
  if (statistics != null) {
    statistics.incrementBytesWritten(len);
  }
}
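For context, a hedged usage sketch of how client code reaches this write path (the path /tmp/demo.txt and the 65536-byte payload are illustrative assumptions):

// How a client triggers the write path above (illustrative sketch).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      byte[] data = new byte[65536];    // one block's worth in this walkthrough
      out.write(data, 0, data.length);  // enters write(byte[], int, int) above
    }
  }
}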
This method checks that the stream is still open, then writes the data in a loop:
/**
 * Writes <code>len</code> bytes from the specified byte array
 * starting at offset <code>off</code> and generates a checksum for
 * each data chunk.
 *
 * <p> This method stores bytes from the given array into this
 * stream's buffer before it gets checksummed. The buffer gets checksummed
 * and flushed to the underlying output stream when all data
 * in a checksum chunk are in the buffer. If the buffer is empty and
 * the requested length is at least as large as the next checksum chunk
 * size, this method will checksum and write the chunk directly
 * to the underlying output stream. Thus it avoids unnecessary data copies.
 *
 * @param b the data.
 * @param off the start offset in the data.
 * @param len the number of bytes to write.
 * @exception IOException if an I/O error occurs.
 */
@Override
public synchronized void write(byte b[], int off, int len)
    throws IOException {
  // 3.
  checkClosed();
  if (off < 0 || len < 0 || off > b.length - len) {
    throw new ArrayIndexOutOfBoundsException();
  }
  // len is the total number of bytes requested; each write1 call
  // consumes at most one local buffer's worth
  for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
  }
}
This method ultimately calls writeChecksumChunks (flushBuffer also goes through writeChecksumChunks), writing the chunk data into a packet.
/**
 * Write a portion of an array, flushing to the underlying
 * stream at most once if necessary.
 */
private int write1(byte b[], int off, int len) throws IOException {
  // buf: internal buffer for storing data before it is checksummed
  // if the local buffer is empty and the incoming data is at least one
  // buffer long (9 checksum chunks), checksum and write it directly
  if (count == 0 && len >= buf.length) {
    // local buffer is empty and user buffer size >= local buffer size, so
    // simply checksum the user buffer and send it directly to the underlying
    // stream
    final int length = buf.length; // 4608 = 512*9 = sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS
    // write exactly one buffer's worth each time
    writeChecksumChunks(b, off, length);
    return length;
  }
  // when the data is smaller than the local buffer, copy it into buf first;
  // once buf is full, flushBuffer also ends up calling writeChecksumChunks
  // copy user data to local buffer
  int bytesToCopy = buf.length - count;
  bytesToCopy = (len < bytesToCopy) ? len : bytesToCopy;
  System.arraycopy(b, off, buf, count, bytesToCopy);
  count += bytesToCopy;
  if (count == buf.length) {
    // local buffer is full
    flushBuffer();
  }
  return bytesToCopy;
}
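To trace write1's two paths with concrete numbers, here is a standalone sketch (buf.length = 4608 as above; the 10000-byte write is an assumed example): the write takes the direct path twice (2 × 4608 = 9216 bytes) and then buffers the remaining 784 bytes.

// Standalone sketch of write1's control flow with assumed sizes
// (buf.length = 4608 = 512 * 9, as in FSOutputSummer).
public class Write1Sketch {
  static final int BUF_LEN = 4608;
  static int count = 0; // bytes currently buffered locally

  static int write1(int len) {
    if (count == 0 && len >= BUF_LEN) {
      // direct path: checksum one full buffer's worth, no copy
      System.out.println("direct writeChecksumChunks of " + BUF_LEN);
      return BUF_LEN;
    }
    // buffered path: copy into buf; flush when full
    int bytesToCopy = Math.min(len, BUF_LEN - count);
    count += bytesToCopy;
    System.out.println("buffered " + bytesToCopy + " (count=" + count + ")");
    if (count == BUF_LEN) {
      System.out.println("flushBuffer -> writeChecksumChunks");
      count = 0;
    }
    return bytesToCopy;
  }

  public static void main(String[] args) {
    int len = 10000;
    // same loop shape as FSOutputSummer.write
    for (int n = 0; n < len; n += write1(len - n)) {
    }
  }
}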
Based on the amount of data written, it is split into multiple chunks, each written by writeChunk:
/** Generate checksums for the given data chunks and output chunks & checksums
 * to the underlying output stream.
 */
private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {
  // len = 4608
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  // iterate one checksum chunk at a time; len = 4608, getBytesPerChecksum() = 512
  for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
    // chunkLen = 512; how does this relate to the block size mathematically?
    int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
    writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
  }
}
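A worked example of the offset math above (assumed sizes: bytesPerChecksum = 512, checksumSize = 4, len = 4608, i.e. 9 chunks), showing the data offset advancing by 512 while the checksum offset advances by 4:

// Worked example of the writeChecksumChunks offset arithmetic
// (assumed sizes, mirroring the values in this walkthrough).
public class ChunkOffsets {
  public static void main(String[] args) {
    int bytesPerChecksum = 512, checksumSize = 4, len = 4608;
    for (int i = 0; i < len; i += bytesPerChecksum) {
      int chunkLen = Math.min(bytesPerChecksum, len - i); // 512, except a short tail
      int ckOffset = i / bytesPerChecksum * checksumSize; // 0, 4, 8, ..., 32
      System.out.println("data off=" + i + " chunkLen=" + chunkLen
          + " ckOffset=" + ckOffset);
    }
  }
}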
writeChunk simply delegates to writeChunkImpl:
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  TraceScope scope =
      dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
  try {
    // len = 512
    writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
  } finally {
    scope.close();
  }
}
This method writes the data and its checksum into the current packet. When the packet has accumulated its maximum number of chunks, or the block reaches blockSize, the whole packet is placed on the dataQueue to be sent by the DataStreamer thread. Finally, when a block is exactly full, an empty packet is sent to tell the DataStreamer that the block is complete.
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();
  // int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
  // chunkLen = len
  // the data written must not exceed one checksum chunk; len = 512, bytesPerChecksum = 512
  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len +
        " is larger than supported bytesPerChecksum " +
        bytesPerChecksum);
  }
  // the checksum size actually written must match the expected size
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be " +
        getChecksumSize() + " but found to be " + cklen);
  }
  // if there is no current packet, create a new one
  if (currentPacket == null) {
    // DFSPacket maxChunks = chunksPerPacket; for the first packet
    // chunksPerPacket = 126, afterwards it is recomputed to 1
    currentPacket = createPacket(packetSize, chunksPerPacket,
        bytesCurBlock, currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
          currentPacket.getSeqno() +
          ", src=" + src +
          ", packetSize=" + packetSize +
          ", chunksPerPacket=" + chunksPerPacket +
          ", bytesCurBlock=" + bytesCurBlock);
    }
  }
  // write the checksum
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  // write the actual data
  currentPacket.writeData(b, offset, len);
  // increment the chunk count
  currentPacket.incNumChunks();
  // add len (512) to the current block offset
  bytesCurBlock += len;
  // debug probe added for this walkthrough
  if (currentPacket.getNumChunks() == 126) {
    System.out.println("========");
  }
  // If packet is full, enqueue it for transmission
  // if the packet is full of chunks, or the block size is reached,
  // put the packet on the queue.
  // Could the counts ever fail to match? No: incNumChunks adds one at a
  // time, so getNumChunks() == getMaxChunks() is always hit exactly.
  // blockSize = 65536
  // What if bytesCurBlock exceeded the default blockSize? That case does
  // not seem to occur.
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
          currentPacket.getSeqno() +
          ", src=" + src +
          ", bytesCurBlock=" + bytesCurBlock +
          ", blockSize=" + blockSize +
          ", appendChunk=" + appendChunk);
    }
    // put currentPacket on the dataQueue, to be sent by the DataStreamer thread
    waitAndQueueCurrentPacket();
    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    // appendChunk defaults to false; it is true when appending to a file
    // whose last chunk is partial, see the DataStreamer constructor
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }
    if (!appendChunk) {
      // writePacketSize = dfs.client-write-packet-size, default 65536
      int psize = Math.min((int) (blockSize - bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      // debug probe added for this walkthrough
      System.out.println("psize=" + psize);
      // recompute chunksPerPacket; here it becomes 1
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    // when getNumChunks() == getMaxChunks(), bytesCurBlock = 64512
    // dataQueue:
    // 0 = {DFSPacket@7262} "packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512"
    // 1 = {DFSPacket@7263} "packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 65024"
    // 2 = {DFSPacket@7188} "packet seqno: 2 offsetInBlock: 65024 lastPacketInBlock: false lastByteOffsetInBlock: 65536"
    // when the block is exactly full, send an empty last packet
    if (bytesCurBlock == blockSize) {
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
      currentPacket.setSyncBlock(shouldSyncBlock);
      waitAndQueueCurrentPacket();
      // reset for the next block
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
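Putting the numbers together, this standalone sketch (assumed sizes from the walkthrough: blockSize = 65536, 126 chunks in the first packet, 1 chunk per packet after chunksPerPacket is recomputed) reproduces the dataQueue contents shown in the comments above:

// Sketch reproducing the packet sequence for one 65536-byte block
// (assumed sizes from this walkthrough, not actual DFSClient state).
public class PacketSequence {
  public static void main(String[] args) {
    final int blockSize = 65536, chunk = 512;
    int bytesCurBlock = 0, seqno = 0;
    int chunksPerPacket = 126; // first packet
    while (bytesCurBlock < blockSize) {
      int offsetInBlock = bytesCurBlock;
      int dataBytes = Math.min(chunksPerPacket * chunk, blockSize - bytesCurBlock);
      bytesCurBlock += dataBytes;
      System.out.println("packet seqno: " + seqno++
          + " offsetInBlock: " + offsetInBlock
          + " lastByteOffsetInBlock: " + bytesCurBlock);
      // after each full packet, psize = min(blockSize - bytesCurBlock, 65536)
      // drives computePacketChunkSize; here chunksPerPacket drops to 1
      chunksPerPacket = 1;
    }
    System.out.println("packet seqno: " + seqno + " (empty, lastPacketInBlock=true)");
  }
}

The output lists packets at offsets 0, 64512, and 65024, followed by the empty end-of-block packet, matching the dataQueue dump in the comments.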