Reading the Prometheus checkpoint source code

2019-10-19  酱油王0901

Since much of Prometheus storage borrows design ideas from RocksDB, here is how the RocksDB documentation introduces checkpoints:

Checkpoint is a feature in RocksDB which provides the ability to take a snapshot of a running RocksDB database in a separate directory. Checkpoints can be used as a point in time snapshot, which can be opened Read-only to query rows as of the point in time or as a Writeable snapshot by opening it Read-Write. Checkpoints can be used for both full and incremental backups.

In short, a checkpoint is a snapshot taken at a particular point in time.

Concepts

Before looking at checkpoints, let's go over some Prometheus terminology and the format of scraped data.

# HELP node_cpu_seconds_total Seconds the cpus spent in each mode.
# TYPE node_cpu_seconds_total counter
node_cpu_seconds_total{cpu="0",mode="idle"} 52544.27
node_cpu_seconds_total{cpu="0",mode="nice"} 0
node_cpu_seconds_total{cpu="0",mode="system"} 6582
node_cpu_seconds_total{cpu="0",mode="user"} 7838.02
node_cpu_seconds_total{cpu="1",mode="idle"} 61939.86
node_cpu_seconds_total{cpu="1",mode="nice"} 0
node_cpu_seconds_total{cpu="1",mode="system"} 1991.59
node_cpu_seconds_total{cpu="1",mode="user"} 3031.76

From the scraped samples we can see that the Prometheus data model consists mainly of metric names, labels, and samples.

Each time series is uniquely identified by its metric name and an optional set of labels, written as:

<metric name>{<label name>=<label value>, ...}

Every time series is uniquely identified by its metric name and optional key-value pairs called labels.

Labels enable Prometheus's dimensional data model: any given combination of labels for the same metric name identifies a particular dimensional instantiation of that metric.

A sample consists of a float64 value and a millisecond-precision timestamp.

Samples form the actual time series data. Each sample consists of:

  • a float64 value
  • a millisecond-precision timestamp
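
As a rough Go illustration (this is not a Prometheus type; the real in-WAL representation, RefSample, appears later in this post), a single sample can be pictured as:

// Sample is an illustrative type only (not the actual Prometheus struct):
// a float64 value plus a millisecond-precision timestamp.
type Sample struct {
    T int64   // Unix timestamp in milliseconds
    V float64 // sampled value
}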

Prometheus has four main metric types: Counter, Gauge, Histogram, and Summary.


Source code analysis

(ENV) 🍺 /Users/xsky/go/src/github.com/microyahoo/prometheus ☞ tree data -h
data
├── [ 192]  01DPE8T5XPQ9ZYHSNJYBJBKGR6
│   ├── [  96]  chunks
│   │   └── [7.1K]  000001
│   ├── [ 22K]  index
│   ├── [ 272]  meta.json
│   └── [   9]  tombstones
├── [   0]  lock
├── [ 20K]  queries.active
└── [ 256]  wal
    ├── [   0]  00000050
    ├── [   0]  00000051
    ├── [   0]  00000052
    ├── [   0]  00000053
    ├── [ 10K]  00000054
    └── [  96]  checkpoint.000049
        └── [ 32K]  00000000

4 directories, 12 files

The source code analysis below refers to the directory layout above. Note that the wal directory contains a checkpoint.N directory (checkpoint.000049 here), which in turn holds the checkpoint's own segment files.
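
To make the checkpoint.N naming concrete, here is a minimal, self-contained sketch of how such a suffix can be parsed to locate the most recent checkpoint. It only approximates what LastCheckpoint does; the real implementation has different validation and error handling:

package main

import (
    "fmt"
    "io/ioutil"
    "path/filepath"
    "strconv"
    "strings"
)

// lastCheckpoint scans dir for entries named checkpoint.N and returns the
// path and index of the one with the highest N. Simplified sketch only.
func lastCheckpoint(dir string) (string, int, bool) {
    files, err := ioutil.ReadDir(dir)
    if err != nil {
        return "", 0, false
    }
    maxIdx := -1
    var last string
    for _, fi := range files {
        if !strings.HasPrefix(fi.Name(), "checkpoint.") {
            continue
        }
        idx, err := strconv.Atoi(strings.TrimPrefix(fi.Name(), "checkpoint."))
        if err != nil {
            continue // e.g. leftover checkpoint.NNNNNN.tmp directories
        }
        if idx > maxIdx {
            maxIdx, last = idx, filepath.Join(dir, fi.Name())
        }
    }
    return last, maxIdx, maxIdx >= 0
}

func main() {
    // For the directory layout above this prints: data/wal/checkpoint.000049 49 true
    dir, idx, ok := lastCheckpoint("data/wal")
    fmt.Println(dir, idx, ok)
}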

// Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
    stats := &CheckpointStats{}
    var sgmReader io.ReadCloser

    {

        var sgmRange []wal.SegmentRange
        // Look up the most recent checkpoint in the wal directory.
        // There may be several checkpoint.N directories, so the latest one is used.
        // Returns the checkpoint directory and its numeric suffix,
        // e.g. idx=49 for the directory layout shown above.
        dir, idx, err := LastCheckpoint(w.Dir())
        if err != nil && err != ErrNotFound {
            return nil, errors.Wrap(err, "find last checkpoint")
        }
        last := idx + 1
        // This check is needed because no checkpoint may have been taken yet.
        if err == nil {
            if from > last {
                return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
            }
            // Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
            // Resume from the segment right after the last checkpoint;
            // everything before it has already been checkpointed.
            from = last

            sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32})
        }
        
        // The range of WAL segments [from, to] to read.
        sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to})
        // Wrap the ranges into a single segments range reader.
        sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...)
        if err != nil {
            return nil, errors.Wrap(err, "create segment reader")
        }
        defer sgmReader.Close()
    }

    cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
    cpdirtmp := cpdir + ".tmp"

    // Create the temporary checkpoint.XXXXXX.tmp directory.
    if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
        return nil, errors.Wrap(err, "create checkpoint dir")
    }
    // Create a WAL inside the temporary checkpoint.XXXXXX.tmp directory;
    // since the directory is empty, the initial segment index is 0, i.e. 00000000.
    cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
    if err != nil {
        return nil, errors.Wrap(err, "open checkpoint")
    }

    // Ensures that an early return caused by an error doesn't leave any tmp files.
    defer func() {
        cp.Close()
        os.RemoveAll(cpdirtmp)
    }()

    r := wal.NewReader(sgmReader)
    
    var (
        series  []RefSeries
        samples []RefSample
        tstones []Stone
        dec     RecordDecoder
        enc     RecordEncoder
        buf     []byte
        recs    [][]byte
    )
    // Read the records one by one.
    for r.Next() {
        series, samples, tstones = series[:0], samples[:0], tstones[:0]

        // We don't reset the buffer since we batch up multiple records
        // before writing them to the checkpoint.
        // Remember where the record for this iteration starts.
        start := len(buf)
        rec := r.Record()

        // Dispatch on the record type.
        switch dec.Type(rec) {
        case RecordSeries:
            // Decode all series entries in the record (explained in detail below).
            series, err = dec.Series(rec, series)
            if err != nil {
                return nil, errors.Wrap(err, "decode series")
            }
            // Drop irrelevant series in place.
            repl := series[:0]
            // Keep only the series for which keep returns true.
            for _, s := range series {
                if keep(s.Ref) {
                    repl = append(repl, s)
                }
            }
            // Encode the retained series into buf.
            if len(repl) > 0 {
                buf = enc.Series(repl, buf)
            }
            // Track the total and dropped series counts.
            stats.TotalSeries += len(series)
            stats.DroppedSeries += len(series) - len(repl)

        case RecordSamples:
            // Decode all samples in the record (explained in detail below).
            samples, err = dec.Samples(rec, samples)
            if err != nil {
                return nil, errors.Wrap(err, "decode samples")
            }
            // Drop irrelevant samples in place.
            repl := samples[:0]
            for _, s := range samples {
                // Drop samples whose timestamp T is below mint.
                if s.T >= mint {
                    repl = append(repl, s)
                }
            }
            if len(repl) > 0 {
                // Encode the retained samples into buf.
                buf = enc.Samples(repl, buf)
            }
            stats.TotalSamples += len(samples)
            stats.DroppedSamples += len(samples) - len(repl)

        case RecordTombstones:
            tstones, err = dec.Tombstones(rec, tstones)
            if err != nil {
                return nil, errors.Wrap(err, "decode deletes")
            }
            // Drop irrelevant tombstones in place.
            repl := tstones[:0]
            for _, s := range tstones {
                for _, iv := range s.intervals {
                    // Keep the tombstone if any of its intervals still covers data at or after mint.
                    if iv.Maxt >= mint {
                        repl = append(repl, s)
                        break
                    }
                }
            }
            if len(repl) > 0 {
                buf = enc.Tombstones(repl, buf)
            }
            stats.TotalTombstones += len(tstones)
            stats.DroppedTombstones += len(tstones) - len(repl)

        default:
            return nil, errors.New("invalid record type")
        }
        if len(buf[start:]) == 0 {
            continue // All contents discarded.
        }
        recs = append(recs, buf[start:])

        // Flush records in 1 MB increments.
        // Flush once the buffered records exceed 1MB.
        if len(buf) > 1*1024*1024 {
            if err := cp.Log(recs...); err != nil {
                return nil, errors.Wrap(err, "flush records")
            }
            buf, recs = buf[:0], recs[:0]
        }
    }
    // If we hit any corruption during checkpointing, repairing is not an option.
    // The head won't know which series records are lost.
    if r.Err() != nil {
        return nil, errors.Wrap(r.Err(), "read segments")
    }

    // Flush remaining records.
    // Flush the remaining records.
    if err := cp.Log(recs...); err != nil {
        return nil, errors.Wrap(err, "flush records")
    }
    if err := cp.Close(); err != nil {
        return nil, errors.Wrap(err, "close checkpoint")
    }
    // Atomically replace checkpoint.XXXXXX.tmp with checkpoint.XXXXXX.
    if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
        return nil, errors.Wrap(err, "rename checkpoint directory")
    }

    return stats, nil
}
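
For context, here is a hedged sketch of how a caller might invoke Checkpoint, loosely modeled on the head's WAL truncation path. The helper names (w.Segments, w.Truncate), the isLive predicate, and the two-thirds heuristic are simplifications within the tsdb package context, not the actual head.Truncate implementation:

// truncateWAL is an illustrative sketch: checkpoint roughly the first two
// thirds of the WAL segments and keep series records whose IDs are still live.
func truncateWAL(w *wal.WAL, isLive func(id uint64) bool, mint int64) error {
    first, last, err := w.Segments() // on-disk segment index range
    if err != nil {
        return errors.Wrap(err, "get segment range")
    }
    // Leave the most recent third of the segments out of the checkpoint.
    last = first + (last-first)*2/3
    if last <= first {
        return nil // not enough segments to be worth checkpointing
    }

    stats, err := Checkpoint(w, first, last, isLive, mint)
    if err != nil {
        return errors.Wrap(err, "create checkpoint")
    }
    // Segments up to and including last are now covered by the checkpoint
    // and can be dropped from the WAL.
    if err := w.Truncate(last + 1); err != nil {
        return errors.Wrap(err, "truncate WAL")
    }
    fmt.Printf("checkpoint done: dropped %d series, %d samples\n",
        stats.DroppedSeries, stats.DroppedSamples)
    return nil
}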

As the code shows, records are read one by one from the segments range reader. For each record, its type is determined first; there are three record types, Series, Samples, and Tombstones, as shown below:

// RecordSeries is used to match WAL records of type Series.
RecordSeries RecordType = 1
// RecordSamples is used to match WAL records of type Samples.
RecordSamples RecordType = 2
// RecordTombstones is used to match WAL records of type Tombstones.
RecordTombstones RecordType = 3

As described by the decoder code, the first byte of each record is its type.
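
As a sketch of that dispatch (this mirrors what RecordDecoder.Type does conceptually; RecordInvalid is assumed here to be the sentinel value for empty or unknown records):

// Type peeks at the first byte of a record to determine its type.
// RecordInvalid is assumed as the sentinel for empty or unknown records.
func (d *RecordDecoder) Type(rec []byte) RecordType {
    if len(rec) < 1 {
        return RecordInvalid
    }
    switch t := RecordType(rec[0]); t {
    case RecordSeries, RecordSamples, RecordTombstones:
        return t
    }
    return RecordInvalid
}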

// Series appends series in rec to the given slice.
// Decode all series entries from rec.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
    dec := encoding.Decbuf{B: rec}

    if RecordType(dec.Byte()) != RecordSeries {
        return nil, errors.New("invalid record type")
    }
    for len(dec.B) > 0 && dec.Err() == nil {
        ref := dec.Be64()

        lset := make(labels.Labels, dec.Uvarint())

        for i := range lset {
            lset[i].Name = dec.UvarintStr()
            lset[i].Value = dec.UvarintStr()
        }
        sort.Sort(lset)

        series = append(series, RefSeries{
            Ref:    ref,
            Labels: lset,
        })
    }
    if dec.Err() != nil {
        return nil, dec.Err()
    }
    if len(dec.B) > 0 {
        return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return series, nil
}

The layout of a series record is:

+-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
| type <1b> | ref <8b> | len(labels) | name | value  | name | value | ... | ... | ref <8b> | len(labels) | name | value  | name | value | ... | ... |
+-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
    byte       uint64     Uvarint    UvarintStr UvarintStr
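
To make the layout concrete, the following self-contained sketch hand-encodes one series entry using the same primitives: a big-endian ref, a uvarint label count, and uvarint-length-prefixed name/value strings. It is an illustration only, not the tsdb Encbuf implementation:

package main

import (
    "encoding/binary"
    "fmt"
)

// appendUvarint appends v in unsigned varint encoding.
func appendUvarint(b []byte, v uint64) []byte {
    var tmp [binary.MaxVarintLen64]byte
    n := binary.PutUvarint(tmp[:], v)
    return append(b, tmp[:n]...)
}

// appendUvarintStr appends a uvarint length prefix followed by the string bytes.
func appendUvarintStr(b []byte, s string) []byte {
    b = appendUvarint(b, uint64(len(s)))
    return append(b, s...)
}

func main() {
    lbls := [][2]string{{"cpu", "0"}, {"mode", "idle"}}

    rec := []byte{1} // type byte: RecordSeries

    var ref [8]byte // series reference, big endian
    binary.BigEndian.PutUint64(ref[:], 42)
    rec = append(rec, ref[:]...)

    rec = appendUvarint(rec, uint64(len(lbls))) // len(labels)
    for _, l := range lbls {
        rec = appendUvarintStr(rec, l[0]) // name
        rec = appendUvarintStr(rec, l[1]) // value
    }
    fmt.Printf("% x\n", rec)
    // Output:
    // 01 00 00 00 00 00 00 00 2a 02 03 63 70 75 01 30 04 6d 6f 64 65 04 69 64 6c 65
}

The Samples decoder below follows the same overall pattern, but samples are delta-encoded against a base reference and base timestamp stored at the start of the record.
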
// Samples appends samples in rec to the given slice.
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
    dec := encoding.Decbuf{B: rec}

    if RecordType(dec.Byte()) != RecordSamples {
        return nil, errors.New("invalid record type")
    }
    if dec.Len() == 0 {
        return samples, nil
    }
    var (
        baseRef  = dec.Be64()
        baseTime = dec.Be64int64()
    )
    for len(dec.B) > 0 && dec.Err() == nil {
        dref := dec.Varint64()
        dtime := dec.Varint64()
        val := dec.Be64()

        samples = append(samples, RefSample{
            Ref: uint64(int64(baseRef) + dref),
            T:   baseTime + dtime,
            V:   math.Float64frombits(val),
        })
    }

    if dec.Err() != nil {
        return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples))
    }
    if len(dec.B) > 0 {
        return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return samples, nil
}

The layout of a samples record is as follows:

+-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
| type <1b> | baseRef <8b> | baseTime <8b> | dref | dtime  |  val | dref  |  dtime | val | ... | ... | ... | 
+-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
    byte        uint64          int64      Varint64 Varint64 uint64

Each RefSample is reconstructed from the base values and the per-sample deltas:

RefSample{
    Ref: uint64(int64(baseRef) + dref),
    T:   baseTime + dtime,
    V:   math.Float64frombits(val),
}
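
A worked example of this reconstruction, with hypothetical numbers: suppose baseRef=100, baseTime=1570000000000, and one sample was stored as dref=2, dtime=5000 together with the bit pattern of 52544.27:

package main

import (
    "fmt"
    "math"
)

func main() {
    // Hypothetical base values read from the start of the record.
    baseRef := uint64(100)
    baseTime := int64(1570000000000)

    // Hypothetical per-sample fields.
    dref := int64(2)     // Varint64 delta against baseRef
    dtime := int64(5000) // Varint64 delta against baseTime
    val := math.Float64bits(52544.27)

    ref := uint64(int64(baseRef) + dref) // 102
    t := baseTime + dtime                // 1570000005000
    v := math.Float64frombits(val)       // 52544.27

    fmt.Println(ref, t, v)
}

The Tombstones decoder follows the same structure:
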
// Tombstones appends tombstones in rec to the given slice.
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
    dec := encoding.Decbuf{B: rec}

    if RecordType(dec.Byte()) != RecordTombstones {
        return nil, errors.New("invalid record type")
    }
    for dec.Len() > 0 && dec.Err() == nil {
        // TODO: confirm -- it looks like each Stone decoded here
        // carries exactly one interval.
        tstones = append(tstones, Stone{
            ref: dec.Be64(),
            intervals: Intervals{
                {Mint: dec.Varint64(), Maxt: dec.Varint64()},
            },
        })
    }
    if dec.Err() != nil {
        return nil, dec.Err()
    }
    if len(dec.B) > 0 {
        return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
    }
    return tstones, nil
}

The layout of a tombstones record is as follows:

+-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
| type <1b> | ref <8b> | minT |  maxT  |  ref <8b> | minT |  maxT   | ... | ... | ... |
+-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
    byte      uint64   Varint64 Varint64 

Each Stone is constructed as:

Stone{
    ref: dec.Be64(),
    intervals: Intervals{
        {
            Mint: dec.Varint64(),
            Maxt: dec.Varint64(),
        },
    },
}
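
Analogous to the series sketch above, a tombstones record can be assembled by hand. Note that minT and maxT are signed varints (Varint64), so binary.PutVarint is the matching writer; this is an illustration only, not the tsdb Encbuf implementation:

package main

import (
    "encoding/binary"
    "fmt"
)

// appendVarint appends v in signed (zigzag) varint encoding,
// which is what Decbuf.Varint64 is expected to read back.
func appendVarint(b []byte, v int64) []byte {
    var tmp [binary.MaxVarintLen64]byte
    n := binary.PutVarint(tmp[:], v)
    return append(b, tmp[:n]...)
}

func main() {
    rec := []byte{3} // type byte: RecordTombstones

    var ref [8]byte // series reference, big endian
    binary.BigEndian.PutUint64(ref[:], 42)
    rec = append(rec, ref[:]...)

    // One deletion interval [1000, 2000] for series ref 42.
    rec = appendVarint(rec, 1000) // Mint
    rec = appendVarint(rec, 2000) // Maxt

    fmt.Printf("% x\n", rec)
}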
