Prometheus数据抓取及数据存储实现

2020-04-09  本文已影响0人  天生小包

Goroutine生命周期

Prometheus使用一种通用的Goroutine生命周期的管理机制oklog的run.Group。

// Croup 收集 actors 函数然后并行运行它们;当一个 actor(函数)返回,所有 actors 被 interrupted
type Group struct {
  actors []actor
}

// 将一个 actor(函数)添加到group,每一个 actor 必须预占一个中断函数,如果调用了中断函数,execute必须return
// 此外,即使在执行 return 后调用 interrupt 也必须是安全的
// 第一个 return 的 actor(函数)中断所有正在运行的 actors; error 传递给 interrupt 函数并有 Run 返回
func (g *Group) Add(execute func() error, interrupt func(error)) {
  g.actors = append(g.actors, actor{execute, interrupt})
}
// Run 并行运行所有 actors; 当第一个 actor return,所有其他的 actors被中断
// 只有当所有的 actors(函数)都退出 Run 才能 return; Run 返回的 error 是第一个 actor 返回的 error
func (g *Group) Run() error {
  if len(g.actors) == 0 {
    return nil
  }

  // Run each actor.
  errors := make(chan error, len(g.actors))
  for _, a := range g.actors {
    go func(a actor) {
      errors <- a.execute()
    }(a)
  }

  // Wait for the first actor to stop.
  err := <-errors

  // Signal all actors to stop.
  for _, a := range g.actors {
    a.interrupt(err)
  }

  // Wait for all actors to stop.
  for i := 1; i < cap(errors); i++ {
    <-errors
  }

  // Return the original error.
  return err
}

type actor struct {
  execute   func() error
  interrupt func(error)
}

## example in prometheus main.go 478~725 (Fragment)
var g run.Group
{
    // Scrape discovery manager.
    g.Add(
      func() error {
        err := discoveryManagerScrape.Run()
        level.Info(logger).Log("msg", "Scrape discovery manager stopped")
        return err
      },
      func(err error) {
        level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
        cancelScrape()
      },
    )
  }
{
    // Scrape manager.
    g.Add(
      func() error {
        // When the scrape manager receives a new targets list
        // it needs to read a valid config for each job.
        // It depends on the config being in sync with the discovery manager so
        // we wait until the config is fully loaded.
        <-reloadReady.C

        err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
        level.Info(logger).Log("msg", "Scrape manager stopped")
        return err
      },
      func(err error) {
        // Scrape manager needs to be stopped before closing the local TSDB
        // so that it doesn't try to write samples to a closed storage.
        level.Info(logger).Log("msg", "Stopping scrape manager...")
        scrapeManager.Stop()
      },
    )
}
...
if err := g.Run(); err != nil {
    level.Error(logger).Log("err", err)
    os.Exit(1)
}

服务发现机制

Prometheus作为云时代最受欢迎的监控系统之一,它的服务发现机制在其中发挥着重要作用。它能够支持包括 Kubernetes、Azure、EC2、GCE、OpenStack、Consul、Marathon、Zookeeper及静态文件类型等十几种服务发现的提供者。通过Discovery Manager管理所有provider,定时(默认5秒)更新服务发现的Targets,根据各个Targets的Scrape配置文件Pull监控数据,转换成保存于本地TSDB时序数据库。

源码片段

## discovery/manager.go
// Discover 提供关于抓取目标组的信息,它维护一组TargetGroups源,只要发现provider检测到潜在的更改,它就会通过channel发送TargetGroup
type Discoverer interface {
  // Run hands a channel to the discovery provider (Consul, DNS etc) through which it can send updated target groups.
  // Must returns if the context gets canceled. It should not close the update channel on returning.
  Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
  name   string
  d      Discoverer
  subs   []string
  config interface{}
}

// Manager维护一组服务发现提供者并且将每一个更新发送到map channel; Targets 通过target名称分组
type Manager struct {
  logger         log.Logger
  name           string
  mtx            sync.RWMutex
  ctx            context.Context
  discoverCancel []context.CancelFunc

  // Some Discoverers(eg. k8s) send only the updates for a given target group
  // so we use map[tg.Source]*targetgroup.Group to know which group to update.
  targets map[poolKey]map[string]*targetgroup.Group
  // providers keeps track of SD providers.
  providers []*provider
  // The sync channel sends the updates as a map where the key is the job value from the scrape config.
  syncCh chan map[string][]*targetgroup.Group

  // How long to wait before sending updates to the channel. The variable
  // should only be modified in unit tests.
  updatert time.Duration

  // The triggerSend channel signals to the manager that new updates have been received from providers.
  triggerSend chan struct{}
}

Scrape流程

1、服务发现active targets即Prometheus Pull监控指标的目标endpoint。
2、ScrapeManager以map形式维护各个Targets的抓取配置、抓取池和Target Group,其key是target labels的hash值。
3、它的Run方法间隔5秒钟触发reloader,检测当前scrapePools中的targets group,如果scrape configs中也存在则初始化sp,并行同步更新targets group中所有targets。
4、计算target的key(t.hash()),如果当前activeTargets中不存在该key,则在activeTargets及loops中添加该target,并新启一个goroutine后台执行loop.run()方法;如果这个key对应的target已经存在于activeTargets则更新target的discoveredLabels。

image

5.遍历更新完当前targetsgroup中所有targets,从当前activeTargets中移除旧的targets并停止scraper的loops

image

scraper loop :是一个interface,其中有两个方法run和stop; run执行抓取任务将抓取的时序数据append到TSDB, stop则停止抓取任务

image

scrapeLoop 实现 loop 接口,核心代码块摘要

scraper run方法,执行http/https Get请求到targets的metricPath拉取(scrape)监控数据缓存到buf,然后将时序数据append到TSDB,存储的时间戳timestamp是抓取监控指标的时间(正常情况下严格递增)

image

scrape stop 方法 停止scrape

image

TSDB 存储 sample

Prometheus TSDB参考 Facebook Gorilla [2] 时序数据库原理实现,其核心主要是对timestamp和时序数据value值的高压缩。在时序的场景中,将每个时序对可以视为一对64的值,以时间轴为横坐标表示该点的timestamp,时序值value为该点的纵坐标,每一个时序对按时间轴递增而时序值根据采集到的实际值变动。

type sample struct {
  metric  labels.Labels
  t       int64
  v       float64
}

TSDB如何保证时序递增

tsdb基于delta-of-delta编码的方式压缩timestamp,基于XOR方式压缩时序点value的浮点值,这种压缩方式可以将16byte的时序点压缩到1.37byte,减少10x存储空间,也减少73x查询延迟同时提高14x查询吞吐。

delta-of-delta 使用变长编码算法(简称 dod)
1.块头存储完整的起始时间戳t-1,它与两小时(默认块大小)窗口对齐;块中的第一个时间戳t0被存储为14比特的t-1的增量(delta = t0 - th)
2.(a) 计算delta-of-delta  D = (tn - tn-1) - (tn-1 - tn-2)
(b) 根据D的取值范围分配不同位存储不同值
  D = 0                 1bit         '0'
  D ∈ [-8191,8192]      14bits       '10'
  D ∈ [-65535,65536]    17bits       '110'
  D ∈ [-524287,524288]  20bits       '1110'
  D ∈ 其他值             64bits       '1111'

tDelta = uint64(t - a.t), golang uint64类型取值非负保证了时序的递增性,不会出现乱序
image
XOR方法压缩浮点值value
1.直接存储时序中第一个时序点value值,不做压缩
2.从第二个时序点开始,将value值与前一时序点value值进行XOR运算
3.如果XOR运算结果为0,则表示这两个时序点的Value值相同,只需要使用一个bit位存储'0'值即可
4.如果XOR运算结果不是0,则需要使用到2个bit值的控制位,并首先将第一个bit存储为'1',接下来看控制位的第二个bit值
  (a)控制位第二个bit位为'0'时,表示此次计算得到的XOR结果中间非零的部分被前一个XOR运算结果包含.例如,与前一个XOR运算结果相比,此次XOR运算结果也有同样多的前置0和同样多的尾部0,那么我们只需要存储XOR结果中非0的部分即可
  (b)控制位第二个bit位为'1'时,需要用5个bit位来存储XOR结果中前置0的数量,然后用6个bit位来存储XOR结果中非0位的长度,最后再存储XOR的值

image

小结

prometheus核心是一个pull-based system,它的整个生态系统设计也是基于这种方式。这个方式的优缺点在文中也圈点出来

于是在构思一种让Prometheus接收数据的方式,timestamp使用指标采集时间,不经过prometheus服务发现模块,数据预处理后直接写入TSDB。接收到的数据首先经过预处理(监控项、标签符合prometheus规范,时间必须在有效范围...); 对于push上来的监控项,由于其游离于discovery之外,如果想对某些监控项进行聚合计算、告警规则等配置的增删改就需要额外的策略来实现动态可配置。

上一篇下一篇

猜你喜欢

热点阅读