Apache Bookkeeper —— Journal 源码分
简单梳理一下这个类的作用和逻辑。
主要功能
- 充当WAL
- 写请求处理:
首先在Bookkeeper服务端收到了写Entry的请求之后会交给Bookie
来处理
Bookie.addEntryInternal
这个方法会将请求携带的Entry信息写入LedgerStorage
(实际数据存储的位置 + 索引)
写入成功之后还会同时将这个请求写入Journal
。 - 其他重要信息:比如说
Ledger
被fence的信息、LAC等。
-
启动的时候将WAL进行replay,将记录在journal里面的内容重新apply到
LedgerStorage
里面,
避免之前写入LedgerStorage
的内容因为没有刷盘导致丢失。 -
checkpoint 逻辑
和其他WAL一样,需要记录一个位置,这个位置标识着LedgerStorage
里面的数据已经全都落盘了
这一个位置之前的WAL日志都可以被删除。 -
维护
JournalChannel
逻辑,写入WAL日志,日志轮转等。
大致逻辑
1. 写入:
整个写入是异步的,写入结果通过callback进行后续处理。
写入的参数会封装成为一个QueueEntry
放到写请求队列
class QueueEntry {
// 内容
ByteBuf entry;
long ledgerId;
long entryId;
// 写结果的callback
WriteCallback cb;
// 进入队列的时间,用来确定是否等待时间过长
long enqueueTime;
// 是否需要等内容落盘
boolean ackBeforeSync;
}
这个队列会被一个线程定期处理,这里先叫做BookieJournalWriteThread
好了(实际没有这个类)
取出之后会将携带的ByteBuffer
写入到JournalChannel
里面。这个线程专门处理这个逻辑,
其他的活不干。
这里先说下JournalChannel
这个类,这个类可以认为是底层journal磁盘文件的映射,
内部实现是一个带读写缓存的FileChannel
, 写入的时候先到写缓存,
有相应的逻辑主动触发写缓存写到包装的FileChannel
里面。
QueueEntry
的字节写入之后,可能内容在写缓存里面。
flush 逻辑
我们需要触发flush逻辑,将写缓存的内容写到FileChannel
里面。
这里flush和 sync 到磁盘不是一个说法。
flush 是调用FileChannel.write
为了减少调用次数
sync 是调用FileChannel.force
为了fsync 到磁盘
这里触发flush的条件有3种:
- 时间bound:这个请求入队之后,一段时间之后必须被处理(写入到channel或者落盘)
- 写请求的个数 || 累积的写请求的字节数
- 写请求队列为空(一般测试的时候出现这个,写请求很少的情况下大部分都会被1这个条件兜底)
满足flush 条件则会主动将写缓存的内容刷到FileChannel
里面。
如果不需要等待内容落盘(ackBeforeSync=false),则直接将callback提交到线程池执行回调。
之后写请求被放到一个等待flush的batch里面。
flush逻辑做完之后,会去判断是否需要落盘。
落盘(ForceWrite)逻辑
按照配置有下面几种条件需要落盘。
- 每次flush都需要落盘。
- journal 文件轮转,需要把之前的文件落盘。
- 按照配置的interval 落盘。
如果需要落盘则这个时候会将之前的batch 封装成为一个ForceWriteRequest
放到落盘队列里面。
这个队列会被ForceWriteThread
清空。
这里可以配置一个groupCommit
的逻辑。避免多次fsync
。
如果配置了这个则会将队列里面的请求合并到一起,触发单次的FileChannel.force
同样,落盘之后会将之前的callback 放到线程池去处理回调。
2. replay 逻辑
这个逻辑比较简单,就是启动的时候把这个文件的内容从上次成功checkpoint的位置开始读取。
把读到的内容再次写入到LedgerStorage
里面就ok。
3. checkpoint 逻辑
这个实际上和LedgerStorage
这个是联动的,如果这一段WAL上面的内容,已经被LedgerStorage
成功写到磁盘上了,那么这段WAL就可以被删除了。
这里会有一个LastLogMark
文件,标记了(journal文件,offset)表示这个文件在这个offset之前的内容可以被干掉了。
Journal
这个类实现了CheckpointSource
这个接口。
实际动作由SyncThread
(实现了Checkpointer
接口)执行。
每种LedgerStorage
的checkpoint触发条件不同。
entryLogPerLedgerEnabled || isDbLedgerStorage 会按照时间interval 定期触发checkpoint
InterleavedLedgerStorage 会在日志轮转的时候触发
SortedLedgerStorage 会在memtable 需要flush的时候触发
实际逻辑比较简单
public void checkpoint(Checkpoint checkpoint) {
// ...
ledgerStorage.checkpoint(checkpoint);
checkpointSource.checkpointComplete(checkpoint, true);
// ...
}
checkpointComplete 这个方法会刷新磁盘上的LastLogMarker
这个文件,同时落盘。
(主要逻辑在LedgerStorage.checkpoint
这里)
这里的磁盘是LedgerStorage的磁盘
总结
写入请求处理是异步的,提交之后就会被Journal线程处理。
Journal线程负责将内容写入Journal channel,同时按照一定条件执行flush逻辑。
如果判断需要进行刷盘则将刷盘batch包装成ForceWriteRequest
ForceWriteThread
清理队列进行group commit 处理。负责journal落盘。
对于写请求的callback不会在这两个执行,会被额外提交到callback线程池处理。