Writing the WAL Log File

2024-02-07  wayyyy

For the full code, see log_writer.h / log_writer.cc.

Members
WritableFile *dest_;
int block_offset_; // Current offset in block

// crc32c values for all supported record types.  These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
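To see why caching the per-type CRC is safe, here is a minimal sketch. It uses a slow bitwise CRC32C written only for this illustration (LevelDB has its own optimized crc32c module), and the names Crc32cExtend, Crc32cValue, kMaxRecordTypeForDemo, RecordCrcCached and RecordCrcNaive are hypothetical. It assumes the record types are numbered 0 to 4, as in LevelDB's log_format.h, and checks that extending a cached one-byte type CRC with the payload gives the same value as hashing the type byte and the payload together.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Slow bitwise CRC32C (Castagnoli, reflected polynomial 0x82F63B78).
// Illustration only; this is not LevelDB's implementation.
// init_crc is the CRC32C of some prefix A; the result is the CRC32C of A + data.
uint32_t Crc32cExtend(uint32_t init_crc, const char* data, size_t n) {
    uint32_t crc = init_crc ^ 0xFFFFFFFFu;
    for (size_t i = 0; i < n; ++i) {
        crc ^= static_cast<uint8_t>(data[i]);
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

uint32_t Crc32cValue(const char* data, size_t n) {
    return Crc32cExtend(0, data, n);
}

// Assumption: record types are numbered 0..4 (kZeroType..kLastType).
constexpr int kMaxRecordTypeForDemo = 4;

// Precompute the CRC of each one-byte record type, in the spirit of type_crc_.
void InitTypeCrc(uint32_t* type_crc) {
    for (int i = 0; i <= kMaxRecordTypeForDemo; ++i) {
        char t = static_cast<char>(i);
        type_crc[i] = Crc32cValue(&t, 1);
    }
}

// CRC over (type byte + payload), starting from the cached type CRC.
uint32_t RecordCrcCached(const uint32_t* type_crc, int type, const std::string& payload) {
    return Crc32cExtend(type_crc[type], payload.data(), payload.size());
}

// The same CRC computed the naive way, over one concatenated buffer.
uint32_t RecordCrcNaive(int type, const std::string& payload) {
    std::string buf(1, static_cast<char>(type));
    buf += payload;
    return Crc32cValue(buf.data(), buf.size());
}
```

Because both paths produce the same value, the writer can skip hashing the type byte on every record and start from the cached entry instead.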
AddRecord

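AddRecord takes the record as a Slice and appends it to the WAL log file. Based on the record's size, it decides whether the record has to span blocks, picks the matching header type for each fragment, then writes the fragment and flushes it through dest_->Flush().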

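Let's start with a few examples.
Suppose 1000 bytes have already been written to the current WAL log file, the record to be written is 500 bytes, and a block is 32768 bytes. The 7-byte header plus the 500-byte record fits in the rest of the block, so the record is written whole into this block.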

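Suppose 1000 bytes have already been written to the current WAL log file and the record to be written is 31755 bytes. The current block has 32768 - 1000 = 31768 bytes left, so after the header and the record are written, 32768 - 1000 - 31755 - 7 = 6 bytes remain. A record header needs 7 bytes, so these 6 bytes cannot hold another header; they are filled with \x00\x00\x00\x00\x00\x00 when the next record is written.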

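Suppose 1000 bytes have already been written to the current WAL log file and the record to be written is 50000 bytes. The record no longer fits in one block and is split into two fragments: a kFirstType fragment carrying 32768 - 1000 - 7 = 31761 bytes that fills the current block, followed by a kLastType fragment carrying the remaining 50000 - 31761 = 18239 bytes at the start of the next block.
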
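The three examples above can be checked with a small simulation. This is only a sketch that mirrors the fragmentation loop of AddRecord without doing any I/O; the Fragment struct and the PlanFragments function are hypothetical names introduced here, and the starting block offset of 1000 is taken from the examples.

```cpp
#include <cstddef>
#include <string>
#include <vector>

constexpr size_t kBlockSize = 32768;
constexpr size_t kHeaderSize = 7;

struct Fragment {
    std::string type;  // "Full", "First", "Middle" or "Last"
    size_t length;     // payload bytes carried by this fragment
    size_t padding;    // zero bytes written first to finish the previous block
};

// Plans how a record of record_size bytes is split, starting at block_offset.
// block_offset is updated in place, like Writer::block_offset_.
std::vector<Fragment> PlanFragments(size_t& block_offset, size_t record_size) {
    std::vector<Fragment> out;
    size_t left = record_size;
    bool begin = true;
    do {
        size_t padding = 0;
        const size_t leftover = kBlockSize - block_offset;
        if (leftover < kHeaderSize) {
            padding = leftover;  // the trailer is zero-filled
            block_offset = 0;    // switch to a new block
        }
        const size_t avail = kBlockSize - block_offset - kHeaderSize;
        const size_t len = (left < avail) ? left : avail;
        const bool end = (left == len);
        const char* type =
            (begin && end) ? "Full" : begin ? "First" : end ? "Last" : "Middle";
        out.push_back({type, len, padding});
        block_offset += kHeaderSize + len;
        left -= len;
        begin = false;
    } while (left > 0);
    return out;
}
```

Starting from offset 1000, a 500-byte record yields one Full fragment; a 31755-byte record yields one Full fragment and leaves 6 bytes that the next call reports as padding; a 50000-byte record yields a First fragment of 31761 bytes and a Last fragment of 18239 bytes.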
Status Writer::AddRecord(const Slice &slice)
{
    const char *ptr = slice.data();
    size_t left = slice.size();

    // Fragment the record if necessary and emit it.  Note that if slice
    // is empty, we still want to iterate once to emit a single
    // zero-length record
    Status s;
    bool begin = true;
    do
    {
        const int leftover = kBlockSize - block_offset_;
        assert(leftover >= 0);
        if (leftover < kHeaderSize)
        {
            // Switch to a new block
            if (leftover > 0)
            {
                // Fill the trailer (literal below relies on kHeaderSize being 7)
                static_assert(kHeaderSize == 7, "");
                dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
            }
            block_offset_ = 0;
        }

        // Invariant: we never leave < kHeaderSize bytes in a block.
        assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

        const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
        const size_t fragment_length = (left < avail) ? left : avail;

        RecordType type;
        const bool end = (left == fragment_length);
        if (begin && end)
        {
            type = kFullType;
        }
        else if (begin)
        {
            type = kFirstType;
        }
        else if (end)
        {
            type = kLastType;
        }
        else
        {
            type = kMiddleType;
        }

        s = EmitPhysicalRecord(type, ptr, fragment_length);
        ptr += fragment_length;
        left -= fragment_length;
        begin = false;
    } while (s.ok() && left > 0);
    return s;
}
const char *ptr = slice.data();
size_t left = slice.size();

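left is the number of bytes of the record that still have to be written; it starts as the length of the whole record. The code then enters the do-while loop.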

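Each iteration first checks whether fewer than 7 bytes (kHeaderSize) remain in the current block. If so, the remainder is zero-padded and the block offset is reset to 0. This guarantees that a header never straddles two blocks.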

const int leftover = kBlockSize - block_offset_;    // bytes left in the current block
assert(leftover >= 0);
if (leftover < kHeaderSize)
{
    // Switch to a new block
    if (leftover > 0)
    {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
    }
    block_offset_ = 0;
}

Next, compute avail, the space left in the block after reserving room for the header, and fragment_length, the number of payload bytes this physical record can carry.

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;

The RecordType of this fragment is then chosen from the following cases:

const bool end = (left == fragment_length);
if (begin && end)  {  type = kFullType;  }
else if (begin)  {  type = kFirstType;  }
else if (end)  {  type = kLastType;  }
else  {  type = kMiddleType;  }

s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;

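1. Suppose left = 36 and avail = 18. Then fragment_length = avail = 18, so one physical record is not enough and two are needed. On the first iteration begin is true but end is false, so (begin && end) does not hold and the type is kFirstType.
2. Suppose left = 18 and avail = 36. Then fragment_length = left = 18, so a single physical record is enough: (begin && end) holds and the type is kFullType.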
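The two cases can be verified with a small helper. This is a sketch only: ClassifyFragment is a hypothetical function that isolates the type decision from the loop, and the enum values are assumed to match LevelDB's log_format.h.

```cpp
#include <cstddef>

// Assumption: same numbering as LevelDB's log_format.h.
enum RecordType {
    kZeroType = 0,
    kFullType = 1,
    kFirstType = 2,
    kMiddleType = 3,
    kLastType = 4
};

// begin: true if this is the first fragment of the record.
// left:  payload bytes still to be written.
// avail: payload bytes that fit in the current block after the header.
RecordType ClassifyFragment(bool begin, size_t left, size_t avail) {
    const size_t fragment_length = (left < avail) ? left : avail;
    const bool end = (left == fragment_length);
    if (begin && end) return kFullType;
    if (begin) return kFirstType;
    if (end) return kLastType;
    return kMiddleType;
}
```

With left = 36 and avail = 18 on the first fragment, begin is true and end is false, which gives kFirstType. With left = 18 and avail = 36, both are true, which gives kFullType.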

Status Writer::EmitPhysicalRecord(RecordType t, const char *ptr, size_t length)
{
    assert(length <= 0xffff); // Must fit in two bytes
    assert(block_offset_ + kHeaderSize + length <= kBlockSize);

    // Format the header
    char buf[kHeaderSize];
    buf[4] = static_cast<char>(length & 0xff);
    buf[5] = static_cast<char>(length >> 8);
    buf[6] = static_cast<char>(t);

    // Compute the crc of the record type and the payload.
    uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
    crc = crc32c::Mask(crc); // Adjust for storage
    EncodeFixed32(buf, crc);

    // Write the header and the payload
    Status s = dest_->Append(Slice(buf, kHeaderSize));
    if (s.ok())
    {
        s = dest_->Append(Slice(ptr, length));
        if (s.ok())
        {
            s = dest_->Flush();
        }
    }
    block_offset_ += kHeaderSize + length;
    return s;
}
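The header layout built by EmitPhysicalRecord can be illustrated with a standalone encode/decode pair. This is a sketch under stated assumptions: Header, EncodeHeader and DecodeHeader are hypothetical names, EncodeFixed32 is taken to store the value little-endian, and Mask/Unmask reproduce the rotate-and-add scheme of LevelDB's crc32c module with its kMaskDelta constant.

```cpp
#include <cstddef>
#include <cstdint>

constexpr size_t kHeaderSize = 7;
constexpr uint32_t kMaskDelta = 0xa282ead8u;  // assumed to match crc32c::Mask

struct Header {
    uint32_t masked_crc;  // bytes 0-3, little-endian
    uint16_t length;      // bytes 4-5, little-endian
    uint8_t type;         // byte 6
};

// Rotate right by 15 bits and add a constant before storing the CRC.
uint32_t Mask(uint32_t crc) {
    return ((crc >> 15) | (crc << 17)) + kMaskDelta;
}

// Inverse of Mask: subtract the constant and rotate left by 15 bits.
uint32_t Unmask(uint32_t masked) {
    const uint32_t rot = masked - kMaskDelta;
    return (rot >> 17) | (rot << 15);
}

void EncodeHeader(const Header& h, unsigned char* buf) {
    for (int i = 0; i < 4; ++i) {
        buf[i] = static_cast<unsigned char>((h.masked_crc >> (8 * i)) & 0xff);
    }
    buf[4] = static_cast<unsigned char>(h.length & 0xff);
    buf[5] = static_cast<unsigned char>(h.length >> 8);
    buf[6] = h.type;
}

Header DecodeHeader(const unsigned char* buf) {
    Header h;
    h.masked_crc = static_cast<uint32_t>(buf[0]) |
                   (static_cast<uint32_t>(buf[1]) << 8) |
                   (static_cast<uint32_t>(buf[2]) << 16) |
                   (static_cast<uint32_t>(buf[3]) << 24);
    h.length = static_cast<uint16_t>(buf[4] | (buf[5] << 8));
    h.type = buf[6];
    return h;
}
```

The two-byte length field is why EmitPhysicalRecord asserts length <= 0xffff; a fragment never exceeds one block, so the limit is not reached in practice.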
The WAL Log File

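Every LevelDB write calls AddRecord to append the key-value pairs of that write to the WAL log file, and WriteOptions (options.sync) decides whether the log file is also synced to disk. The call to AddRecord is in the DBImpl::Write method: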

Status DBImpl::Write(const WriteOptions &options, WriteBatch *updates)
{
    ...
    status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
    bool sync_error = false;
    if (status.ok() && options.sync)
    {
        status = logfile_->Sync();  // sync the log file to disk
        if (!status.ok())
        {
            sync_error = true;
        }
    }
    ...
}

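The data written to the WAL log file ends up as a sequence of 32768-byte blocks. Each block holds physical records laid out as a 7-byte header (4-byte masked CRC32C, 2-byte little-endian length, 1-byte type) followed by the payload, and any block tail shorter than 7 bytes is zero-padded.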