Prometheus DB

2019-11-24  酱油王0901

Prometheus TSDB Initialization

// Open returns a new DB in the given directory.
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
    if err := os.MkdirAll(dir, 0777); err != nil {
        return nil, err
    }
    if l == nil {
        l = log.NewNopLogger()
    }
    if opts == nil {
        opts = DefaultOptions
    }
    ......
    db = &DB{
        dir:         dir,
        logger:      l,
        opts:        opts,
        compactc:    make(chan struct{}, 1),
        donec:       make(chan struct{}),
        stopc:       make(chan struct{}),
        autoCompact: true,
        chunkPool:   chunkenc.NewPool(),
    }
    .......
    db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool)
    if err != nil {
        cancel()
        return nil, errors.Wrap(err, "create leveled compactor")
    }
    db.compactCancel = cancel

    var wlog *wal.WAL
    segmentSize := wal.DefaultSegmentSize
    // Wal is enabled.
    if opts.WALSegmentSize >= 0 {
        // Wal is set to a custom size.
        if opts.WALSegmentSize > 0 {
            segmentSize = opts.WALSegmentSize
        }
        wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
        if err != nil {
            return nil, err
        }
    }

    db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
    if err != nil {
        return nil, err
    }

    if err := db.reload(); err != nil {
        return nil, err
    }
    // Set the min valid time for the ingested samples
    // to be no lower than the maxt of the last block.
    blocks := db.Blocks()
    minValidTime := int64(math.MinInt64)
    if len(blocks) > 0 {
        minValidTime = blocks[len(blocks)-1].Meta().MaxTime
    }

    if initErr := db.head.Init(minValidTime); initErr != nil {
        db.head.metrics.walCorruptionsTotal.Inc()
        level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", initErr)
        if err := wlog.Repair(initErr); err != nil {
            return nil, errors.Wrap(err, "repair corrupted WAL")
        }
    }

    go db.run()

    return db, nil
}
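
For orientation before diving into the internals, a minimal caller looks roughly like this (a sketch, assuming the github.com/prometheus/prometheus/tsdb import path; a nil logger and registerer fall back to no-op implementations inside Open):

package main

import (
    "log"

    "github.com/prometheus/prometheus/tsdb"
)

func main() {
    // Open creates ./data if needed, loads existing blocks, replays the
    // WAL into the head, and starts the background compaction loop.
    db, err := tsdb.Open("data", nil, nil, tsdb.DefaultOptions)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
}

Open ends by calling db.reload, which scans the data directory for blocks: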
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {
    defer func() {
        if err != nil {
            db.metrics.reloadsFailed.Inc()
        }
        db.metrics.reloads.Inc()
    }()

    loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
    if err != nil {
        return err
    }

    deletable := db.deletableBlocks(loadable)
    
    .....
    
    if err := db.deleteBlocks(deletable); err != nil {
        return err
    }

    // Garbage collect data in the head if the most recent persisted block
    // covers data of its current time range.
    if len(loadable) == 0 {
        return nil
    }

    // Determine the max time across all blocks. Since the blocks were
    // sorted by min time earlier, the last block's max time is the max
    // time over all blocks.
    maxt := loadable[len(loadable)-1].Meta().MaxTime

    // Truncate drops head data with timestamps below maxt.
    return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
    // Collect all block directories under the data dir, i.e. all `ULID`-named directories.
    bDirs, err := blockDirs(dir)
    if err != nil {
        return nil, nil, errors.Wrap(err, "find blocks")
    }

    corrupted = make(map[ulid.ULID]error)
    for _, bDir := range bDirs {
        meta, _, err := readMetaFile(bDir)
        if err != nil {
            level.Error(l).Log("msg", "not a block dir", "dir", bDir)
            continue
        }

        // See if we already have the block in memory or open it otherwise.
        block, open := getBlock(loaded, meta.ULID)
        if !open {
            // Not loaded yet, so open the block from disk.
            block, err = OpenBlock(l, bDir, chunkPool)
            if err != nil {
                corrupted[meta.ULID] = err
                continue
            }
        }
        blocks = append(blocks, block)
    }
    return blocks, corrupted, nil
}
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
    if logger == nil {
        logger = log.NewNopLogger()
    }
    var closers []io.Closer
    defer func() {
        if err != nil {
            var merr tsdb_errors.MultiError
            merr.Add(err)
            merr.Add(closeAll(closers))
            err = merr.Err()
        }
    }()
    // Read the block's meta file; sizeMeta is the size of the meta file in bytes.
    meta, sizeMeta, err := readMetaFile(dir)
    if err != nil {
        return nil, err
    }

    // Open the block's chunks.
    cr, err := chunks.NewDirReader(chunkDir(dir), pool)
    if err != nil {
        return nil, err
    }
    closers = append(closers, cr)

    // Open the block's index.
    ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
    if err != nil {
        return nil, err
    }
    closers = append(closers, ir)
    // Read the block's tombstones.
    tr, sizeTomb, err := readTombstones(dir)
    if err != nil {
        return nil, err
    }
    closers = append(closers, tr)

    pb = &Block{
        dir:               dir,
        meta:              *meta,
        chunkr:            cr,
        indexr:            ir,
        tombstones:        tr,
        // symbolTableSize is the sum, over all symbols, of the string
        // length plus an 8-byte per-symbol overhead,
        // i.e. the sum of (len(s) + 8).
        symbolTableSize:   ir.SymbolTableSize(),
        logger:            logger,
        numBytesChunks:    cr.Size(),
        numBytesIndex:     ir.Size(),
        numBytesTombstone: sizeTomb,
        numBytesMeta:      sizeMeta,
    }
    return pb, nil
}

The Prometheus on-disk layout is shown below. Each ULID-named directory represents one block, and every block in turn contains a chunks directory, an index file, a meta.json file, and a tombstones file.

(ENV) [root@ceph-2 prometheus]# tree -h
.
├── [  64]  config
│   ├── [  54]  cmd.conf
│   ├── [3.5K]  prometheus.yml
│   └── [  61]  targets.json
└── [ 301]  data
    ├── [  90]  01DQ85SF1G2XKRCBRGTRK0D4TZ
    │   ├── [  20]  chunks
    │   │   └── [324M]  000001
    │   ├── [ 14M]  index
    │   ├── [1.7K]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DQX6PY58KCW19HHNB3XFBR5S
    │   ├── [  34]  chunks
    │   │   ├── [512M]  000001
    │   │   └── [287M]  000002
    │   ├── [ 29M]  index
    │   ├── [3.0K]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DRAQ9P0R83G5YV2JRRW13985
    │   ├── [  34]  chunks
    │   │   ├── [512M]  000001
    │   │   └── [108M]  000002
    │   ├── [ 29M]  index
    │   ├── [3.2K]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DRGGP4J2T8Q64QC759TCJAE5
    │   ├── [  20]  chunks
    │   │   └── [233M]  000001
    │   ├── [ 10M]  index
    │   ├── [1.5K]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DRPA2RGZB5ZZ0FTZE9J1PK3D
    │   ├── [  20]  chunks
    │   │   └── [204M]  000001
    │   ├── [9.6M]  index
    │   ├── [1.5K]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DRR7W84D6EPWKM1V3W9SPHYZ
    │   ├── [  20]  chunks
    │   │   └── [ 74M]  000001
    │   ├── [3.7M]  index
    │   ├── [ 901]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DRRWFBPM63QTM153VMPRV2SZ
    │   ├── [  20]  chunks
    │   │   └── [7.6M]  000001
    │   ├── [766K]  index
    │   ├── [ 280]  meta.json
    │   └── [   9]  tombstones
    ├── [  68]  01DRRWFEQX20EFFF6V5T3FETEE
    │   ├── [  20]  chunks
    │   │   └── [ 23M]  000001
    │   ├── [1.5M]  index
    │   ├── [ 703]  meta.json
    │   └── [   9]  tombstones
    ├── [   0]  lock
    └── [  79]  wal
        ├── [128M]  00000138
        ├── [128M]  00000139
        ├── [ 91M]  00000140
        └── [  22]  checkpoint.000137
            └── [800K]  00000000

20 directories, 42 files
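
Internally, blockDirs treats a subdirectory as a block directory iff its base name parses as a ULID; the check is essentially the following (a sketch, assuming the github.com/oklog/ulid package that tsdb already depends on):

func isBlockDir(fi os.FileInfo) bool {
    if !fi.IsDir() {
        return false
    }
    // Block directories are named after the ULID recorded in their meta.json.
    _, err := ulid.ParseStrict(fi.Name())
    return err == nil
}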

The following sections describe how a block's chunks, index, and tombstones are read.


Reading chunks

The source code for reading chunks lives in the tsdb/chunks/chunks.go file.

// NewDirReader returns a new Reader against sequentially  
// numbered files in the given directory.
func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
    // Collect all chunk files; they are named by increasing sequence number.
    files, err := sequenceFiles(dir)
    if err != nil {
        return nil, err
    }
    if pool == nil {
        pool = chunkenc.NewPool()
    }

    var (
        bs   []ByteSlice
        cs   []io.Closer
        merr tsdb_errors.MultiError
    )
    // Open every chunk file and mmap it into a byte slice.
    for _, fn := range files {
        f, err := fileutil.OpenMmapFile(fn)
        if err != nil {
            merr.Add(errors.Wrap(err, "mmap files"))
            merr.Add(closeAll(cs))
            return nil, merr
        }
        cs = append(cs, f)
        bs = append(bs, realByteSlice(f.Bytes()))
    }

    // Build the Reader; it implements the ChunkReader interface.
    reader, err := newReader(bs, cs, pool)
    if err != nil {
        merr.Add(err)
        merr.Add(closeAll(cs))
        return nil, merr
    }
    return reader, nil
}

The ChunkReader interface is defined as follows:

// ChunkReader provides reading access of serialized time series data.
type ChunkReader interface {
    // Chunk returns the series data chunk with the given reference.
    Chunk(ref uint64) (chunkenc.Chunk, error)

    // Close releases all underlying resources of the reader.
    Close() error
}

The Reader in tsdb/chunks/chunks.go implements the ChunkReader interface:

type Reader struct {
    bs   []ByteSlice // The underlying bytes holding the encoded series data.
    cs   []io.Closer // Closers for resources behind the byte slices.
    size int64       // The total size of bytes in the reader.
    pool chunkenc.Pool
}

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
    cr := Reader{pool: pool, bs: bs, cs: cs}
    var totalSize int64

    for i, b := range cr.bs {
        if b.Len() < chunkHeaderSize {
            return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i)
        }
        // Verify magic number.
        if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
            return nil, errors.Errorf("invalid magic number %x", m)
        }

        // Verify chunk format version.
        if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
            return nil, errors.Errorf("invalid chunk format version %d", v)
        }
        totalSize += int64(b.Len())
    }
    cr.size = totalSize
    return &cr, nil
}
  • The following describes the format of a chunks file, which is created in the chunks/ directory of a block. The maximum size per segment file is 512MiB.
  • Chunks in the files are referenced from the index by a uint64 composed of the in-file offset (lower 4 bytes) and the segment sequence number (upper 4 bytes); see the sketch below.
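
Resolving a chunk reference is therefore just a matter of splitting the uint64; a sketch of the first thing Reader.Chunk does (splitChunkRef is a hypothetical name used here for illustration):

// splitChunkRef splits an index chunk reference into the segment sequence
// number (upper 4 bytes) and the byte offset within that segment (lower 4 bytes).
func splitChunkRef(ref uint64) (seq int, offset int) {
    seq = int(ref >> 32)
    offset = int((ref << 32) >> 32)
    return seq, offset
}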

The chunk segment header consists of a 4-byte magic number and a 1-byte version (chunkHeaderSize = 5; the on-disk format also reserves 3 bytes of padding, as shown below). Note that newReader adds each segment's full length to totalSize, so the header bytes are not excluded from the reported size.

+------------------------------+
|  magic(0x85BD40DD) <4 byte>  |
+------------------------------+
|    version(1) <1 byte>       |
+------------------------------+
|    padding(0) <3 byte>       |
+------------------------------+
| +--------------------------+ |
| |         Chunk 1          | |
| +--------------------------+ |
| |         ......           | |
| +--------------------------+ |
| |         Chunk N          | |
| +--------------------------+ |
+------------------------------+

Reading the files relies on mmap on Unix/Linux:

type MmapFile struct {
    f *os.File
    b []byte
}

func OpenMmapFile(path string) (*MmapFile, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, errors.Wrap(err, "try lock file")
    }
    info, err := f.Stat()
    if err != nil {
        return nil, errors.Wrap(err, "stat")
    }

    b, err := mmap(f, int(info.Size()))
    if err != nil {
        return nil, errors.Wrap(err, "mmap")
    }

    return &MmapFile{f: f, b: b}, nil
}

func mmap(f *os.File, length int) ([]byte, error) {
    return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, unix.MAP_SHARED)
}
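
The mapped region is then used like any other byte slice. As a quick illustration, here is a hedged sketch that verifies a chunk segment's magic number straight off the mapping (checkSegmentMagic is hypothetical; MagicChunks and MagicChunksSize are exported by the chunks package):

func checkSegmentMagic(path string) error {
    f, err := fileutil.OpenMmapFile(path)
    if err != nil {
        return err
    }
    defer f.Close()

    b := f.Bytes() // the mmap'd file contents; no explicit read syscalls needed
    if len(b) < chunks.MagicChunksSize {
        return errors.New("segment too small")
    }
    if m := binary.BigEndian.Uint32(b[:chunks.MagicChunksSize]); m != chunks.MagicChunks {
        return errors.Errorf("invalid magic number %x", m)
    }
    return nil
}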

Reading the index

// NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) {
    // Open the index file with mmap.
    f, err := fileutil.OpenMmapFile(path)
    if err != nil {
        return nil, err
    }
    r, err := newReader(realByteSlice(f.Bytes()), f)
    if err != nil {
        var merr tsdb_errors.MultiError
        merr.Add(err)
        merr.Add(f.Close())
        return nil, merr
    }

    return r, nil
}
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
    r := &Reader{
        b:        b,
        c:        c,
        labels:   map[string]uint64{},
        postings: map[string]map[string]uint64{},
    }

    // Verify header.
    if r.b.Len() < HeaderLen {
        return nil, errors.Wrap(encoding.ErrInvalidSize, "index header")
    }
    if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
        return nil, errors.Errorf("invalid magic number %x", m)
    }
    r.version = int(r.b.Range(4, 5)[0])

    if r.version != FormatV1 && r.version != FormatV2 {
        return nil, errors.Errorf("unknown index file version %d", r.version)
    }

    // Read the TOC (table of contents).
    toc, err := NewTOCFromByteSlice(b)
    if err != nil {
        return nil, errors.Wrap(err, "read TOC")
    }

    // Read the whole symbol table into memory; ReadSymbols is analyzed below.
    r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols))
    if err != nil {
        return nil, errors.Wrap(err, "read symbols")
    }
    // Use the strings already allocated by symbols, rather than
    // re-allocating them again below.
    // Additionally, calculate symbolsTableSize.
    allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2))
    for _, s := range r.symbolsV1 {
        r.symbolsTableSize += uint64(len(s) + 8)
        allocatedSymbols[s] = s
    }
    for _, s := range r.symbolsV2 {
        // The +8 most likely accounts for the per-symbol 8-byte overhead
        // (the offset kept alongside each string).
        r.symbolsTableSize += uint64(len(s) + 8)
        allocatedSymbols[s] = s
    }

    // Read the label index offset table.
    if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error {
        if len(key) != 1 {
            return errors.Errorf("unexpected key length for label indices table %d", len(key))
        }

        r.labels[allocatedSymbols[key[0]]] = off
        return nil
    }); err != nil {
        return nil, errors.Wrap(err, "read label index table")
    }

    r.postings[""] = map[string]uint64{}
    if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error {
        if len(key) != 2 {
            return errors.Errorf("unexpected key length for posting table %d", len(key))
        }
        if _, ok := r.postings[key[0]]; !ok {
            r.postings[allocatedSymbols[key[0]]] = map[string]uint64{}
        }
        r.postings[key[0]][allocatedSymbols[key[1]]] = off
        return nil
    }); err != nil {
        return nil, errors.Wrap(err, "read postings table")
    }

    r.dec = &Decoder{LookupSymbol: r.lookupSymbol}

    return r, nil
}

Index layout

The index header is 5 bytes: a 4-byte magic number followed by a 1-byte version. Here, too, the reader's reported size covers the whole file, header included.

The index file layout is as follows:

+----------------------------+---------------------+
| magic(0xBAAAD700) <4b>     | version(1) <1 byte> |
+----------------------------+---------------------+
| +----------------------------------------------+ |
| |                 Symbol Table                 | |
| +----------------------------------------------+ |
| |                    Series                    | |
| +----------------------------------------------+ |
| |                 Label Index 1                | |
| +----------------------------------------------+ |
| |                      ...                     | |
| +----------------------------------------------+ |
| |                 Label Index N                | |
| +----------------------------------------------+ |
| |                   Postings 1                 | |
| +----------------------------------------------+ |
| |                      ...                     | |
| +----------------------------------------------+ |
| |                   Postings N                 | |
| +----------------------------------------------+ |
| |               Label Index Table              | |
| +----------------------------------------------+ |
| |                 Postings Table               | |
| +----------------------------------------------+ |
| |                      TOC                     | |
| +----------------------------------------------+ |
+--------------------------------------------------+ 

Reading the symbol table

The symbol table format is as follows:

toc.Symbols
 |
\|/
 +--------------------+---------------------+
 | len <4b>           |  #symbols <4b>      |
 +--------------------+---------------------+
 | +----------------------+---------------+ |
 | | len(str_1) <uvarint> | str_1 <bytes> | |
 | +----------------------+---------------+ |
 | |                . . .                 | |
 | +----------------------+---------------+ |
 | | len(str_n) <uvarint> | str_n <bytes> | |
 | +----------------------+---------------+ |
 +--------------------+---------------------+
 | CRC32 <4b>                               |
 +--------------------+---------------------+

(dlv) p toc
*github.com/prometheus/prometheus/tsdb/index.TOC {Symbols: 5, Series: 2728, LabelIndices: 10123, LabelIndicesTable: 17244, Postings: 10864, PostingsTable: 17393}

(dlv) p r.symbolsV2
[]string len: 143, cap: 143, [
    "",
    "/",
    "/Users/xsky/Downloads/software/ShadowsocksX-NG 2.app",
    "/dev/disk1s1",
    "/dev/disk1s4",
    "/home",
    "/net",
    "/private/var/folders/pb/rc1txhc12_x0jt1tmld1bn300000gn/T/AppTran...+46 more",
    "/private/var/vm",
    "0",
    "0.25",
    "0.5",
    "0.75",
    "1",
    "2",
    "200",
    "3",
    "500",
    "503",
    "XHC0",
    "XHC20",
    "__name__",
    ......
]

// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed.
func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
    // At this point bs still includes the header, i.e. the magic number and version.
    // off is the offset of the symbol table as recorded in the TOC.
    if off == 0 {
        return nil, nil, nil
    }
    // The returned Decbuf excludes the leading 4-byte length field and the
    // trailing 4-byte CRC; it covers only the symbols in between.
    d := encoding.NewDecbufAt(bs, off, castagnoliTable)

    var (
        // Length of the Decbuf, i.e. the middle (symbols) part of the table.
        origLen     = d.Len()
        // Number of symbols; note this read consumes 4 bytes.
        cnt         = d.Be32int()
        // If off == 5, then basePos == 9 and nextPos == 13,
        // i.e. nextPos points at the first symbol entry.
        basePos     = uint32(off) + 4
        nextPos     = basePos + uint32(origLen-d.Len())
        symbolSlice []string
        symbols     = map[uint32]string{}
    )
    if version == FormatV2 {
        symbolSlice = make([]string, 0, cnt)
    }

    for d.Err() == nil && d.Len() > 0 && cnt > 0 {
        s := d.UvarintStr()

        if version == FormatV2 {
            symbolSlice = append(symbolSlice, s)
        } else {
            symbols[nextPos] = s
            nextPos = basePos + uint32(origLen-d.Len())
        }
        cnt--
    }
    return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols")
}
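
The point of keeping both return values shows up at symbol resolution time: in FormatV2 a symbol reference is a sequential index into the slice, while in FormatV1 it is the symbol's byte offset within the file, resolved through the map. A sketch of that logic, modelled on the reader's lookupSymbol:

func (r *Reader) lookupSymbol(o uint32) (string, error) {
    // FormatV2: o is an index into the in-memory symbol slice.
    if int(o) < len(r.symbolsV2) {
        return r.symbolsV2[o], nil
    }
    // FormatV1: o is the symbol's byte offset within the index file.
    s, ok := r.symbolsV1[o]
    if !ok {
        return "", errors.Errorf("unknown symbol offset %d", o)
    }
    return s, nil
}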

Reading the Label Index Table

// ReadOffsetTable reads an offset table and at the given position calls f for each
// found entry. If f returns an error it stops decoding and returns the received error.
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
    // The returned Decbuf excludes the leading 4-byte length field and the
    // trailing 4-byte CRC; it covers only the table entries in between.
    d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
    // Number of entries; consumes 4 bytes.
    cnt := d.Be32()

    // The Postings offset table takes only 2 keys per entry (name and value of label),
    // and the LabelIndices offset table takes only 1 key per entry (a label name).
    // Hence setting the size to max of both, i.e. 2.
    keys := make([]string, 0, 2)
    for d.Err() == nil && d.Len() > 0 && cnt > 0 {
        keyCount := d.Uvarint()
        keys = keys[:0]

        for i := 0; i < keyCount; i++ {
            keys = append(keys, d.UvarintStr())
        }
        o := d.Uvarint64()
        if d.Err() != nil {
            break
        }
        if err := f(keys, o); err != nil {
            return err
        }
        cnt--
    }
    return d.Err()
}

The rough layout of the Label Index Table is:

+------------------------+-------------------------+
| len <4b>               | #labelIndexes <4b>      |
+------------------------+-------------------------+
|    +----------------------------------------+    |
|    |  len(keys) <uvarint>                   |    |
|    +----------------------+-----------------+    |
|    | len(name) <uvarint>  | name <bytes>    |    |
|    +----------------------+-----------------+    |
|    |                 . . .                  |    |
|    +----------------------------------------+    |
|    |  offset <uvarint64>                    |    |
|    +----------------------------------------+    |
|                       . . .                      |
+------------------------+-------------------------+
|      CRC32 <4b>                                  |
+------------------------+-------------------------+

(dlv) p r.labels
map[string]uint64 [
    "cpu": 10124,
    "mode": 10156,
    "instance": 10624,
    "job": 10644,
    "quantile": 10712,
    "collector": 10784,
    "device": 10188,
    "goversion": 10276,
    "__name__": 10296,
    "version": 10664,
    "fstype": 10684,
    "mountpoint": 10748,
    "code": 10836,
]

(dlv) p allocatedSymbols
map[string]string [
    "go_goroutines": "go_goroutines",
    "go_memstats_stack_sys_bytes": "go_memstats_stack_sys_bytes",
    "node_scrape_collector_success": "node_scrape_collector_success",
    "go_memstats_mcache_inuse_bytes": "go_memstats_mcache_inuse_bytes",
    "go_memstats_mcache_sys_bytes": "go_memstats_mcache_sys_bytes",
    "system": "system",
    ...+129 more
]

Reading the Postings Table

Reading the Postings Table is largely the same as reading the Label Index Table.

The rough layout of the Postings Table is:

+------------------------+-------------------------+
| len <4b>               | #postings <4b>          |
+------------------------+-------------------------+
|    +----------------------------------------+    |
|    |  n = 2 <1b>  <uvarint>                 |    |
|    +----------------------+-----------------+    |
|    | len(name) <uvarint>  | name <bytes>    |    |
|    +----------------------+-----------------+    |
|    | len(value) <uvarint> | value <bytes>   |    |
|    +----------------------------------------+    |
|    |  offset <uvarint64>                    |    |
|    +----------------------------------------+    |
|                       . . .                      |
+------------------------+-------------------------+
|      CRC32 <4b>                                  |
+------------------------+-------------------------+

(dlv) p r.postings
map[string]map[string]uint64 [
    "fstype": [
        "apfs": 14772,
        "autofs": 14840,
        "nullfs": 14908,
    ],
    "goversion": [
        "go1.12.4": 14948,
    ],
    "__name__": [
        "go_memstats_alloc_bytes": 11896,
        "go_memstats_heap_sys_bytes": 12072,
        "node_boot_time_seconds": 12296,
        "node_disk_read_sectors_total": 12404,
        "node_disk_reads_completed_total": 12436,
        "node_network_receive_errs_total": 12992,
        "go_memstats_sys_bytes": 12264,
        "node_disk_writes_completed_total": 12468,
        "node_network_receive_multicast_total": 13052,
        "scrape_samples_scraped": 13612,
        "up": 13644,
        "go_memstats_heap_alloc_bytes": 11992,
        "go_memstats_heap_objects": 12040,
        "go_memstats_last_gc_time_seconds": 12088,
        "node_filesystem_files_free": 12628,
        "node_load1": 12756,
        "go_goroutines": 11864,
        "go_threads": 12280,
        "node_memory_free_bytes": 12836,
        "node_memory_inactive_bytes": 12852,
        "node_memory_swapped_out_bytes_total": 12884,
        "node_memory_wired_bytes": 12916,
        "node_network_transmit_bytes_total": 13172,
        "promhttp_metric_handler_requests_total": 13556,
        "go_memstats_alloc_bytes_total": 11912,
        "go_memstats_mcache_inuse_bytes": 12136,
        "node_filesystem_free_bytes": 12660,
        "node_filesystem_size_bytes": 12724,
        "node_load5": 12788,
        "node_memory_active_bytes": 12804,
        "node_network_transmit_errs_total": 13232,
        "scrape_duration_seconds": 13580,
        "go_memstats_buck_hash_sys_bytes": 11928,
        "node_disk_write_time_seconds_total": 12452,
        "node_scrape_collector_duration_seconds": 13412,
        "go_gc_duration_seconds": 11800,
        "go_memstats_lookups_total": 12104,
        "node_memory_compressed_bytes": 12820,
        "go_gc_duration_seconds_count": 11832,
        "go_memstats_mcache_sys_bytes": 12152,
        "go_memstats_stack_sys_bytes": 12248,
        "node_cpu_seconds_total": 12312,
        "node_disk_read_time_seconds_total": 12420,
        "scrape_samples_post_metric_relabeling": 13596,
        "go_gc_duration_seconds_sum": 11848,
        "go_memstats_gc_cpu_fraction": 11960,
        "go_memstats_next_gc_bytes": 12200,
        "node_disk_read_bytes_total": 12388,
        "node_disk_written_bytes_total": 12484,
        "node_disk_written_sectors_total": 12500,
        "node_filesystem_readonly": 12692,
        "node_memory_swapped_in_bytes_total": 12868,
        "node_network_transmit_multicast_total": 13292,
        "node_scrape_collector_success": 13460,
        "go_memstats_heap_released_bytes": 12056,
        "go_memstats_mspan_sys_bytes": 12184,
        "node_filesystem_files": 12596,
        "go_memstats_mallocs_total": 12120,
        "node_textfile_scrape_error": 13508,
        "scrape_series_added": 13628,
        "go_memstats_frees_total": 11944,
        "go_memstats_heap_idle_bytes": 12008,
        "go_memstats_mspan_inuse_bytes": 12168,
        "node_network_receive_packets_total": 13112,
        ...+14 more
    ],
    "collector": [
        "boottime": 13708,
        "cpu": 13728,
        "time": 13868,
        "diskstats": 13748,
        "filesystem": 13768,
        "loadavg": 13788,
        "meminfo": 13808,
        "netdev": 13828,
        "textfile": 13848,
    ],
    "cpu": [
        "0": 13888,
        "1": 13916,
        "2": 13944,
        "3": 13972,
    ],
    "instance": [
        "localhost:9100": 14964,
    ],
    "mountpoint": [
        "/": 16948,
        "/home": 16988,
        "/net": 17028,
        "/private/var/folders/pb/rc1txhc12_x0jt1tmld1bn300000gn/T/AppTran...+46 more": 17068,
        "/private/var/vm": 17108,
    ],
    "quantile": [
        "0": 17148,
        "0.25": 17164,
        "0.5": 17180,
        "0.75": 17196,
        "1": 17212,
    ],
    "": [
        "": 10864,
    ],
    "code": [
        "200": 13660,
        "500": 13676,
        "503": 13692,
    ],
    "device": [
        "XHC0": 14120,
        "bridge0": 14252,
        "en2": 14428,
        "lo0": 14516,
        "map -hosts": 14560,
        "map auto_home": 14600,
        "p2p0": 14640,
        "/dev/disk1s1": 14040,
        "/dev/disk1s4": 14080,
        "disk0": 14296,
        "en0": 14340,
        "en1": 14384,
        "gif0": 14472,
        "XHC20": 14164,
        "awdl0": 14208,
        "/Users/xsky/Downloads/software/ShadowsocksX-NG 2.app": 14000,
        "stf0": 14684,
        "utun0": 14728,
    ],
    "job": [
        "node": 15900,
    ],
    "mode": [
        "idle": 16836,
        "nice": 16864,
        "system": 16892,
        "user": 16920,
    ],
    "version": [
        "go1.12.4": 17228,
    ],
]
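
With the offset table in memory, fetching the series IDs for a label pair takes two steps: look up the offset in r.postings, then decode the postings list stored at that offset (len <4b>, #entries <4b>, one 4-byte big-endian series reference per entry, then a CRC32). A rough sketch; readPostings is a hypothetical helper, while the real reader wraps the bytes in a Postings iterator instead of materializing a slice:

func (r *Reader) readPostings(name, value string) ([]uint64, error) {
    off, ok := r.postings[name][value]
    if !ok {
        return nil, nil // no postings list for this label pair
    }
    // As before, Decbuf strips the 4-byte length prefix and verifies the CRC32.
    d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
    n := d.Be32int() // number of series references in this list
    refs := make([]uint64, 0, n)
    for i := 0; i < n && d.Err() == nil; i++ {
        refs = append(refs, uint64(d.Be32()))
    }
    return refs, errors.Wrap(d.Err(), "read postings list")
}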

Reading tombstones

The tombstones file records, per series reference, the time intervals [mint, maxt] whose samples are to be treated as deleted. Its layout is:

+------------------------------+------------------------------+
| magic(0x0130BA30) <4b>       | version(1) <1 byte>          |
+------------------------------+------------------------------+
|  +------------------+-----------------+-----------------+   | 
|  |  ref <uvarint64> | mint <varint64> | maxt <varint64> |   |
|  +------------------+-----------------+-----------------+   |
|  |                      ........                        |   |
|  +------------------+-----------------+-----------------+   |
|  |                       CRC                            |   |
|  +------------------------------------------------------+   |
+-------------------------------------------------------------+
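
readTombstones verifies the magic number, version, and trailing CRC, then iterates over the (ref, mint, maxt) triples. A rough sketch of that loop using the same encoding helpers (memTombstones and addInterval mirror tsdb internals and are named here only for illustration):

// decodeTombstones drains a Decbuf that has already been positioned past the
// 5-byte header, collecting one deletion interval per entry.
func decodeTombstones(d *encoding.Decbuf, stones *memTombstones) error {
    for d.Len() > 0 {
        ref := d.Uvarint64() // series reference
        mint := d.Varint64() // deletion interval start
        maxt := d.Varint64() // deletion interval end
        if d.Err() != nil {
            return d.Err()
        }
        stones.addInterval(ref, Interval{mint, maxt})
    }
    return nil
}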

Compaction

The storage has to periodically “cut” a new block and write the previous one, which is now completed, onto disk. Only after the block was successfully persisted, the write ahead log files, which are used to restore in-memory blocks, are deleted.
We are interested in keeping each block reasonably short (about two hours for a typical setup) to avoid accumulating too much data in memory. When querying multiple blocks, we have to merge their results into an overall result. This merge procedure obviously comes with a cost and a week-long query should not have to merge 80+ partial results.
To achieve both, we introduce compaction. Compaction describes the process of taking one or more blocks of data and writing them into a, potentially larger, block. It can also modify existing data along the way, e.g. dropping deleted data, or restructuring our sample chunks for improved query performance.

Retention, MinBlockDuration, MaxBlockDuration

                     |
 +-----------+  +-----------+  +-----------+  +-----------+  +-----------+
 | 1         |  | 2  |      |  | 3         |  | 4         |  | 5         |   .....
 +-----------+  +-----------+  +-----------+  +-----------+  +-----------+
                     |
              retention boundary

As data ages, blocks keep growing through repeated compaction, so an upper bound is needed as well: a block should never grow to span the entire database. As a rule of thumb, MaxBlockDuration is capped at 10% of the retention window, while MinBlockDuration defaults to 2h.

The older data gets, the larger the blocks may become as we keep compacting previously compacted blocks. An upper limit has to be applied so blocks don’t grow to span the entire database and thus diminish the original benefits of our design.
Conveniently, this also limits the total disk overhead of blocks that are partially inside and partially outside of the retention window, i.e. block 2 in the example above. When setting the maximum block size at 10% of the total retention window, our total overhead of keeping block 2 around is also bound by 10%.
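
These levels come from opts.BlockRanges, which Open hands to NewLeveledCompactor. By default they grow geometrically from the 2h minimum; a sketch matching tsdb's ExponentialBlockRanges helper (DefaultOptions uses minSize = 2h, steps = 3, stepSize = 5, i.e. levels of 2h, 10h, and 50h):

// ExponentialBlockRanges returns the target block size for each compaction
// level: minSize, minSize*stepSize, minSize*stepSize^2, ...
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
    ranges := make([]int64, 0, steps)
    curRange := minSize
    for i := 0; i < steps; i++ {
        ranges = append(ranges, curRange)
        curRange = curRange * int64(stepSize)
    }
    return ranges
}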


References

  • Fabian Reinartz, "Writing a Time Series Database from Scratch", https://fabxc.org/tsdb/
