Zookeeper(三)-持久化
概述
同mysql/redis类似zk持久化也分为快照(snapshot)和增量事务日志(txnlog)两种形式,两者结合使用来恢复数据;但是三者底层存储引擎数据结构不同,mysql使用B+树,redis使用全局哈希表,而zk使用LSM数据结构;
本节先来分析zk的snapshot和txnlog,后续再分析LSM数据结构跟B+树区别;
Log类图.png
一、snapshot
snapshot是内存快照,把当前时刻全量内存写入数据文件中;
1. 文件内容
1.1 内容结构
snapshot.png1.2 序列化内容snapshot.xx
image.png1.3 内容解析
也可以通过SnapshotFormatter
进行解析
ZNode Details (count=22):
----
/
cZxid = 0x00000000000000
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x00000000000000
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0000000000007c
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x00000000000000
dataLength = 0
----
。。。
Session Details (sid, timeout, ephemeralCount):
0x1001fe6ce840022, 30000, 0
0x1001fe6ce840023, 30000, 0
0x1001fe6ce840020, 30000, 0
0x1001fe6ce840021, 30000, 0
0x1001fe6ce840026, 30000, 0
0x1001fe6ce840027, 30000, 0
0x1001fe6ce840024, 30000, 0
0x1001fe6ce840025, 30000, 0
0x1001fe6ce840028, 30000, 0
0x1001fe6ce840029, 30000, 0
0x1001fe6ce84001f, 30000, 0
2. 源码分析
SnapShot接口只定义了四个方法,反序列化、序列化、查找最新的snapshot文件、关闭资源
public interface SnapShot {
// 反序列化
long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
// 序列化
void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;
// 查询最近的快照文件
File findMostRecentSnapshot() throws IOException;
// 关闭释放资源
void close() throws IOException;
}
序列化过程比较简单,主要看下反序列化过程
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// 查找100个合法的snapshot文件
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
// 默认为不合法
boolean foundValid = false;
// 遍历snapshot文件列表
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
// 反序列化出 DataTree,反序列化过程就是根据文件结构进行解析
deserialize(dt,sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
// 比较snapshot中校验值和读取计算出的值是否相等
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
// 从文件名中解析出zxid,即snapshot文件后缀
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}
-
1.
在配置的日志文件目录中,按照zxid对snapshot文件进行降序排序,并对文件合法性进行初步校验,然后取出前100个文件 -
2.
遍历snapshot文件,反序列化出 DataTree(反序列化过程就是根据文件结构进行解析),一旦CRC校验成功就结束遍历,即最终用于反序列化的是最近且合法
的snapshot文件; -
3.
返回最近且合法的zxid,即snapshot文件后缀
二、txnlog
txnlog是增量事务日志,实时记录每一条事务命令;
1. 文件内容
1.1 内容结构
txnlog.png- 每一次增量日志包含4部分:CRC验证码+消息头+消息体+结尾标志
-
Record
:不同请求命令对应不同的Record;
1.2 序列化内容log.cf
log.cf- 后面全是0,不足64M时用0填充;
1.3 内容解析
也可以通过LogFormatter
进行解析查看
ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
20-12-6 下午12时39分36秒 session 0x100142dc1660000 cxid 0x0 zxid 0x1e createSession 30000
20-12-6 下午12时39分52秒 session 0x100141757900001 cxid 0x0 zxid 0x1f closeSession null
20-12-6 下午12时39分59秒 session 0x100142dc1660000 cxid 0x2 zxid 0x20 create '/zk/test5,#353535,v{s{31,s{'world,'anyone}}},F,6
20-12-6 下午12时53分04秒 session 0x100142dc1660000 cxid 0x0 zxid 0x21 closeSession null
20-12-6 下午12时53分04秒 session 0x100142dc1660001 cxid 0x0 zxid 0x22 createSession 30000
20-12-6 下午12时53分36秒 session 0x100142dc166001d cxid 0x0 zxid 0x5f closeSession null
20-12-6 下午12时53分36秒 session 0x100142dc1660005 cxid 0x0 zxid 0x60 closeSession null
20-12-6 下午12时54分56秒 session 0x100142dc1660020 cxid 0x4 zxid 0x61 setACL '/zk/test1,v{s{4,s{'world,'anyone}}},1
20-12-6 下午12时55分39秒 session 0x100142dc1660020 cxid 0x6 zxid 0x62 create '/zk/test7,#373737,v{s{4,s{'world,'anyone}}},F,7
20-12-6 下午02时33分27秒 session 0x100142dc1660020 cxid 0x0 zxid 0x63 closeSession null
EOF reached after 70 txns.
2. 源码分析
TxnLog接口:
public interface TxnLog {
// 切换日志,一个日志文件达到一定大小就会生成一个新的
void rollLog() throws IOException;
// 添加一个请求到事务日志
boolean append(TxnHeader hdr, Record r) throws IOException;
// 通过事务id读取日志
TxnIterator read(long zxid) throws IOException;
// 获取事务性操作的最新zxid
long getLastLoggedZxid() throws IOException;
// 清空日志,与Leader保持同步
boolean truncate(long zxid) throws IOException;
// 获取数据库的id
long getDbId() throws IOException;
// 提交事务并进行确认
void commit() throws IOException;
// 关闭事务性日志
void close() throws IOException;
// 读取事务日志的迭代器接口
public interface TxnIterator {
// 获取事务头部
TxnHeader getHeader();
// 获取事务
Record getTxn();
boolean next() throws IOException;
// 关闭文件释放资源
void close() throws IOException;
}
}
重点分析下append
方法:
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
if (hdr == null) {
return false;
}
// 当前事务的zxid小于等于最后的zxid,打印告警日志,否则设置最后的zxid为当前事务zxid
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
// 判断当前输入流logStream是否已清空(在同步处理器SyncRequestProcessor中据一定算法得出一个count,记录大于count就要rollLog,开启一个新的文件,
// 算法是: 100000/2 + random.nextInt(100000/2), 这个十万是一个默认值可配置),清空开启一个新的文件写入
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
// 创建增量日志文件,文件后缀为当前事务zxid
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
// 序列化增量日志文件头
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
// 确保在填充前已经写完魔数
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
}
// 当文件大小不满64MB时,向文件填充0以达到64MB大小
currentSize = padFile(fos.getChannel());
// 将事务头和事务序列化成byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header and txn");
}
// 生成一个验证算法
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
// 写CRC验证码
oa.writeLong(crc.getValue(), "txnEntryCRC");
// 写buf 和 结尾标志 (byte) 0x42
Util.writeTxnBytes(oa, buf);
return true;
}
-
1.
判断当前输入流logStream
是否已清空(在同步处理器SyncRequestProcessor
中据一定算法得出一个count,记录大于count就要rollLog
,开启一个新的文件); -
2.
新常见日志文件时序列化日志文件头部; -
3.
当文件大小不满64MB时,向文件填充0以达到64MB大小; -
4.
序列化CRC校验码、消息头、消息体、结束标志
private long padFile(FileChannel fileChannel) throws IOException {
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
File Padding
是对WAL的优化,在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先找到inode 所在的位置,然后回到新添加 block 的位置进行日志追加。为了减少这些查找(这些查找是磁盘随机IO,效率跟顺序IO相比不在同一个数量级),我们可以预先为WAL 分配 block。
zk就用到了这种优化,每次为 WAL 分配 64MB 的 block,即不足64M时用0填充,以减少随机磁盘IO;
再来分析下getLastLoggedZxid
方法
public long getLastLoggedZxid() {
// 查找开始于快照或快照之前的日志文件。返回此日志和所有后续日志。结果按文件的zxid升序排列
File[] files = getLogFiles(logDir.listFiles(), 0);
// 获取最大的zxid(最后一个log文件对应的zxid)
long maxLog=files.length>0 ? Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;
// 扫描最新的日志文件以找到最高的zxid (从文件头开始遍历,遍历到最后一个就是最大的)
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
itr = txn.read(maxLog);
while (true) {
if(!itr.next())
break;
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
close(itr);
}
return zxid;
}
该方法是找到增量事务日志中最大的zxid,即最后一个txnlog文件最后一个写入的事务日志条目里的zxid;
-
1.
从日志路径中查找txnlog,即log开头的日志文件 -
2.
获取最后一个日志文件的zxid -
3.
读取最后一个日志文件,从文件头开始遍历,遍历到最后一个就是最大的zxid
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
// 升序排列所有增量日志文件
List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
long logZxid = 0;
// 查找在快照的zxid之前或同时开始的日志文件
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
if (fzxid > snapshotZxid) {
continue;
}
// 找到快照的zxid之前并且最接近的日志文件
if (fzxid > logZxid) {
logZxid = fzxid;
}
}
// 找到快照的zxid之前并且最接近的日志文件,以及其之后的所有日志文件
List<File> v=new ArrayList<File>(5);
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
if (fzxid < logZxid) {
continue;
}
v.add(f);
}
return v.toArray(new File[0]);
}
该方法返回升序排列后,最接近请求参数snapshot文件的txnlog文件开始及以后的所有txnlog文件;
-
1.
升序排列所有txnlog文件列表; -
2.
遍历文件列表,找到快照的zxid之前并且最接近的日志文件; -
3.
返回该日志文件以及其之后的所有日志文件列表;
小结
本节主要分析了快照日志FileSnap
和增量事务日志FileTxnLog
,而FileTxnSnapLog
中操作主要都是委托给这两者进行处理;
本节一个重点是日志文件的加载流程:
1.
首先获取100个snapshot文件,并将其按照文件名降序排列;
2.
循环读取这些文件,并反序列化DataTree,根据文件中的校验值进行合法性校验;
3.
如果文件合法,将最大的zxid赋值给lastProcessZxid,跳出循环;
4.
读取txnlog,按照lastProcessZxid+1,获取该条事务记录,作为新的事务日志记录;
5.
读取该记录直至文件结束,判断其合法性,调用processTransaction,提交到DataTree中;
------over------