程序员

ZooKeeper源码分析之数据库日志

2020-11-02  本文已影响0人  兽怪海北

本篇我们来分析ZooKeeper的数据库日志,ZooKeeper的数据库日志分为两类,快照日志和事务日志。快照日志存储的数据库某个时刻的快照,事务日志存储的所有事务请求。

数据库日志的入口是SyncRequestProcessor的run方法:

if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
    if (shouldSnapshot()) {
        resetSnapshotStats();
        zks.getZKDatabase().rollLog();
        if (!snapThreadMutex.tryAcquire()) {
        } else {
            new ZooKeeperThread("Snapshot Thread") {
                public void run() {
                    try {
                        zks.takeSnapshot();
                    } catch (Exception e) {
                    } finally {
                        snapThreadMutex.release();
                    }
                }
            }.start();
        }
    }
}
  1. zks.getZKDatabase().append(si)最终会调用FileTnxLog的append方法,将事务请求(非事务请求包括watcher类请求和读请求)写入到日志文件缓冲输出流中,不一定会刷新到文件输出流中,更不一定会刷新到磁盘。

  2. 与snapshot日志大小相关的配置有两个,一个是zookeeper.snapCount,默认值100000,另一个是zookeeper.snapSizeLimitInKb,默认值是4194304,也就是4GB。zookeeper.snapCount,代表两次生成快照间最多有多少个请求,zookeeper.snapSizeLimitInKb代表两次生成快照间事务日志最多增大多少。

  3. private boolean shouldSnapshot() {
        int logCount = zks.getZKDatabase().getTxnCount();
        long logSize = zks.getZKDatabase().getTxnSize();
        return (logCount > (snapCount / 2 + randRoll))
               || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
    }
    

    randRoll是一个随机值,在0和snapCount / 2之间,randSize在0和snapSizeInBytes / 2之间。这两个随机值每次生成快照后都会重置。

  4. 如果需要生成快照日志,首先调用resetSnapshotStats()重置上面的两个跟快照日志相关的随机数,滚动事务日志并重置txnCount和txnSize,并启动一个线程序列化整个数据库生成快照日志。

else if (toFlush.isEmpty()) {
    if (nextProcessor != null) {
        nextProcessor.processRequest(si);
        if (nextProcessor instanceof Flushable) {
            ((Flushable) nextProcessor).flush();
        }
    }
    continue;
}

无待刷盘的请求,读请求或者超时请求不需要写入事务日志。

toFlush.add(si);
if (shouldFlush()) {
    flush();
}

将请求加入到待刷盘请求中,然后判断是否需要现在将事务日志刷盘,是的话调用flush()将事务日志刷盘。

private boolean shouldFlush() {
    long flushDelay = zks.getFlushDelay();
    long maxBatchSize = zks.getMaxBatchSize();
    if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
        return true;
    }
    return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
}

判断现在与上次刷盘的时间间隔是否超过了配置的刷盘间隔值,如果是返回true。判断待刷盘的请求数是否超过了配置的最大批处理数,如果是返回true。最大批处理数由zookeeper.maxBatchSize决定,默认值1000。最大刷盘间隔由zookeeper.flushDelay决定,默认是0代表不启用。

private void flush() throws IOException, RequestProcessorException {
    if (this.toFlush.isEmpty()) {
        return;
    }
    long flushStartTime = Time.currentElapsedTime();
    zks.getZKDatabase().commit();
    if (this.nextProcessor == null) {
        this.toFlush.clear();
    } else {
        while (!this.toFlush.isEmpty()) {
            final Request i = this.toFlush.remove();
            this.nextProcessor.processRequest(i);
        }
        if (this.nextProcessor instanceof Flushable) {
            ((Flushable) this.nextProcessor).flush();
        }
    }
    lastFlushTime = Time.currentElapsedTime();
}

调用zks.getZKDatabase().commit()将事务日志刷盘,并将请求依次交给下一个processor处理。zks.getZKDatabase().commit()最终会走到FileTxnLog的commit方法:

public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush();
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush();
        if (forceSync) {
            FileChannel channel = log.getChannel();
            channel.force(false);
        }
    }
    while (streamsToFlush.size() > 1) {
        streamsToFlush.poll().close();
    }
    if (txnLogSizeLimit > 0) {
        long logSize = getCurrentLogSize();
        if (logSize > txnLogSizeLimit) {
            rollLog();
        }
    }
}
  1. logStream是BufferedOutputStream的实例,持有当前事务日志文件的输出流FileOutputStream,logStream.flush()将BufferedOutputStream的缓冲强制写入FileOutputStream。由于操作系统还有一层文件缓冲区(操作系统定期将其写入到磁盘),这一步完成后变更不一定已经持久化到了磁盘。

  2. streamsToFlush存储了已经滚动但是没有关闭的事务文件输出流和当前事务日志文件输出流。forceSync由zookeeper.forceSync决定,默认开启,决定是否强制刷新事务日志到磁盘。channel.force(false)最终会调用

    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
                                              jobject fdo, jboolean md)
    {
        jint fd = fdval(env, fdo);
        int result = 0;
        if (md == JNI_FALSE) {
            result = fdatasync(fd);
        } else {
            result = fsync(fd);
        }
        return handle(env, result, "Force failed");
    }
    

    fdatasync等待强制刷盘完成。

  3. 关闭已经滚动的事务日志文件。

  4. 判断是否需要滚动事务日志,如果需要,滚动日志。

我们来看一下事务日志追加的逻辑,来看一下FileTxnLog的append方法:

public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
    if (hdr == null) {
        return false;
    }
    if (logStream == null) {
        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
        fhdr.serialize(oa, "fileheader");
        logStream.flush();
        filePadding.setCurrentSize(fos.getChannel().position());
        streamsToFlush.add(fos);
    }
    filePadding.padFile(fos.getChannel());
    byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
    if (buf == null || buf.length == 0) {
        throw new IOException("Faulty serialization for header " + "and txn");
    }
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    oa.writeLong(crc.getValue(), "txnEntryCRC");
    Util.writeTxnBytes(oa, buf);
    return true;
}
  1. 判断是否是事务请求,如果不是直接返回。
  2. 判断日志是否已经滚动,若已滚动,新建一个事务日志文件,并写入文件头。
  3. filePadding.padFile(fos.getChannel())判断文件是否需要扩容,需要的话扩容。这里需要注意的是,事务日志文件大小是预分配的,预分配的大小由zookeeper.preAllocSize决定,默认是64MB,预分配的存在使得我们在强制刷盘的时候可以调fdatasync而不用调fsync同时更新文件的内容和文件的大小。
  4. 计算出请求的CRC校验值,然后写入检验值到文件,最后把请求写入文件。

我们来看一下快照日志的生成过程,具体的逻辑由FileSnap的serialize方法处理:

public synchronized void serialize(
    DataTree dt,
    Map<Long, Integer> sessions,
    boolean fsync) throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    if (!close) {
        try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
            OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            serialize(dt, sessions, oa, header);
            SnapStream.sealStream(snapOS, oa);
            if (dt.serializeZxidDigest(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }
            lastSnapshotInfo = new SnapshotInfo(
                Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
                snapShot.lastModified() / 1000);
        }
    } else {
        throw new IOException("FileSnap has already been closed");
    }
}
  1. 写入文件头。
  2. 写入全局session、ACL和节点。
  3. 写入段结束标志。
  4. 写入签名值。
  5. 写入段结束标志。

最后,我们总结给出事务日志个快照日志的格式。

事务日志:

字段 长度
魔数 ZKLG 4
版本 2 4
数据库ID 0 8
请求1CRC值 计算得出 8
请求1 序列化后的字节数组 字节数字长度
请求结束符 B 1
... ... ...
请求nCRC值 计算得出 8
请求n 序列化后的字节数组 字节数字长度
请求结束符 B 1

快照日志:

字段 长度
魔数 ZKSN 4
版本 2 4
数据库ID -1 8
总会话数 总会话数 4
会话1ID sessionId 8
会话1超时值 sessionTimeout 4
... ... ..
会话nID sessionId 8
会话n超时值 sessionTimeout 4
总ACL数 总ACL数 4
节点1ACL ACL条目 不定
... ... ...
节点nACL ACL条目 不定
节点1 节点序列化后数据 不定
... ... ...
节点n 节点序列化后数据 不定
EOF / 5(1+/)
CRC 计算得出 8
EOF / 5
digestZxid 最后一次请求的zxid 8
digestVersion 2 4
digest 最后一次请求的digest 8
CRC 计算得出 8
EOF / 5
上一篇下一篇

猜你喜欢

热点阅读