Zookeeper(三)-持久化

2020-12-12  本文已影响0人  进击的蚂蚁zzzliu

概述

同mysql/redis类似zk持久化也分为快照(snapshot)和增量事务日志(txnlog)两种形式,两者结合使用来恢复数据;但是三者底层存储引擎数据结构不同,mysql使用B+树,redis使用全局哈希表,而zk使用LSM数据结构;
本节先来分析zk的snapshot和txnlog,后续再分析LSM数据结构跟B+树区别;


Log类图.png

一、snapshot

snapshot是内存快照,把当前时刻全量内存写入数据文件中;

1. 文件内容

1.1 内容结构
snapshot.png
1.2 序列化内容snapshot.xx
image.png
1.3 内容解析

也可以通过SnapshotFormatter进行解析

ZNode Details (count=22):
----
/
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x0000000000007c
  cversion = 10
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
---- 
。。。
Session Details (sid, timeout, ephemeralCount):
0x1001fe6ce840022, 30000, 0
0x1001fe6ce840023, 30000, 0
0x1001fe6ce840020, 30000, 0
0x1001fe6ce840021, 30000, 0
0x1001fe6ce840026, 30000, 0
0x1001fe6ce840027, 30000, 0
0x1001fe6ce840024, 30000, 0
0x1001fe6ce840025, 30000, 0
0x1001fe6ce840028, 30000, 0
0x1001fe6ce840029, 30000, 0
0x1001fe6ce84001f, 30000, 0

2. 源码分析

SnapShot接口只定义了四个方法,反序列化、序列化、查找最新的snapshot文件、关闭资源

public interface SnapShot {   
    // 反序列化
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
    // 序列化
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name)  throws IOException;
    // 查询最近的快照文件
    File findMostRecentSnapshot() throws IOException;
    // 关闭释放资源
    void close() throws IOException;
} 

序列化过程比较简单,主要看下反序列化过程

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // 查找100个合法的snapshot文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    // 默认为不合法
    boolean foundValid = false;
    // 遍历snapshot文件列表
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // 反序列化出 DataTree,反序列化过程就是根据文件结构进行解析
            deserialize(dt,sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            // 比较snapshot中校验值和读取计算出的值是否相等
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot :  " + snap);
            }
            foundValid = true;
            break;
        } catch(IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null) 
                snapIS.close();
            if (crcIn != null) 
                crcIn.close();
        } 
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // 从文件名中解析出zxid,即snapshot文件后缀
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}

二、txnlog

txnlog是增量事务日志,实时记录每一条事务命令;

1. 文件内容

1.1 内容结构
txnlog.png
1.2 序列化内容log.cf
log.cf
1.3 内容解析

也可以通过LogFormatter进行解析查看

ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
20-12-6 下午12时39分36秒 session 0x100142dc1660000 cxid 0x0 zxid 0x1e createSession 30000
20-12-6 下午12时39分52秒 session 0x100141757900001 cxid 0x0 zxid 0x1f closeSession null
20-12-6 下午12时39分59秒 session 0x100142dc1660000 cxid 0x2 zxid 0x20 create '/zk/test5,#353535,v{s{31,s{'world,'anyone}}},F,6
20-12-6 下午12时53分04秒 session 0x100142dc1660000 cxid 0x0 zxid 0x21 closeSession null
20-12-6 下午12时53分04秒 session 0x100142dc1660001 cxid 0x0 zxid 0x22 createSession 30000
20-12-6 下午12时53分36秒 session 0x100142dc166001d cxid 0x0 zxid 0x5f closeSession null
20-12-6 下午12时53分36秒 session 0x100142dc1660005 cxid 0x0 zxid 0x60 closeSession null
20-12-6 下午12时54分56秒 session 0x100142dc1660020 cxid 0x4 zxid 0x61 setACL '/zk/test1,v{s{4,s{'world,'anyone}}},1
20-12-6 下午12时55分39秒 session 0x100142dc1660020 cxid 0x6 zxid 0x62 create '/zk/test7,#373737,v{s{4,s{'world,'anyone}}},F,7
20-12-6 下午02时33分27秒 session 0x100142dc1660020 cxid 0x0 zxid 0x63 closeSession null
EOF reached after 70 txns.

2. 源码分析

TxnLog接口:

public interface TxnLog {
    // 切换日志,一个日志文件达到一定大小就会生成一个新的
    void rollLog() throws IOException;
    // 添加一个请求到事务日志
    boolean append(TxnHeader hdr, Record r) throws IOException;
    // 通过事务id读取日志
    TxnIterator read(long zxid) throws IOException;
    // 获取事务性操作的最新zxid
    long getLastLoggedZxid() throws IOException;
    // 清空日志,与Leader保持同步
    boolean truncate(long zxid) throws IOException;
    // 获取数据库的id
    long getDbId() throws IOException;
    // 提交事务并进行确认
    void commit() throws IOException;
    // 关闭事务性日志
    void close() throws IOException;
    // 读取事务日志的迭代器接口
    public interface TxnIterator {
        // 获取事务头部
        TxnHeader getHeader();
        // 获取事务
        Record getTxn();
        boolean next() throws IOException;
        // 关闭文件释放资源
        void close() throws IOException;
    }
}

重点分析下append方法:

public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
    if (hdr == null) {
        return false;
    }
    // 当前事务的zxid小于等于最后的zxid,打印告警日志,否则设置最后的zxid为当前事务zxid
    if (hdr.getZxid() <= lastZxidSeen) {
        LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType());
    } else {
        lastZxidSeen = hdr.getZxid();
    }
    // 判断当前输入流logStream是否已清空(在同步处理器SyncRequestProcessor中据一定算法得出一个count,记录大于count就要rollLog,开启一个新的文件,
    // 算法是: 100000/2 + random.nextInt(100000/2), 这个十万是一个默认值可配置),清空开启一个新的文件写入
    if (logStream==null) {
       if(LOG.isInfoEnabled()){
            LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
       }
        // 创建增量日志文件,文件后缀为当前事务zxid
        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream=new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
        // 序列化增量日志文件头
        fhdr.serialize(oa, "fileheader");
        // Make sure that the magic number is written before padding.
        // 确保在填充前已经写完魔数
        logStream.flush();
        currentSize = fos.getChannel().position();
        streamsToFlush.add(fos);
    }
    // 当文件大小不满64MB时,向文件填充0以达到64MB大小
    currentSize = padFile(fos.getChannel());
    // 将事务头和事务序列化成byte[]
    byte[] buf = Util.marshallTxnEntry(hdr, txn);
    if (buf == null || buf.length == 0) {
        throw new IOException("Faulty serialization for header and txn");
    }
    // 生成一个验证算法
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    // 写CRC验证码
    oa.writeLong(crc.getValue(), "txnEntryCRC");
    // 写buf 和 结尾标志  (byte) 0x42
    Util.writeTxnBytes(oa, buf);
    return true;
}

private long padFile(FileChannel fileChannel) throws IOException {
    long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
    if (currentSize != newFileSize) {
        fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
        currentSize = newFileSize;
    }
    return currentSize;
}

File Padding是对WAL的优化,在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先找到inode 所在的位置,然后回到新添加 block 的位置进行日志追加。为了减少这些查找(这些查找是磁盘随机IO,效率跟顺序IO相比不在同一个数量级),我们可以预先为WAL 分配 block。
zk就用到了这种优化,每次为 WAL 分配 64MB 的 block,即不足64M时用0填充,以减少随机磁盘IO;


再来分析下getLastLoggedZxid方法

public long getLastLoggedZxid() {
    // 查找开始于快照或快照之前的日志文件。返回此日志和所有后续日志。结果按文件的zxid升序排列
    File[] files = getLogFiles(logDir.listFiles(), 0);
    // 获取最大的zxid(最后一个log文件对应的zxid)
    long maxLog=files.length>0 ? Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;
    // 扫描最新的日志文件以找到最高的zxid (从文件头开始遍历,遍历到最后一个就是最大的)
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        FileTxnLog txn = new FileTxnLog(logDir);
        itr = txn.read(maxLog);
        while (true) {
            if(!itr.next())
                break;
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}

该方法是找到增量事务日志中最大的zxid,即最后一个txnlog文件最后一个写入的事务日志条目里的zxid;


public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
    // 升序排列所有增量日志文件
    List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
    long logZxid = 0;
    // 查找在快照的zxid之前或同时开始的日志文件
    for (File f : files) {
        long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
        if (fzxid > snapshotZxid) {
            continue;
        }
        // 找到快照的zxid之前并且最接近的日志文件
        if (fzxid > logZxid) {
            logZxid = fzxid;
        }
    }
    // 找到快照的zxid之前并且最接近的日志文件,以及其之后的所有日志文件
    List<File> v=new ArrayList<File>(5);
    for (File f : files) {
        long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
        if (fzxid < logZxid) {
            continue;
        }
        v.add(f);
    }
    return v.toArray(new File[0]);
}

该方法返回升序排列后,最接近请求参数snapshot文件的txnlog文件开始及以后的所有txnlog文件;

小结

本节主要分析了快照日志FileSnap和增量事务日志FileTxnLog,而FileTxnSnapLog中操作主要都是委托给这两者进行处理;
本节一个重点是日志文件的加载流程:
1.首先获取100个snapshot文件,并将其按照文件名降序排列;
2.循环读取这些文件,并反序列化DataTree,根据文件中的校验值进行合法性校验;
3.如果文件合法,将最大的zxid赋值给lastProcessZxid,跳出循环;
4.读取txnlog,按照lastProcessZxid+1,获取该条事务记录,作为新的事务日志记录;
5.读取该记录直至文件结束,判断其合法性,调用processTransaction,提交到DataTree中;
------over------

上一篇下一篇

猜你喜欢

热点阅读