Prometheus checkpoint源码阅读
由于Prometheus storage很多地方都是借鉴RocksDB的设计思想,下面引用RocksDB
对checkpoint
的介绍。
Checkpoint is a feature in RocksDB which provides the ability to take a snapshot of a running RocksDB database in a separate directory. Checkpoints can be used as a point in time snapshot, which can be opened Read-only to query rows as of the point in time or as a Writeable snapshot by opening it Read-Write. Checkpoints can be used for both full and incremental backups.
简单来说,Checkpoint
是某一个时间点上的snapshot
。
概念介绍
在介绍checkpoint之前我们先来分析一下Prometheus里面的一些术语及其scrape
的数据格式。
- 以下为node_exporter采样的部分
metrics
数据格式
# HELP node_cpu_seconds_total Seconds the cpus spent in each mode.
# TYPE node_cpu_seconds_total counter
node_cpu_seconds_total{cpu="0",mode="idle"} 52544.27
node_cpu_seconds_total{cpu="0",mode="nice"} 0
node_cpu_seconds_total{cpu="0",mode="system"} 6582
node_cpu_seconds_total{cpu="0",mode="user"} 7838.02
node_cpu_seconds_total{cpu="1",mode="idle"} 61939.86
node_cpu_seconds_total{cpu="1",mode="nice"} 0
node_cpu_seconds_total{cpu="1",mode="system"} 1991.59
node_cpu_seconds_total{cpu="1",mode="user"} 3031.76
从采样数据可以看出prometheus的数据模型主要包含Metric names
, labels
,以及samples
。
每个time series
都由metric name
和可选的labels
唯一标识。
<metric name>{<label name>=<label value>, ...}
Every time series is uniquely identified by its metric name and optional key-value pairs called labels.
Labels enable Prometheus's dimensional data model: any given combination of labels for the same metric name identifies a particular dimensional instantiation of that metric.
而Samples
包括一个浮点值以及毫秒精度的时间戳。
Samples form the actual time series data. Each sample consists of:
- a float64 value
- a millisecond-precision timestamp
Prometheus metric types主要包含四种,Counter
,Gauge
,Histogram
以及Summary
。
源码分析
(ENV) 🍺 /Users/xsky/go/src/github.com/microyahoo/prometheus ☞ tree data -h
data
├── [ 192] 01DPE8T5XPQ9ZYHSNJYBJBKGR6
│ ├── [ 96] chunks
│ │ └── [7.1K] 000001
│ ├── [ 22K] index
│ ├── [ 272] meta.json
│ └── [ 9] tombstones
├── [ 0] lock
├── [ 20K] queries.active
└── [ 256] wal
├── [ 0] 00000050
├── [ 0] 00000051
├── [ 0] 00000052
├── [ 0] 00000053
├── [ 10K] 00000054
└── [ 96] checkpoint.000049
└── [ 32K] 00000000
4 directories, 12 files
由于下面的源码分析会用到上述的目录结构,从上述可以看到wal
目录下有一个checkpoint.N
的目录,目录下包含相应的checkpoint文件。
// Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Parameters:
//   - w: the WAL whose directory is read and in which the checkpoint is created.
//   - from, to: first and last WAL segment index to fold into the checkpoint.
//     from is overridden when a previous checkpoint exists (see below).
//   - keep: reports whether the series with the given ref must be retained.
//   - mint: samples with a timestamp below mint are dropped.
//
// It returns counters of the processed and dropped records, or a wrapped
// error naming the step that failed.
func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
	stats := &CheckpointStats{}
	var sgmReader io.ReadCloser

	{
		var sgmRange []wal.SegmentRange
		// Look up the most recent checkpoint in the WAL directory (several may
		// exist, only the latest one is used). The call yields the checkpoint
		// directory and the numeric suffix of its name, e.g. idx=49 for
		// checkpoint.000049.
		dir, idx, err := LastCheckpoint(w.Dir())
		if err != nil && err != ErrNotFound {
			return nil, errors.Wrap(err, "find last checkpoint")
		}
		last := idx + 1
		// err is either nil or ErrNotFound here; only in the nil case is there
		// a previous checkpoint to build on.
		if err == nil {
			if from > last {
				return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
			}
			// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
			// Everything up to idx is already covered by the previous checkpoint,
			// so reading of the WAL resumes at idx+1.
			from = last

			sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32})
		}

		// The WAL segments [from, to] follow the previous checkpoint (if any).
		sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to})
		// Expose all ranges through a single reader.
		sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...)
		if err != nil {
			return nil, errors.Wrap(err, "create segment reader")
		}
		defer sgmReader.Close()
	}

	cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
	cpdirtmp := cpdir + ".tmp"

	// Remove leftovers of a previously interrupted checkpoint so that the new
	// checkpoint WAL always starts from an empty directory.
	if err := os.RemoveAll(cpdirtmp); err != nil {
		return nil, errors.Wrap(err, "remove previous temporary checkpoint dir")
	}

	// Create the temporary directory checkpoint.XXXXXX.tmp.
	if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
		return nil, errors.Wrap(err, "create checkpoint dir")
	}
	// Open a WAL inside the temporary directory; with an empty directory the
	// first segment is expected to be 00000000.
	cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
	if err != nil {
		return nil, errors.Wrap(err, "open checkpoint")
	}

	// Ensures that an early return caused by an error doesn't leave any tmp files.
	defer func() {
		cp.Close()
		os.RemoveAll(cpdirtmp)
	}()

	r := wal.NewReader(sgmReader)

	var (
		series  []RefSeries
		samples []RefSample
		tstones []Stone
		dec     RecordDecoder
		enc     RecordEncoder
		buf     []byte
		recs    [][]byte
	)
	// Process the records one by one.
	for r.Next() {
		series, samples, tstones = series[:0], samples[:0], tstones[:0]

		// We don't reset the buffer since we batch up multiple records
		// before writing them to the checkpoint.
		// Remember where the record for this iteration starts.
		start := len(buf)
		rec := r.Record()

		// Dispatch on the record type.
		switch dec.Type(rec) {
		case RecordSeries:
			series, err = dec.Series(rec, series)
			if err != nil {
				return nil, errors.Wrap(err, "decode series")
			}
			// Drop irrelevant series in place.
			repl := series[:0]
			for _, s := range series {
				if keep(s.Ref) {
					repl = append(repl, s)
				}
			}
			// Re-encode the surviving series into buf.
			if len(repl) > 0 {
				buf = enc.Series(repl, buf)
			}
			stats.TotalSeries += len(series)
			stats.DroppedSeries += len(series) - len(repl)

		case RecordSamples:
			samples, err = dec.Samples(rec, samples)
			if err != nil {
				return nil, errors.Wrap(err, "decode samples")
			}
			// Drop irrelevant samples in place.
			repl := samples[:0]
			for _, s := range samples {
				// Only samples at or after mint survive.
				if s.T >= mint {
					repl = append(repl, s)
				}
			}
			if len(repl) > 0 {
				// Re-encode the surviving samples into buf.
				buf = enc.Samples(repl, buf)
			}
			stats.TotalSamples += len(samples)
			stats.DroppedSamples += len(samples) - len(repl)

		case RecordTombstones:
			tstones, err = dec.Tombstones(rec, tstones)
			if err != nil {
				return nil, errors.Wrap(err, "decode deletes")
			}
			// Drop irrelevant tombstones in place.
			repl := tstones[:0]
			for _, s := range tstones {
				for _, iv := range s.intervals {
					// A stone is kept, with all of its intervals, as soon as one
					// interval ends at or after mint.
					// NOTE(review): presumably intervals entirely below mint only
					// cover samples that are dropped anyway - confirm.
					if iv.Maxt >= mint {
						repl = append(repl, s)
						break
					}
				}
			}
			if len(repl) > 0 {
				buf = enc.Tombstones(repl, buf)
			}
			stats.TotalTombstones += len(tstones)
			stats.DroppedTombstones += len(tstones) - len(repl)

		default:
			return nil, errors.New("invalid record type")
		}
		if len(buf[start:]) == 0 {
			continue // All contents discarded.
		}
		recs = append(recs, buf[start:])

		// Flush records in 1 MB increments.
		if len(buf) > 1*1024*1024 {
			if err := cp.Log(recs...); err != nil {
				return nil, errors.Wrap(err, "flush records")
			}
			buf, recs = buf[:0], recs[:0]
		}
	}
	// If we hit any corruption during checkpointing, repairing is not an option.
	// The head won't know which series records are lost.
	if r.Err() != nil {
		return nil, errors.Wrap(r.Err(), "read segments")
	}

	// Flush remaining records.
	if err := cp.Log(recs...); err != nil {
		return nil, errors.Wrap(err, "flush records")
	}
	if err := cp.Close(); err != nil {
		return nil, errors.Wrap(err, "close checkpoint")
	}
	// Move checkpoint.XXXXXX.tmp to its final name checkpoint.XXXXXX.
	if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
		return nil, errors.Wrap(err, "rename checkpoint directory")
	}

	return stats, nil
}
从代码中可以看出从SegmentRangeReader
中依次读取所有的records
, 每读取一条record会先判断其类型,record主要有三种类型Series
, Samples
, Tombstones
, 如下所示:
// RecordSeries is used to match WAL records of type Series.
RecordSeries RecordType = 1
// RecordSamples is used to match WAL records of type Samples.
RecordSamples RecordType = 2
// RecordTombstones is used to match WAL records of type Tombstones.
RecordTombstones RecordType = 3
按照代码描述record中第一个字节为type。
- 如果类型为Series,则需要从record中decode所有的series记录。
// Series decodes every series entry contained in rec and appends the results
// to the given slice, which is returned. An error is returned when rec does
// not start with the RecordSeries type byte, when decoding fails, or when
// undecoded bytes remain.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
	buf := encoding.Decbuf{B: rec}

	if t := RecordType(buf.Byte()); t != RecordSeries {
		return nil, errors.New("invalid record type")
	}
	for {
		if len(buf.B) == 0 || buf.Err() != nil {
			break
		}
		// Entry layout: 8-byte ref, label count, then name/value string pairs.
		id := buf.Be64()
		lbls := make(labels.Labels, buf.Uvarint())

		for i := 0; i < len(lbls); i++ {
			lbls[i].Name = buf.UvarintStr()
			lbls[i].Value = buf.UvarintStr()
		}
		sort.Sort(lbls)

		series = append(series, RefSeries{Ref: id, Labels: lbls})
	}
	if err := buf.Err(); err != nil {
		return nil, err
	}
	if rest := len(buf.B); rest > 0 {
		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
	}
	return series, nil
}
- 分析如下:
- 第一个字节为type
- 接下来的8个字节为ref
- 接着读取一个Uvarint(变长编码的整数),表示Labels的个数。
- 接着是Name和value组成的label键值对。
- 其中解析Name和Value时都是先读取Name和Value的长度,接着读取指定长度的[]byte,将其转化为字符串。
+-----------------+-----------------+
|      Name       |      Value      |
+-----------+-----+-----------+-----+
| Uvarint64 | len | Uvarint64 | len |
+-----------+-----+-----------+-----+
- 接着将labels按照name进行排序。
- 生成一条完整的Series记录。
+-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
| type <1b> | ref <8b> | len(labels) | name | value | name | value | ... | ... | ref <8b> | len(labels) | name | value | name | value | ... | ... |
+-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
byte uint64 Uvarint
- 如果类型为Samples
// Samples decodes every sample contained in rec and appends the results to
// the given slice, which is returned. A record holding nothing but the type
// byte yields the slice unchanged. Refs and timestamps are stored as deltas
// against a base ref and base time read once at the start of the record.
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
	buf := encoding.Decbuf{B: rec}

	if t := RecordType(buf.Byte()); t != RecordSamples {
		return nil, errors.New("invalid record type")
	}
	if buf.Len() == 0 {
		return samples, nil
	}

	refBase := buf.Be64()
	timeBase := buf.Be64int64()

	for len(buf.B) > 0 && buf.Err() == nil {
		refDelta := buf.Varint64()
		timeDelta := buf.Varint64()
		bits := buf.Be64()

		var s RefSample
		s.Ref = uint64(int64(refBase) + refDelta)
		s.T = timeBase + timeDelta
		s.V = math.Float64frombits(bits)
		samples = append(samples, s)
	}

	if err := buf.Err(); err != nil {
		return nil, errors.Wrapf(err, "decode error after %d samples", len(samples))
	}
	if rest := len(buf.B); rest > 0 {
		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
	}
	return samples, nil
}
具体格式如下:
+-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
| type <1b> | baseRef <8b> | baseTime <8b> | dref | dtime | val | dref | dtime | val | ... | ... | ... |
+-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
byte uint64 int64 Varint64 Varint64 uint64
其中RefSample:
RefSample{
Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
}
- 如果类型为Tombstones
// Tombstones decodes every tombstone entry contained in rec and appends the
// results to the given slice, which is returned. An error is returned when rec
// does not start with the RecordTombstones type byte, when decoding fails, or
// when undecoded bytes remain.
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
	buf := encoding.Decbuf{B: rec}

	if t := RecordType(buf.Byte()); t != RecordTombstones {
		return nil, errors.New("invalid record type")
	}
	for buf.Len() > 0 && buf.Err() == nil {
		// Each entry is a ref followed by one (mint, maxt) pair, so every
		// decoded Stone carries exactly one interval.
		// TODO(review): confirm that the encoder writes one entry per interval.
		id := buf.Be64()
		lo := buf.Varint64()
		hi := buf.Varint64()

		tstones = append(tstones, Stone{
			ref:       id,
			intervals: Intervals{{Mint: lo, Maxt: hi}},
		})
	}
	if err := buf.Err(); err != nil {
		return nil, err
	}
	if rest := len(buf.B); rest > 0 {
		return nil, errors.Errorf("unexpected %d bytes left in entry", rest)
	}
	return tstones, nil
}
具体格式如下:
+-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
| type <1b> | ref <8b> | minT | maxT | ref <8b> | minT | maxT | ... | ... | ... |
+-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
byte uint64 Varint64 Varint64
其中Stone:
Stone{
ref: dec.Be64(),
intervals: Intervals{
{
Mint: dec.Varint64(),
Maxt: dec.Varint64()
},
}
}
问题点
- 从
SegmentRangeReader
中读取一条record后会先判断其type,第一个字节表示其类型吗?dec.Type的实现目前还没看到,不过上文三个Decoder开头都用RecordType(dec.Byte())校验类型,可见record的第一个字节即为type。
- RecordDecoder.Tombstones
中每个Stone只对应一个Interval吗?