Prometheus数据抓取及数据存储实现
Goroutine生命周期
Prometheus使用一种通用的Goroutine生命周期的管理机制oklog的run.Group。
// Croup 收集 actors 函数然后并行运行它们;当一个 actor(函数)返回,所有 actors 被 interrupted
type Group struct {
actors []actor
}
// 将一个 actor(函数)添加到group,每一个 actor 必须预占一个中断函数,如果调用了中断函数,execute必须return
// 此外,即使在执行 return 后调用 interrupt 也必须是安全的
// 第一个 return 的 actor(函数)中断所有正在运行的 actors; error 传递给 interrupt 函数并有 Run 返回
func (g *Group) Add(execute func() error, interrupt func(error)) {
g.actors = append(g.actors, actor{execute, interrupt})
}
// Run 并行运行所有 actors; 当第一个 actor return,所有其他的 actors被中断
// 只有当所有的 actors(函数)都退出 Run 才能 return; Run 返回的 error 是第一个 actor 返回的 error
func (g *Group) Run() error {
if len(g.actors) == 0 {
return nil
}
// Run each actor.
errors := make(chan error, len(g.actors))
for _, a := range g.actors {
go func(a actor) {
errors <- a.execute()
}(a)
}
// Wait for the first actor to stop.
err := <-errors
// Signal all actors to stop.
for _, a := range g.actors {
a.interrupt(err)
}
// Wait for all actors to stop.
for i := 1; i < cap(errors); i++ {
<-errors
}
// Return the original error.
return err
}
type actor struct {
execute func() error
interrupt func(error)
}
## example in prometheus main.go 478~725 (Fragment)
var g run.Group
{
// Scrape discovery manager.
g.Add(
func() error {
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
cancelScrape()
},
)
}
{
// Scrape manager.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
...
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
服务发现机制
Prometheus作为云时代最受欢迎的监控系统之一,它的服务发现机制在其中发挥着重要作用。它能够支持包括 Kubernetes、Azure、EC2、GCE、OpenStack、Consul、Marathon、Zookeeper及静态文件类型等十几种服务发现的提供者。通过Discovery Manager管理所有provider,定时(默认5秒)更新服务发现的Targets,根据各个Targets的Scrape配置文件Pull监控数据,转换成保存于本地TSDB时序数据库。
源码片段
## discovery/manager.go
// Discover 提供关于抓取目标组的信息,它维护一组TargetGroups源,只要发现provider检测到潜在的更改,它就会通过channel发送TargetGroup
type Discoverer interface {
// Run hands a channel to the discovery provider (Consul, DNS etc) through which it can send updated target groups.
// Must returns if the context gets canceled. It should not close the update channel on returning.
Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
name string
d Discoverer
subs []string
config interface{}
}
// Manager维护一组服务发现提供者并且将每一个更新发送到map channel; Targets 通过target名称分组
type Manager struct {
logger log.Logger
name string
mtx sync.RWMutex
ctx context.Context
discoverCancel []context.CancelFunc
// Some Discoverers(eg. k8s) send only the updates for a given target group
// so we use map[tg.Source]*targetgroup.Group to know which group to update.
targets map[poolKey]map[string]*targetgroup.Group
// providers keeps track of SD providers.
providers []*provider
// The sync channel sends the updates as a map where the key is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group
// How long to wait before sending updates to the channel. The variable
// should only be modified in unit tests.
updatert time.Duration
// The triggerSend channel signals to the manager that new updates have been received from providers.
triggerSend chan struct{}
}
Scrape流程
1、服务发现active targets即Prometheus Pull监控指标的目标endpoint。
2、ScrapeManager以map形式维护各个Targets的抓取配置、抓取池和Target Group,其key是target labels的hash值。
3、它的Run方法间隔5秒钟触发reloader,检测当前scrapePools中的targets group,如果scrape configs中也存在则初始化sp,并行同步更新targets group中所有targets。
4、计算target的key(t.hash()),如果当前activeTargets中不存在该key,则在activeTargets及loops中添加该target,并新启一个goroutine后台执行loop.run()方法;如果这个key对应的target已经存在于activeTargets则更新target的discoveredLabels。
image
5.遍历更新完当前targetsgroup中所有targets,从当前activeTargets中移除旧的targets并停止scraper的loops
image
scraper loop :是一个interface,其中有两个方法run和stop; run执行抓取任务将抓取的时序数据append到TSDB, stop则停止抓取任务
image
scrapeLoop 实现 loop 接口,核心代码块摘要
scraper run方法,执行http/https Get请求到targets的metricPath拉取(scrape)监控数据缓存到buf,然后将时序数据append到TSDB,存储的时间戳timestamp是抓取监控指标的时间(正常情况下严格递增)
image
scrape stop 方法 停止scrape
image
TSDB 存储 sample
Prometheus TSDB参考 Facebook Gorilla [2] 时序数据库原理实现,其核心主要是对timestamp和时序数据value值的高压缩。在时序的场景中,将每个时序对可以视为一对64的值,以时间轴为横坐标表示该点的timestamp,时序值value为该点的纵坐标,每一个时序对按时间轴递增而时序值根据采集到的实际值变动。
type sample struct {
metric labels.Labels
t int64
v float64
}
TSDB如何保证时序递增
tsdb基于delta-of-delta编码的方式压缩timestamp,基于XOR方式压缩时序点value的浮点值,这种压缩方式可以将16byte的时序点压缩到1.37byte,减少10x存储空间,也减少73x查询延迟同时提高14x查询吞吐。
delta-of-delta 使用变长编码算法(简称 dod)
1.块头存储完整的起始时间戳t-1,它与两小时(默认块大小)窗口对齐;块中的第一个时间戳t0被存储为14比特的t-1的增量(delta = t0 - th)
2.(a) 计算delta-of-delta D = (tn - tn-1) - (tn-1 - tn-2)
(b) 根据D的取值范围分配不同位存储不同值
D = 0 1bit '0'
D ∈ [-8191,8192] 14bits '10'
D ∈ [-65535,65536] 17bits '110'
D ∈ [-524287,524288] 20bits '1110'
D ∈ 其他值 64bits '1111'
tDelta = uint64(t - a.t), golang uint64类型取值非负保证了时序的递增性,不会出现乱序
image
XOR方法压缩浮点值value
1.直接存储时序中第一个时序点value值,不做压缩
2.从第二个时序点开始,将value值与前一时序点value值进行XOR运算
3.如果XOR运算结果为0,则表示这两个时序点的Value值相同,只需要使用一个bit位存储'0'值即可
4.如果XOR运算结果不是0,则需要使用到2个bit值的控制位,并首先将第一个bit存储为'1',接下来看控制位的第二个bit值
(a)控制位第二个bit位为'0'时,表示此次计算得到的XOR结果中间非零的部分被前一个XOR运算结果包含.例如,与前一个XOR运算结果相比,此次XOR运算结果也有同样多的前置0和同样多的尾部0,那么我们只需要存储XOR结果中非0的部分即可
(b)控制位第二个bit位为'1'时,需要用5个bit位来存储XOR结果中前置0的数量,然后用6个bit位来存储XOR结果中非0位的长度,最后再存储XOR的值
image
小结
prometheus核心是一个pull-based system,它的整个生态系统设计也是基于这种方式。这个方式的优缺点在文中也圈点出来
- 优点1 :Prometheus完善的服务发现机制兼容当前几乎所有平台,以kubernetes容器云监控为例,实现自动发现并监控资源对象生成告警策略,大大解放运维成本。
- 优点2: 时序数据以抓取时间作为timestamp存储,正常情况下可以保证timestamp严格递增(排除破坏性更改系统时间的行为),在某些场景下这也成为它的一个缺点。
- 缺点 :在实际生产中复杂的云环境下,若要考虑兼容平台原有监控系统,收集客户端上报的的监控数据经过parser转换成prometheus格式的监控数据。然后等待prometheus pull,然后以pull的时间保存时序数据,这其中的时延依网络环境变得非常不可控,这对于时间敏感性高的监控指标来说是硬伤。
于是在构思一种让Prometheus接收数据的方式,timestamp使用指标采集时间,不经过prometheus服务发现模块,数据预处理后直接写入TSDB。接收到的数据首先经过预处理(监控项、标签符合prometheus规范,时间必须在有效范围...); 对于push上来的监控项,由于其游离于discovery之外,如果想对某些监控项进行聚合计算、告警规则等配置的增删改就需要额外的策略来实现动态可配置。