Apache Pulsar 指南

Apache Bookkeeper —— Journal 源码分

2021-02-01  本文已影响0人  WJL3333

简单梳理一下这个类的作用和逻辑。

主要功能

  1. 充当WAL
  1. 启动的时候将WAL进行replay,将记录在journal里面的内容重新apply到LedgerStorage里面,
    避免之前写入LedgerStorage的内容因为没有刷盘导致丢失。

  2. checkpoint 逻辑
    和其他WAL一样,需要记录一个位置,这个位置标识着LedgerStorage里面的数据已经全都落盘了
    这一个位置之前的WAL日志都可以被删除。

  3. 维护JournalChannel逻辑,写入WAL日志,日志轮转等。

大致逻辑

1. 写入:

整个写入是异步的,写入结果通过callback进行后续处理。
写入的参数会封装成为一个QueueEntry放到写请求队列

 class QueueEntry {
        // 内容
        ByteBuf entry;
        long ledgerId;
        long entryId;
        
       // 写结果的callback
        WriteCallback cb;
       // 进入队列的时间,用来确定是否等待时间过长
        long enqueueTime;
       // 是否需要等内容落盘
        boolean ackBeforeSync;
}

这个队列会被一个线程定期处理,这里先叫做BookieJournalWriteThread 好了(实际没有这个类)
取出之后会将携带的ByteBuffer 写入到JournalChannel里面。这个线程专门处理这个逻辑,
其他的活不干。

这里先说下JournalChannel 这个类,这个类可以认为是底层journal磁盘文件的映射,
内部实现是一个带读写缓存的FileChannel, 写入的时候先到写缓存,
有相应的逻辑主动触发写缓存写到包装的FileChannel里面。

QueueEntry 的字节写入之后,可能内容在写缓存里面。

flush 逻辑

我们需要触发flush逻辑,将写缓存的内容写到FileChannel里面。

这里flush和 sync 到磁盘不是一个说法。
flush 是调用FileChannel.write 为了减少调用次数
sync 是调用FileChannel.force 为了fsync 到磁盘

这里触发flush的条件有3种:

  1. 时间bound:这个请求入队之后,一段时间之后必须被处理(写入到channel或者落盘)
  2. 写请求的个数 || 累积的写请求的字节数
  3. 写请求队列为空(一般测试的时候出现这个,写请求很少的情况下大部分都会被1这个条件兜底)

满足flush 条件则会主动将写缓存的内容刷到FileChannel里面。
如果不需要等待内容落盘(ackBeforeSync=false),则直接将callback提交到线程池执行回调。
之后写请求被放到一个等待flush的batch里面。

flush逻辑做完之后,会去判断是否需要落盘。

落盘(ForceWrite)逻辑

按照配置有下面几种条件需要落盘。

  1. 每次flush都需要落盘。
  2. journal 文件轮转,需要把之前的文件落盘。
  3. 按照配置的interval 落盘。

如果需要落盘则这个时候会将之前的batch 封装成为一个ForceWriteRequest 放到落盘队列里面。

这个队列会被ForceWriteThread 清空。

这里可以配置一个groupCommit的逻辑。避免多次fsync
如果配置了这个则会将队列里面的请求合并到一起,触发单次的FileChannel.force
同样,落盘之后会将之前的callback 放到线程池去处理回调。

2. replay 逻辑

这个逻辑比较简单,就是启动的时候把这个文件的内容从上次成功checkpoint的位置开始读取。
把读到的内容再次写入到LedgerStorage 里面就ok。

3. checkpoint 逻辑

这个实际上和LedgerStorage 这个是联动的,如果这一段WAL上面的内容,已经被LedgerStorage成功写到磁盘上了,那么这段WAL就可以被删除了。

这里会有一个LastLogMark文件,标记了(journal文件,offset)表示这个文件在这个offset之前的内容可以被干掉了。

Journal 这个类实现了CheckpointSource 这个接口。
实际动作由SyncThread (实现了Checkpointer接口)执行。

每种LedgerStorage的checkpoint触发条件不同。

entryLogPerLedgerEnabled || isDbLedgerStorage 会按照时间interval 定期触发checkpoint
InterleavedLedgerStorage 会在日志轮转的时候触发
SortedLedgerStorage 会在memtable 需要flush的时候触发

实际逻辑比较简单

public void checkpoint(Checkpoint checkpoint) {
       // ...
            ledgerStorage.checkpoint(checkpoint);
            checkpointSource.checkpointComplete(checkpoint, true);
       // ...
    }

checkpointComplete 这个方法会刷新磁盘上的LastLogMarker 这个文件,同时落盘。
(主要逻辑在LedgerStorage.checkpoint这里)

这里的磁盘是LedgerStorage的磁盘

总结

写入请求处理是异步的,提交之后就会被Journal线程处理。
Journal线程负责将内容写入Journal channel,同时按照一定条件执行flush逻辑。
如果判断需要进行刷盘则将刷盘batch包装成ForceWriteRequest
ForceWriteThread清理队列进行group commit 处理。负责journal落盘。
对于写请求的callback不会在这两个执行,会被额外提交到callback线程池处理。

上一篇下一篇

猜你喜欢

热点阅读