Prometheus Data Storage

2019-10-22  酱油王0901

Prometheus offers two kinds of storage: local storage and remote storage.

Prometheus's data-write interface is Appender, defined in the source file tsdb/db.go:

type Appender interface {
    // Add adds a sample pair for the given series. A reference number is
    // returned which can be used to add further samples in the same or later
    // transactions.
    // Returned reference numbers are ephemeral and may be rejected in calls
    // to AddFast() at any point. Adding the sample via Add() returns a new
    // reference number.
    // If the reference is 0 it must not be used for caching.
    Add(l labels.Labels, t int64, v float64) (uint64, error)

    // AddFast adds a sample pair for the referenced series. It is generally
    // faster than adding a sample by providing its full label set.
    AddFast(ref uint64, t int64, v float64) error

    // Commit submits the collected samples and purges the batch.
    Commit() error

    // Rollback rolls back all modifications made in the appender so far.
    Rollback() error
}
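As a minimal usage sketch of this interface (a sketch only; the helper function appendSamples is hypothetical, and the import paths assume the standalone github.com/prometheus/tsdb module and may differ by Prometheus version):

import (
    "time"

    "github.com/prometheus/tsdb"
    "github.com/prometheus/tsdb/labels"
)

// appendSamples sketches how a caller drives the Appender interface.
func appendSamples(db *tsdb.DB) error {
    app := db.Appender()

    lset := labels.FromStrings("__name__", "http_requests_total", "job", "api")
    ts := time.Now().UnixNano() / int64(time.Millisecond)

    // Add registers the series and returns a reference number.
    ref, err := app.Add(lset, ts, 42)
    if err != nil {
        app.Rollback()
        return err
    }

    // Later samples for the same series can take the faster path.
    if err := app.AddFast(ref, ts+15000, 43); err != nil {
        // The ref is ephemeral and may be rejected; fall back to Add.
        if _, err := app.Add(lset, ts+15000, 43); err != nil {
            app.Rollback()
            return err
        }
    }

    // Commit persists the batch (via the WAL) and makes it queryable.
    return app.Commit()
}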

Initialization

Prometheus supports both local and remote storage; they are initialized as follows:

var (
    localStorage  = &tsdb.ReadyStorage{}
    remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))
    fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

First, the local and remote storage instances are created; then the fanout storage is built from both. Note that at this point the local storage is not yet attached to an actual storage backend; once the TSDB finishes loading, it is attached via the ReadyStorage.Set method:

db, err := tsdb.Open(
    cfg.localStoragePath,
    log.With(logger, "component", "tsdb"),
    prometheus.DefaultRegisterer,
    &cfg.tsdb,
)
......
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
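Until Set is called, the ReadyStorage fails fast instead of blocking. A sketch of what a caller sees in the meantime (assuming ErrNotReady, the sentinel error defined alongside ReadyStorage in storage/tsdb):

// Before localStorage.Set(db, startTimeMargin) has run:
app, err := localStorage.Appender()
if err == tsdb.ErrNotReady {
    // The TSDB is still being opened / replaying its WAL; retry later.
}
_ = app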

To support local and remote storage uniformly, Prometheus provides the fanout type, which implements the Storage interface. Its structure is:

type fanout struct {
    logger log.Logger

    primary     Storage
    secondaries []Storage
}

When a method such as Add is called on fanout, it first invokes the Add method of the local storage (primary), then iterates over the remote storages (secondaries) and calls Add on each.
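A simplified sketch of that write path (adapted from storage/fanout.go; the fanoutAppender holds one Appender obtained from each of the Storages above, and error handling is condensed):

func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
    // Write to the primary (local) appender first; its reference
    // number is the one returned to the caller.
    ref, err := f.primary.Add(l, t, v)
    if err != nil {
        return ref, err
    }
    // Then replicate the same sample to every secondary (remote) appender.
    for _, app := range f.secondaries {
        if _, err := app.Add(l, t, v); err != nil {
            return 0, err
        }
    }
    return ref, nil
}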

The Storage interface is defined in the source file storage/interface.go:

// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
    Queryable

    // StartTime returns the oldest timestamp stored in the storage.
    StartTime() (int64, error)

    // Appender returns a new appender against the storage.
    Appender() (Appender, error)

    // Close closes the storage and all its underlying resources.
    Close() error
}

The Appendable interface defines an entity to which data can be appended; it is defined in the source file tsdb/block.go.

// Appendable defines an entity to which data can be appended.
type Appendable interface {
    // Appender returns a new Appender against an underlying store.
    Appender() Appender
}

The write path chains these components together: ReadyStorage, fanout, DB, and Head each implement the Appendable interface, and incoming data is first written to the head block.

Data Scraping

Scraping is driven by the scraper instance inside a scrapeLoop, which periodically scrapes the target and first stores the result in a buffer; the implementation is in the source file scrape/scrape.go.

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    ......

    b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
    buf := bytes.NewBuffer(b)

    // Scrape the target into buf.
    contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)

    ......

    // A failed scrape is the same as an empty scrape,
    // we still call sl.append to trigger stale markers.
    // sl.append parses and stores the scraped data (analyzed below);
    // b is the buffer's contents and start is the current time.
    total, added, seriesAdded, appErr := sl.append(b, contentType, start)
    if appErr != nil {
        level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
        // The append failed, probably due to a parse error or sample limit.
        // Call sl.append again with an empty scrape to trigger stale markers.
        if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
            level.Warn(sl.l).Log("msg", "append failed", "err", err)
        }
    }

    sl.buffers.Put(b)
    ......
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
    scrape(ctx context.Context, w io.Writer) (string, error)
    report(start time.Time, dur time.Duration, err error)
    offset(interval time.Duration, jitterSeed uint64) time.Duration
}
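To make the contract concrete, here is a minimal, hypothetical implementation of the scrape method (the real implementation is targetScraper in scrape/scrape.go, which additionally sets Accept headers, handles gzip, and enforces a body size limit; the url and client fields are illustrative, and report/offset are omitted):

type simpleScraper struct {
    url    string
    client *http.Client
}

func (s *simpleScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    req, err := http.NewRequest("GET", s.url, nil)
    if err != nil {
        return "", err
    }
    resp, err := s.client.Do(req.WithContext(ctx))
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    // Stream the exposition body into the caller-provided buffer; the
    // returned content type tells the caller which parser to use.
    if _, err := io.Copy(w, resp.Body); err != nil {
        return "", err
    }
    return resp.Header.Get("Content-Type"), nil
}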

The scraped data is in the following format:

# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 2.3944e-05
go_gc_duration_seconds{quantile="0.25"} 4.9177e-05
go_gc_duration_seconds{quantile="0.5"} 0.000121297
go_gc_duration_seconds{quantile="0.75"} 0.000319643
go_gc_duration_seconds{quantile="1"} 0.027424647
go_gc_duration_seconds_sum 2029.637676688
go_gc_duration_seconds_count 17095
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 98

scrapeLoop

The scrapeLoop data structure is as follows:

type scrapeLoop struct {
    scraper         scraper
    cache           *scrapeCache
    lastScrapeSize  int
    buffers         *pool.Pool

    appender            func() storage.Appender
    .....
}

scrapeLoopscrapePool构造,其中:

scrapePool的数据结构如下:

type scrapePool struct {
    appendable Appendable
    logger     log.Logger

    mtx    sync.RWMutex
    config *config.ScrapeConfig
    client *http.Client
    // Targets and loops must always be synchronized to have the same
    // set of hashes.
    activeTargets  map[uint64]*Target
    droppedTargets []*Target
    loops          map[uint64]loop
    cancel         context.CancelFunc

    // Constructor for new scrape loops. This is settable for testing convenience.
    newLoop func(scrapeLoopOptions) loop
}

A scrapePool manages the scraping of its set of targets.

All scrapePool instances are maintained by the scrapeManager, which is initialized at Prometheus startup with the corresponding storage:

scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

The Manager struct is defined in the file scrape/manager.go:

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
    logger    log.Logger
    append    Appendable
    graceShut chan struct{}

    jitterSeed    uint64     // Global jitterSeed seed is used to spread scrape workload across HA setup.
    mtxScrape     sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools   map[string]*scrapePool
    targetSets    map[string][]*targetgroup.Group

    triggerReload chan struct{}
}
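At startup the scrape manager is wired to the discovery manager; a simplified sketch (adapted from cmd/prometheus/main.go, where discoveryManagerScrape is the scrape-side discovery manager):

// Make the scrape configs available so reload() can look them up by job name.
if err := scrapeManager.ApplyConfig(cfg); err != nil {
    level.Error(logger).Log("msg", "applying scrape config failed", "err", err)
}

// Run blocks, receiving changed target sets from the discovery manager's
// sync channel and triggering reload().
go func() {
    if err := scrapeManager.Run(discoveryManagerScrape.SyncCh()); err != nil {
        level.Error(logger).Log("msg", "scrape manager stopped", "err", err)
    }
}()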

scrapeManagerRun方法接受并保存从discovery manager scrape发送过来的有变化的target set,并更新其target信息;然后调用reload方法创建新的scrapePool或者

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                continue
            }
            // Create a new scrapePool; m.append here is the fanoutStorage.
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            if err != nil {
                level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
                continue
            }
            m.scrapePools[setName] = sp
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            // Sync converts the target groups into actual scrape targets.
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)

    }
    m.mtxScrape.Unlock()
    wg.Wait()
}

Data Storage Process

We now pick up from the scrapeLoop's append method introduced in the scraping section above.
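A heavily simplified sketch of what sl.append does (the real method also maintains the scrapeCache so it can use AddFast, enforces sample limits, and emits stale markers; the parser and timestamp helpers are pkg/textparse and pkg/timestamp, and appendSketch is an illustrative name):

func (sl *scrapeLoop) appendSketch(b []byte, contentType string, ts time.Time) error {
    app := sl.appender()
    p := textparse.New(b, contentType)
    defTime := timestamp.FromTime(ts)

    for {
        et, err := p.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            app.Rollback()
            return err
        }
        if et != textparse.EntrySeries {
            continue // this sketch skips TYPE/HELP/comment entries
        }

        _, tp, v := p.Series()
        t := defTime
        if tp != nil {
            t = *tp // the exposition line carried an explicit timestamp
        }

        var lset labels.Labels
        p.Metric(&lset)

        // The real code first tries app.AddFast with a cached ref and
        // only falls back to app.Add on a cache miss.
        if _, err := app.Add(lset, t, v); err != nil {
            app.Rollback()
            return err
        }
    }
    // One Commit per scrape: the whole batch becomes visible atomically.
    return app.Commit()
}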


headAppender

Since data is written to the head block first, it is worth going through the head-related source in detail.

type headAppender struct {
    head         *Head
    minValidTime int64 // No samples below this timestamp are allowed.
    mint, maxt   int64

    series  []RefSeries
    samples []RefSample
}

headAppender implements tsdb's Appender interface.
Its Commit method ultimately calls its log method to write the buffered series and samples to the WAL.
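A condensed sketch of Commit (the real method also updates metrics, counts out-of-order samples, and recycles buffers; note that each RefSample carries an unexported pointer to its memSeries):

func (a *headAppender) Commit() error {
    // WAL first: if logging fails, nothing is applied in memory.
    if err := a.log(); err != nil {
        return errors.Wrap(err, "write to WAL")
    }
    // Apply each buffered sample to its in-memory series (head chunk).
    for _, s := range a.samples {
        s.series.Lock()
        s.series.append(s.T, s.V)
        s.series.Unlock()
    }
    return nil
}

The log method itself: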

func (a *headAppender) log() error {
    if a.head.wal == nil {
        return nil
    }

    buf := a.head.getBytesBuffer()
    defer func() { a.head.putBytesBuffer(buf) }()

    var rec []byte
    var enc RecordEncoder

    if len(a.series) > 0 {
        rec = enc.Series(a.series, buf)
        buf = rec[:0]

        if err := a.head.wal.Log(rec); err != nil {
            return errors.Wrap(err, "log series")
        }
    }
    if len(a.samples) > 0 {
        rec = enc.Samples(a.samples, buf)
        buf = rec[:0]

        if err := a.head.wal.Log(rec); err != nil {
            return errors.Wrap(err, "log samples")
        }
    }
    return nil
}

The earlier article, Prometheus checkpoint, already covered the RecordDecoder source; here we look at the encoding side in detail, i.e., how series and samples are encoded into records.

// RecordEncoder encodes series, sample, and tombstones records.
// The zero value is ready to use.
type RecordEncoder struct {
}

const (
    // RecordInvalid is returned for unrecognised WAL record types.
    RecordInvalid RecordType = 255
    // RecordSeries is used to match WAL records of type Series.
    RecordSeries RecordType = 1
    // RecordSamples is used to match WAL records of type Samples.
    RecordSamples RecordType = 2
    // RecordTombstones is used to match WAL records of type Tombstones.
    RecordTombstones RecordType = 3
)

Let's look at how RecordEncoder encodes series into a byte slice, i.e., into a record.

// Series appends the encoded series to b and returns the resulting slice.
func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(RecordSeries))

    for _, s := range series {
        buf.PutBE64(s.Ref)
        buf.PutUvarint(len(s.Labels))

        for _, l := range s.Labels {
            buf.PutUvarintStr(l.Name)
            buf.PutUvarintStr(l.Value)
        }
    }
    return buf.Get()
}

The series record format is shown below. Each label's Name and Value are encoded as a uvarint length followed by the raw bytes:

+--------------------------------------+-------------------------------------+
|       Name                           |      Value                          |
+---------------------+----------------+----------------------+--------------+
|len(str_n) <uvarint> | str_n <bytes>  | len(str_n) <uvarint> | str_n <bytes>|
+---------------------+----------------+----------------------+--------------+

+-------------------------------------------------+
| type <1b>                                       |
+-------------------------------------------------+
| +---------------------------------------------+ |
| | ref <8b>                                    | |
| +---------------------------------------------+ |
| | len(labels) <uvarint>                       | |
| +----------------------+----------------------+ |
| |    +---------------+-------------------+    | |
| |    |     Name      |       Value       |    | |
| |    +---------------+-------------------+    | |
| |    |            . . .                  |    | |
| |    +---------------+-------------------+    | |
| +----------------------+----------------------+ |
|                   . . .                         |  
+-------------------------------------------------+
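As a quick round-trip check of this format (a sketch assuming the exported RecordEncoder/RecordDecoder of the tsdb package and its labels helpers):

var enc RecordEncoder
rec := enc.Series([]RefSeries{
    {Ref: 100, Labels: labels.FromStrings("__name__", "up", "job", "api")},
}, nil)
// rec[0] is the type byte RecordSeries (1), followed by the ref and labels.

var dec RecordDecoder
series, err := dec.Series(rec, nil)
// err == nil; series[0].Ref == 100 and the labels round-trip intact.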

The Samples method encodes samples into a samples record and returns the resulting slice.

// Samples appends the encoded samples to b and returns the resulting slice.
func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(RecordSamples))

    if len(samples) == 0 {
        return buf.Get()
    }
    
    // Store base timestamp and base reference number of first sample.
    // All samples encode their timestamp and ref as delta to those.
    first := samples[0]

    buf.PutBE64(first.Ref)
    buf.PutBE64int64(first.T)

    for _, s := range samples {
        buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
        buf.PutVarint64(s.T - first.T)
        buf.PutBE64(math.Float64bits(s.V))
    }
    return buf.Get()
}

The samples record format is:

+-------------------------------------------------+
| type <1b>                                       |
+-------------------------------------------------+
| first sample reference number  <8b>             |
+-------------------------------------------------+
| first sample timestamp  <8b>                    |
+-------------------------------------------------+
| +---------------------------------------------+ |
| | delta ref <varint>                          | |
| +---------------------------------------------+ |
| | delta timestamp <varint>                    | |
| +----------------------+----------------------+ |
| | value <8b>                                  | |
| +----------------------+----------------------+ |
|                   . . .                         |  
+-------------------------------------------------+
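A small worked example of the delta scheme (illustrative values): the base ref and timestamp are written once in full, and every sample, including the first, then encodes only its distance from that base:

// Both samples are encoded in the loop; the first one's deltas are (0, 0).
samples := []RefSample{
    {Ref: 5, T: 1000, V: 1.0},
    {Ref: 7, T: 1015, V: 2.5}, // deltas: ref 7-5=2, timestamp 1015-1000=15
}
var enc RecordEncoder
rec := enc.Samples(samples, nil)
// rec layout: type byte (2) | base ref 5 <8b> | base timestamp 1000 <8b> |
// then per sample: delta ref <varint> | delta timestamp <varint> | value <8b>

This keeps the varints at one byte each for samples scraped in the same batch, instead of two full 8-byte fields per sample.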

The Tombstones method encodes stones into a tombstone record and returns the resulting slice.

// Interval represents a single time-interval.
type Interval struct {
    Mint, Maxt int64
}

// Tombstones appends the encoded tombstones to b and returns the resulting slice.
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(RecordTombstones))

    for _, s := range tstones {
        for _, iv := range s.intervals {
            buf.PutBE64(s.ref)
            buf.PutVarint64(iv.Mint)
            buf.PutVarint64(iv.Maxt)
        }
    }
    return buf.Get()
}

The tombstone record format is:

+-------------------------------------------------+
| type <1b>                                       |
+-------------------------------------------------+
| +---------------------------------------------+ |
| |    +-----------------------------------+    | |
| |    |    stone ref <8b>                 |    | |
| |    +-----------------------------------+    | |
| |    |    interval Mint <varint>         |    | |
| |    +-----------------------------------+    | |
| |    |    interval Maxt <varint>         |    | |
| |    +-----------------------------------+    | |
| |                 . . .                       | |
| +----------------------+----------------------+ |
|                   . . .                         |  
+-------------------------------------------------+

RecordEncodeseriessamples encode成series recordsample record之后,会分别调用WAL的Log方法,将其写入WAL中,具体可见文档Prometheus WAL源码阅读部分。

