OpenFalcon源码分析(graph组件)
2018-10-15 本文已影响0人
Xiao_Yang
基础信息
- graph版本
VERSION = "0.5.9"
- graph组件功能
Graph将采集与传送组件每次push上来的数据,进行采样存储,并提供查询接口。
graph组件逻辑图
- graph组件逻辑图
源码分析目录
- CheckSum计算
- RRD key名生成与解析 、 RRD文件名计算
- UUID 和 MD5(UUID)计算
- Graph内存缓存机制(GraphItemMap、HistoryCache、IndexItemCache)分析
- api.Start() RPC服务监听与处理
- RPC -> graph.Send 接收采集数据
- RPC -> graph.Query 查询rrd数据
- RPC -> graph.Delete 删除rrd
- RPC -> graph.GetRrd 获取rrd数据
- RPC -> graph.Info RRD基础信息
- RPC -> graph.Last最后一次信息
- RPC -> graph.LastRaw最后一次原始数据
注:简书内锚链接失效
入口和基础功能分析
- main主入口分析 ↗
func main() {
cfg := flag.String("c", "cfg.json", "specify config file")
version := flag.Bool("v", false, "show version")
versionGit := flag.Bool("vg", false, "show version and git commit log")
flag.Parse() //命令参数解析
if *version { //-v 版本查看
fmt.Println(g.VERSION)
os.Exit(0)
}
if *versionGit { //-vg 版本和git commit log查看
fmt.Println(g.VERSION, g.COMMIT)
os.Exit(0)
}
// 全局配置文件解析
g.ParseConfig(*cfg)
if g.Config().Debug { //全局日志debug级别设置
g.InitLog("debug")
} else {
g.InitLog("info")
gin.SetMode(gin.ReleaseMode)
}
//初始化DB client conn
g.InitDB() 【参看详细分析】
//初始化rrdtool Channel
rrdtool.InitChannel() 【参看详细分析】
//rrdtool启动
rrdtool.Start() 【参看详细分析】
//RPC API服务启动监听与处理
go api.Start() 【参看详细分析】
//索引服务启动
index.Start() 【参看详细分析】
//HTTP API服务启动监听与处理
go http.Start() 【参看详细分析】
//清理缓存
go cron.CleanCache() 【参看详细分析】
start_signal(os.Getpid(), g.Config()) //优雅退出机制
}
// 系统信息注册、处理与资源回收(实现优雅的关闭系统)
// start_signal registers for OS termination signals and performs a
// graceful shutdown: stop HTTP, stop RPC, flush all cached RRD data to
// disk, then exit the process.
func start_signal(pid int, cfg *g.GlobalConfig) {
sigs := make(chan os.Signal, 1) // channel carrying incoming OS signals
log.Println(pid, "register signal notify")
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) // subscribe to termination signals
for {
s := <-sigs // block until a signal arrives
log.Println("recv", s)
switch s {
// dispatch on signal type
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
log.Println("graceful shut down")
if cfg.Http.Enabled { // shut the HTTP service down and wait for it
http.Close_chan <- 1
<-http.Close_done_chan
}
log.Println("http stop ok")
if cfg.Rpc.Enabled { // shut the RPC service down and wait for it
api.Close_chan <- 1
<-api.Close_done_chan
}
log.Println("rpc stop ok")
// stop rrdtool loops and flush everything to disk before exiting
rrdtool.Out_done_chan <- 1
rrdtool.FlushAll(true)
log.Println("rrdtool stop ok")
log.Println(pid, "exit")
os.Exit(0)
}
}
}
- ParseConfig 全局配置解析 ↗
// ParseConfig loads and validates the JSON config file, then publishes
// it atomically so Config() readers never see a partial struct.
// Any validation failure is fatal: the process cannot run unconfigured.
func ParseConfig(cfg string) {
if cfg == "" {
log.Fatalln("config file not specified: use -c $filename")
}
// the config file must exist
if !file.IsExist(cfg) {
log.Fatalln("config file specified not found:", cfg)
}
ConfigFile = cfg
// read the file into a trimmed string
configContent, err := file.ToTrimString(cfg)
if err != nil {
log.Fatalln("read config file", cfg, "error:", err.Error())
}
var c GlobalConfig
// unmarshal the JSON into the config struct
err = json.Unmarshal([]byte(configContent), &c)
if err != nil {
log.Fatalln("parse config file", cfg, "error:", err.Error())
}
// migration is meaningless without a cluster list: force it off
if c.Migrate.Enabled && len(c.Migrate.Cluster) == 0 {
c.Migrate.Enabled = false
}
// IOWorkerNum must be a power of two (used for md5-based sharding)
if c.IOWorkerNum == 0 || (c.IOWorkerNum&(c.IOWorkerNum-1) != 0) {
log.Fatalf("IOWorkerNum must be 2^N, current IOWorkerNum is %v", c.IOWorkerNum)
}
// how many leading hex chars of the md5 take part in worker sharding
c.FirstBytesSize = len(strconv.FormatInt(int64(c.IOWorkerNum), 16))
// publish the new config; Config() returns it via this pointer
atomic.StorePointer(&ptr, unsafe.Pointer(&c))
log.Println("g.ParseConfig ok, file", cfg)
}
//全局配置文件结构体
// GlobalConfig mirrors the JSON configuration file of the graph
// component.
type GlobalConfig struct {
Pid string `json:"pid"`
Debug bool `json:"debug"`
Http *HttpConfig `json:"http"`
Rpc *RpcConfig `json:"rpc"`
RRD *RRDConfig `json:"rrd"`
DB *DBConfig `json:"db"`
CallTimeout int32 `json:"callTimeout"` // RPC call timeout, milliseconds
IOWorkerNum int `json:"ioWorkerNum"` // number of IO workers; must be 2^N
FirstBytesSize int // derived in ParseConfig: md5 prefix length used for worker sharding
Migrate struct {
Concurrency int `json:"concurrency"` //number of multiple worker per node
Enabled bool `json:"enabled"`
Replicas int `json:"replicas"` // consistent-hash replica count
Cluster map[string]string `json:"cluster"` // node name -> address
} `json:"migrate"`
}
- g.InitDB() 初始化mysql连接 ↗
// InitDB creates the global MySQL connection; failure is fatal.
func InitDB() {
var err error
DB, err = makeDbConn() // open a fresh connection
if DB == nil || err != nil {
log.Fatalln("g.InitDB, get db conn fail", err)
}
dbConnMap = make(map[string]*sql.DB) // NOTE(review): only GetDbConn is visible here; no SetDbConn found in this excerpt
log.Println("g.InitDB ok")
}
// 创建一个新的mysql连接
// makeDbConn opens a new MySQL handle and verifies it with a Ping.
func makeDbConn() (conn *sql.DB, err error) {
conn, err = sql.Open("mysql", Config().DB.Dsn) //sql conn
if err != nil {
return nil, err
}
conn.SetMaxIdleConns(Config().DB.MaxIdle) // max number of *idle* connections kept in the pool (not the pool max)
err = conn.Ping()
return conn, err
}
RRD服务初始化
- rrdtool.InitChannel() 初始化IO Worker Channel ↗
// InitChannel allocates the shutdown channel and one buffered task
// channel per IO worker (tasks for one md5 shard always hit the same
// worker, serializing access to each rrd file).
func InitChannel() {
Out_done_chan = make(chan int, 1) // shutdown signal channel
ioWorkerNum := g.Config().IOWorkerNum // number of IO workers
// create one task channel per IO worker
io_task_chans = make([]chan *io_task_t, ioWorkerNum)
for i := 0; i < ioWorkerNum; i++ {
io_task_chans[i] = make(chan *io_task_t, 16)
}
}
- rrdtool.Start() rrdTool服务启动 ↗
// Start boots the rrdtool service: verifies the data dir is readable
// and writable, starts migration workers, and launches the disk-sync
// and IO-worker goroutines.
func Start() {
cfg := g.Config()
var err error
// ensure the data dir exists with read/write permission
if err = file.EnsureDirRW(cfg.RRD.Storage); err != nil {
log.Fatalln("rrdtool.Start error, bad data dir "+cfg.RRD.Storage+",", err)
}
migrate_start(cfg) // migration workers handling NET_TASK_M_XXX tasks
// main duties: send/query/pull RRD data
go syncDisk() // flush the cache (GraphItemsMap) to rrd files on disk
go ioWorker() // IO workers handling IO_TASK_M_XXX tasks
// main duties: read/write/flush rrd files
log.Println("rrdtool.Start ok")
}
关键元素分析(checkSum、key、rrd filename、uuid)
- CheckSum计算↗
checksum := items[i].Checksum() //每个item Checksum计算
// Checksum returns the md5 of the item primary key (endpoint/metric/tags).
func (t *GraphItem) Checksum() string {
return MUtils.Checksum(t.Endpoint, t.Metric, t.Tags)
}
//Checksum计算函数实现(字符串化->MD5)
// MD5(主键)
// Checksum returns the MD5 hex digest of the item primary key.
// Fix: in the original excerpt the return statement had been swallowed
// into the trailing comment, leaving the function without a return.
func Checksum(endpoint string, metric string, tags map[string]string) string {
	pk := PK(endpoint, metric, tags) // stringify to the primary key
	return Md5(pk)                   // md5 hash of the primary key
}
# 主键(PrimaryKey)
# Item Checksum 字符串化规则
# 无Tags:"endpoint/metric"
# 有Tags:"endpoint/metric/k=v,k=v..."
// PK builds the primary-key string of an item:
//   without tags: "endpoint/metric"
//   with tags:    "endpoint/metric/k=v,k=v..."
// A pooled buffer is used to avoid per-call allocations.
func PK(endpoint, metric string, tags map[string]string) string {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)

	buf.WriteString(endpoint)
	buf.WriteString("/")
	buf.WriteString(metric)
	if len(tags) > 0 { // tags contribute a third, sorted segment
		buf.WriteString("/")
		buf.WriteString(SortedTags(tags))
	}
	return buf.String()
}
## 字符串化Tags项 (补图)
// SortedTags renders a tag map deterministically as "k1=v1,k2=v2,..."
// with keys in ascending order; nil or empty maps yield "".
func SortedTags(tags map[string]string) string {
	n := len(tags)
	if n == 0 {
		return ""
	}

	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)

	if n == 1 { // single pair: no sorting required
		for k, v := range tags {
			buf.WriteString(k)
			buf.WriteString("=")
			buf.WriteString(v)
		}
		return buf.String()
	}

	// collect and sort the keys for a stable ordering
	keys := make([]string, 0, n)
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	for i, k := range keys {
		if i > 0 {
			buf.WriteString(",") // separator between "k=v" pairs
		}
		buf.WriteString(k)
		buf.WriteString("=")
		buf.WriteString(tags[k])
	}
	return buf.String()
}
# MD5 hash
// Md5 returns the lowercase hex encoding of the MD5 digest of raw.
func Md5(raw string) string {
	sum := md5.Sum([]byte(raw))
	return hex.EncodeToString(sum[:])
}
元素计算.jpg
- RRD key名生成与解析 、 RRD文件名计算 ↗
key := g.FormRrdCacheKey(checksum, dsType, step) //生成Key名称,用于采集Item数据缓存内存唯一标识和RRD文件名生成
// 生成rrd缓存数据的key Name
// md5,dsType,step -> "md5_dsType_step"
// FormRrdCacheKey joins md5, dsType and step into the cache key
// "md5_dsType_step" used both as memory-cache key and rrd file stem.
func FormRrdCacheKey(md5 string, dsType string, step int) string {
	return strings.Join([]string{md5, dsType, strconv.Itoa(step)}, "_")
}
// 反解析rrd Key名
// "md5_dsType_step" -> md5,dsType,step
// SplitRrdCacheKey parses "md5_dsType_step" back into its components.
// err is non-nil when the key does not have exactly three "_"-separated
// segments or when the step segment is not a valid integer.
func SplitRrdCacheKey(ckey string) (md5 string, dsType string, step int, err error) {
	parts := strings.Split(ckey, "_")
	if len(parts) != 3 {
		err = fmt.Errorf("bad rrd cache key: %s", ckey)
		return
	}
	md5, dsType = parts[0], parts[1] // first two segments
	var step64 int64
	step64, err = strconv.ParseInt(parts[2], 10, 32) // third segment: step
	if err != nil {
		return
	}
	step = int(step64)
	return
}
// RRDTOOL UTILS
// 监控数据对应的rrd文件名称
// "baseDir/md5[0:2]/md5_dsType_step.rrd"
// RrdFileName maps a counter onto its rrd file path:
// "baseDir/md5[0:2]/md5_dsType_step.rrd". The two-char md5 prefix
// spreads files across subdirectories to keep directories small.
func RrdFileName(baseDir string, md5 string, dsType string, step int) string {
	return fmt.Sprintf("%s/%s/%s_%s_%d.rrd", baseDir, md5[0:2], md5, dsType, step)
}
- UUID 和 MD5(UUID)计算↗
uuid := item.UUID() //UUID用于索引缓存的元素数据唯一标识
// UUID returns the unique id of this item used by the index cache.
func (this *GraphItem) UUID() string {
return MUtils.UUID(this.Endpoint, this.Metric, this.Tags, this.DsType, this.Step)
}
// UUID builds the human-readable unique id of a counter:
//   without tags: "endpoint/metric/dstype/step"
//   with tags:    "endpoint/metric/k=v,k=v.../dstype/step"
// A pooled buffer avoids per-call allocations.
func UUID(endpoint, metric string, tags map[string]string, dstype string, step int) string {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)

	buf.WriteString(endpoint)
	buf.WriteString("/")
	buf.WriteString(metric)
	buf.WriteString("/")
	if len(tags) > 0 { // tag segment present only when tags exist
		buf.WriteString(SortedTags(tags))
		buf.WriteString("/")
	}
	buf.WriteString(dstype)
	buf.WriteString("/")
	buf.WriteString(strconv.Itoa(step))
	return buf.String()
}
# MD5(UUID)
// ChecksumOfUUID returns the md5 of the counter UUID.
func ChecksumOfUUID(endpoint, metric string, tags map[string]string, dstype string, step int64) string {
return Md5(UUID(endpoint, metric, tags, dstype, int(step)))
}
- Graph内存缓存机制(GraphItemMap、HistoryCache、IndexItemCache)分析 ↗
###############
#GraphItemMap
###############
#store->store.go
// GraphItem is one datapoint pushed by the transfer component, plus the
// rrd metadata (dsType/step/heartbeat/min/max) needed to store it.
type GraphItem struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Value float64 `json:"value"`
Timestamp int64 `json:"timestamp"` // unix seconds
DsType string `json:"dstype"` // GAUGE / COUNTER / DERIVE
Step int `json:"step"` // collection period in seconds
Heartbeat int `json:"heartbeat"`
Min string `json:"min"`
Max string `json:"max"`
}
// GraphItemMap is the sharded in-memory cache of unflushed datapoints:
// Size shards, each a map from rrd cache key to its item linked list.
type GraphItemMap struct {
sync.RWMutex
A []map[string]*SafeLinkedList
Size int // number of shards in A
}
// init sizes GraphItems with one shard per flush-disk tick inside the
// cache window, so syncDisk can flush shard i on tick i (round-robin).
func init() {
	size := g.CACHE_TIME / g.FLUSH_DISK_STEP
	// Fix: the original guard was `size < 0`, which can never fire for
	// an integer division of positive constants; size == 0 would panic
	// later (modulo/indexing by Size), so reject it here too.
	if size <= 0 {
		log.Panicf("store.init, bad size %d\n", size)
	}
	GraphItems = &GraphItemMap{
		A:    make([]map[string]*SafeLinkedList, size),
		Size: size,
	}
	for i := 0; i < size; i++ {
		GraphItems.A[i] = make(map[string]*SafeLinkedList)
	}
}
func (this *GraphItemMap) Get(key string) (*SafeLinkedList, bool)
func (this *GraphItemMap) Remove(key string) bool
func (this *GraphItemMap) Getitems(idx int) map[string]*SafeLinkedList
func (this *GraphItemMap) Set(key string, val *SafeLinkedList)
func (this *GraphItemMap) Len() int
func (this *GraphItemMap) First(key string) *cmodel.GraphItem
func (this *GraphItemMap) PushAll(key string, items []*cmodel.GraphItem) error
func (this *GraphItemMap) GetFlag(key string) (uint32, error)
func (this *GraphItemMap) SetFlag(key string, flag uint32) error
func (this *GraphItemMap) PopAll(key string) []*cmodel.GraphItem
func (this *GraphItemMap) FetchAll(key string) ([]*cmodel.GraphItem, uint32)
func (this *GraphItemMap) PushFront(key string,
item *cmodel.GraphItem, md5 string, cfg *g.GlobalConfig)
func (this *GraphItemMap) KeysByIndex(idx int) []string
func (this *GraphItemMap) Back(key string) *cmodel.GraphItem
func (this *GraphItemMap) ItemCnt(key string) int
// hashKey returns the IEEE CRC32 of key. Keys shorter than 64 bytes are
// copied into a fixed stack buffer so the string-to-[]byte conversion
// does not allocate on the heap.
func hashKey(key string) uint32 {
	if len(key) >= 64 {
		return crc32.ChecksumIEEE([]byte(key))
	}
	var scratch [64]byte
	copy(scratch[:], key)
	return crc32.ChecksumIEEE(scratch[:len(key)])
}
// getWts spreads each key's first flush deadline pseudo-randomly inside
// the next CACHE_TIME window (hash-based offset), so flushes do not all
// land on the same tick.
func getWts(key string, now int64) int64 {
interval := int64(g.CACHE_TIME)
return now + interval - (int64(hashKey(key)) % interval)
}
###############
#SafeLinkedList
###############
# store-> linkeddlist.go 使用的官方"container/list"双向链表
// SafeLinkedList is a mutex-guarded doubly linked list of cached items
// (backed by the stdlib container/list) plus a status Flag bitmask.
type SafeLinkedList struct {
sync.RWMutex
Flag uint32 // GRAPH_F_* status bits (e.g. SENDING/FETCHING/MISS)
L *list.List
}
// 新创建SafeLinkedList容器
// NewSafeLinkedList returns an empty SafeLinkedList ready for use.
func NewSafeLinkedList() *SafeLinkedList {
return &SafeLinkedList{L: list.New()}
}
func (this *SafeLinkedList) PushFront(v interface{}) *list.Element
func (this *SafeLinkedList) Front() *list.Element
func (this *SafeLinkedList) PopBack() *list.Element
func (this *SafeLinkedList) Back() *list.Element
func (this *SafeLinkedList) Len() int
func (this *SafeLinkedList) PopAll() []*cmodel.GraphItem
func (this *SafeLinkedList) PushAll(items []*cmodel.GraphItem)
func (this *SafeLinkedList) FetchAll() ([]*cmodel.GraphItem, uint32)
###############
# HistoryCache
###############
const (
defaultHistorySize = 3 // recent items kept per counter in HistoryCache
)
var (
// mem: front = = = back
// time: latest ... old
HistoryCache = tmap.NewSafeMap() // counter checksum -> *tlist.SafeListLimited of recent items
)
// GetLastItem returns the most recent cached item for key, or a zero
// GraphItem when the key is unknown or its list is empty.
func GetLastItem(key string) *cmodel.GraphItem {
itemlist, found := HistoryCache.Get(key)
if !found || itemlist == nil {
return &cmodel.GraphItem{}
}
first := itemlist.(*tlist.SafeListLimited).Front() // front = latest
if first == nil {
return &cmodel.GraphItem{}
}
return first.(*cmodel.GraphItem)
}
// GetAllItems returns every cached history item for key (latest first);
// an unknown key yields an empty slice, never nil entries.
func GetAllItems(key string) []*cmodel.GraphItem {
ret := make([]*cmodel.GraphItem, 0)
itemlist, found := HistoryCache.Get(key)
if !found || itemlist == nil {
return ret
}
all := itemlist.(*tlist.SafeListLimited).FrontAll()
for _, item := range all {
if item == nil {
continue
}
ret = append(ret, item.(*cmodel.GraphItem))
}
return ret
}
// AddItem pushes val into the size-limited history list for key, but
// only when its timestamp is newer than the current front item.
// NOTE(review): Get/Put here is check-then-act; concurrent callers for
// a brand-new key could race — confirm SafeMap semantics upstream.
func AddItem(key string, val *cmodel.GraphItem) {
itemlist, found := HistoryCache.Get(key)
var slist *tlist.SafeListLimited
if !found {
slist = tlist.NewSafeListLimited(defaultHistorySize)
HistoryCache.Put(key, slist)
} else {
slist = itemlist.(*tlist.SafeListLimited)
}
// old item should be drop
first := slist.Front()
if first == nil || first.(*cmodel.GraphItem).Timestamp < val.Timestamp { // first item or latest one
slist.PushFrontViolently(val)
}
}
rrdtool服务分析
- migrate_start运行迁移数据↗
## 迁移数据处理启动
// migrate_start boots the data-migration subsystem when enabled: builds
// the consistent-hash ring over the configured cluster and, per node,
// a task channel plus Concurrency RPC clients/worker goroutines.
func migrate_start(cfg *g.GlobalConfig) {
var err error
var i int
if cfg.Migrate.Enabled { // migration enabled in the global config
Consistent.NumberOfReplicas = cfg.Migrate.Replicas // replica count for the consistent-hash ring
nodes := cutils.KeysOfMap(cfg.Migrate.Cluster) // list of graph node names
for _, node := range nodes {
addr := cfg.Migrate.Cluster[node]
Consistent.Add(node) // add the node to the hash ring
Net_task_ch[node] = make(chan *Net_task_t, 16) // per-node task channel
clients[node] = make([]*rpc.Client, cfg.Migrate.Concurrency) // per-node concurrent RPC clients
for i = 0; i < cfg.Migrate.Concurrency; i++ {
// dial one RPC client per worker; failure is fatal at startup
if clients[node][i], err = dial(addr, time.Second); err != nil {
log.Fatalf("node:%s addr:%s err:%s\n", node, addr, err)
}
go net_task_worker(i, Net_task_ch[node], &clients[node][i], addr) // one worker goroutine per client (default concurrency 2)
}
}
}
}
### 任务工作线程,主要处理类型为SEND/QUERY/PULL任务
### NET_TASK_M_SEND:向其它Graph模块发送某个md5(指标)的数据
### NET_TASK_M_QUERY:向其它Graph模块发起查询请求
### NET_TASK_M_PULL:从其它Graph模块拉取数据,然后保存至本地
// net_task_worker consumes migration tasks for one node and dispatches
// by type:
//   NET_TASK_M_SEND  - push one key's items to another graph node
//   NET_TASK_M_QUERY - query another graph node
//   NET_TASK_M_PULL  - pull a key's rrd file from another node to local
// Every outcome is metered and counted; the result is reported back on
// task.Done when present.
func net_task_worker(idx int, ch chan *Net_task_t, client **rpc.Client, addr string) {
var err error
for {
select {
case task := <-ch:
// task type: NET_TASK_M_SEND
if task.Method == NET_TASK_M_SEND {
if err = send_data(client, task.Key, addr); err != nil {
pfc.Meter("migrate.send.err", 1)
atomic.AddUint64(&stat_cnt[SEND_S_ERR], 1)
} else {
pfc.Meter("migrate.send.ok", 1)
atomic.AddUint64(&stat_cnt[SEND_S_SUCCESS], 1)
}
// task type: NET_TASK_M_QUERY
} else if task.Method == NET_TASK_M_QUERY {
if err = query_data(client, addr, task.Args, task.Reply); err != nil {
pfc.Meter("migrate.query.err", 1)
atomic.AddUint64(&stat_cnt[QUERY_S_ERR], 1)
} else {
pfc.Meter("migrate.query.ok", 1)
atomic.AddUint64(&stat_cnt[QUERY_S_SUCCESS], 1)
}
// task type: NET_TASK_M_PULL
} else if task.Method == NET_TASK_M_PULL {
if atomic.LoadInt32(&flushrrd_timeout) != 0 {
// hope this more faster than fetch_rrd
if err = send_data(client, task.Key, addr); err != nil {
pfc.Meter("migrate.sendbusy.err", 1)
atomic.AddUint64(&stat_cnt[SEND_S_ERR], 1)
} else {
pfc.Meter("migrate.sendbusy.ok", 1)
atomic.AddUint64(&stat_cnt[SEND_S_SUCCESS], 1)
}
} else {
if err = fetch_rrd(client, task.Key, addr); err != nil {
if os.IsNotExist(err) {
pfc.Meter("migrate.scprrd.null", 1)
// remote file missing: flush the cached data locally instead
atomic.AddUint64(&stat_cnt[FETCH_S_ISNOTEXIST], 1)
store.GraphItems.SetFlag(task.Key, 0)
CommitByKey(task.Key)
} else {
pfc.Meter("migrate.scprrd.err", 1)
// warning: on other errors the cached data keeps piling up
atomic.AddUint64(&stat_cnt[FETCH_S_ERR], 1)
}
} else {
pfc.Meter("migrate.scprrd.ok", 1)
atomic.AddUint64(&stat_cnt[FETCH_S_SUCCESS], 1)
}
}
} else {
err = errors.New("error net task method")
}
if task.Done != nil {
task.Done <- err
}
}
}
}
##### 重连机制实现
// reconnection closes the broken RPC client and redials until it
// succeeds, sleeping 500ms between attempts. It blocks its caller
// (and therefore the worker goroutine) until the peer is reachable.
func reconnection(client **rpc.Client, addr string) {
pfc.Meter("migrate.reconnection."+addr, 1)
var err error
atomic.AddUint64(&stat_cnt[CONN_S_ERR], 1)
if *client != nil {
(*client).Close()
}
*client, err = dial(addr, time.Second) //dial
atomic.AddUint64(&stat_cnt[CONN_S_DIAL], 1)
for err != nil {
//danger!! block routine
time.Sleep(time.Millisecond * 500)
*client, err = dial(addr, time.Second) //dial again
atomic.AddUint64(&stat_cnt[CONN_S_DIAL], 1)
}
}
- Graph迁移数据任务类型处理分析↗
# 任务类型NET_TASK_M_SEND 处理分析
# 带失败尝试机制的RPC方式发送指定Key的所有Items数据
// send_data pops all cached items for key and RPC-sends them to addr
// via Graph.Send, retrying up to 3 times with reconnection on shutdown
// errors. On failure the items are pushed back into the cache. The
// SENDING flag bit marks the key busy for the duration.
func send_data(client **rpc.Client, key string, addr string) error {
var (
err error
flag uint32
resp *cmodel.SimpleRpcResponse
i int
)
//remote
if flag, err = store.GraphItems.GetFlag(key); err != nil {
return err
}
cfg := g.Config()
store.GraphItems.SetFlag(key, flag|g.GRAPH_F_SENDING) // set the SENDING flag bit for this key
items := store.GraphItems.PopAll(key) // pop all cached items for the key
items_size := len(items)
if items_size == 0 {
goto out
}
resp = &cmodel.SimpleRpcResponse{}
for i = 0; i < 3; i++ { // up to 3 attempts, reconnecting between failures
err = rpc_call(*client, "Graph.Send", items, resp,
time.Duration(cfg.CallTimeout)*time.Millisecond) // send the data
if err == nil {
goto out
}
if err == rpc.ErrShutdown {
reconnection(client, addr) // redial on a dead connection
}
}
// err
store.GraphItems.PushAll(key, items) // on failure, put the items back
//flag |= g.GRAPH_F_ERR
out:
flag &= ^g.GRAPH_F_SENDING // clear the SENDING flag bit
store.GraphItems.SetFlag(key, flag)
return err
}
# 任务类型NET_TASK_M_QUERY 处理分析
# 带失败尝试机制的RPC请求查询
// query_data issues a Graph.Query RPC against addr with up to 3
// attempts, reconnecting when the connection is shut down.
func query_data(client **rpc.Client, addr string,
args interface{}, resp interface{}) error {
var (
err error
i int
)
for i = 0; i < 3; i++ { // retry loop
err = rpc_call(*client, "Graph.Query", args, resp,
time.Duration(g.Config().CallTimeout)*time.Millisecond) // remote query
if err == nil {
break
}
if err == rpc.ErrShutdown {
reconnection(client, addr) // redial
}
}
return err
}
# 任务类型NET_TASK_M_PULL 处理分析
# 带重试机制的RPC拉取RRD请求与发送IO写任务请求
// fetch_rrd pulls the rrd file for key from addr (Graph.GetRrd RPC,
// up to 3 attempts) and hands it to the sharded IO worker as a WRITE
// task. On success the MISS flag bit is cleared; the FETCHING bit marks
// the key busy for the duration.
func fetch_rrd(client **rpc.Client, key string, addr string) error {
var (
err error
flag uint32
md5 string
dsType string
filename string
step, i int
rrdfile g.File
)
cfg := g.Config()
if flag, err = store.GraphItems.GetFlag(key); err != nil {
return err
}
store.GraphItems.SetFlag(key, flag|g.GRAPH_F_FETCHING) // set the FETCHING flag bit
md5, dsType, step, _ = g.SplitRrdCacheKey(key) // parse the cache key
filename = g.RrdFileName(cfg.RRD.Storage, md5, dsType, step)
// filename is built from:
//   cfg.RRD.Storage - configured storage dir
//   md5             - counter hash
//   dsType          - counter type
//   step            - collection period
for i = 0; i < 3; i++ {
err = rpc_call(*client, "Graph.GetRrd", key, &rrdfile,
time.Duration(cfg.CallTimeout)*time.Millisecond) // fetch the rrd file over RPC
if err == nil {
done := make(chan error, 1)
io_task_chans[getIndex(md5)] <- &io_task_t{
method: IO_TASK_M_WRITE,
args: &g.File{
Filename: filename,
Body: rrdfile.Body[:],
},
done: done,
} // enqueue a WRITE task on the md5-sharded IO worker
if err = <-done; err != nil {
goto out
} else {
flag &= ^g.GRAPH_F_MISS
goto out
}
} else {
log.Println(err)
}
if err == rpc.ErrShutdown {
reconnection(client, addr)
}
}
out:
flag &= ^g.GRAPH_F_FETCHING // clear the FETCHING flag bit
store.GraphItems.SetFlag(key, flag)
return err
}
- syncDisk() 同步缓存(GraphItemsMap)至磁盘RRD文件↗
// syncDisk flushes one cache shard per FLUSH_DISK_STEP tick, cycling
// round-robin over all shards, until a shutdown signal arrives on
// Out_done_chan. The initial sleep lets the cache warm up first.
func syncDisk() {
time.Sleep(time.Second * g.CACHE_DELAY)
ticker := time.NewTicker(time.Millisecond * g.FLUSH_DISK_STEP)
defer ticker.Stop()
var idx int = 0
for {
select {
case <-ticker.C:
idx = idx % store.GraphItems.Size
FlushRRD(idx, false) // flush this shard's cached data to rrd
idx += 1
case <-Out_done_chan:
log.Println("cron recv sigout and exit...")
return
}
}
}
##flush缓存数据至RRD
// FlushRRD processes one cache shard: keys flagged MISS (data lives on
// another node during migration) are pulled remotely; everything else
// is committed to the local rrd file when forced or due. It also flips
// the flushrrd_timeout flag when a shard takes longer than one tick.
func FlushRRD(idx int, force bool) {
begin := time.Now()
atomic.StoreInt32(&flushrrd_timeout, 0)
keys := store.GraphItems.KeysByIndex(idx)
if len(keys) == 0 {
return
}
for _, key := range keys {
flag, _ := store.GraphItems.GetFlag(key)
//write err data to local filename
if force == false && g.Config().Migrate.Enabled && flag&g.GRAPH_F_MISS != 0 {
if time.Since(begin) > time.Millisecond*g.FLUSH_DISK_STEP {
atomic.StoreInt32(&flushrrd_timeout, 1)
}
PullByKey(key) // pull the data from its owning node
} else if force || shouldFlush(key) { // shouldFlush gates the disk write
CommitByKey(key) // write the cached items to the rrd file
}
}
}
### 构建PULL类型网络任务,从其它NODE节点拉取数据
// PullByKey enqueues a NET_TASK_M_PULL task for the node that owns this
// key on the consistent-hash ring; the result is awaited asynchronously
// so slow network tasks never block syncDisk/FlushAll.
func PullByKey(key string) {
done := make(chan error)
item := store.GraphItems.First(key)
if item == nil {
return
}
node, err := Consistent.Get(item.PrimaryKey()) // owning node on the hash ring
if err != nil {
return
}
Net_task_ch[node] <- &Net_task_t{ // enqueue the PULL task
Method: NET_TASK_M_PULL,
Key: key,
Done: done,
}
// net_task slow, shouldn't block syncDisk() or FlushAll()
// warning: recev sigout when migrating, maybe lost memory data
go func() {
err := <-done // inspect the task result
if err != nil {
log.Printf("get %s from remote err[%s]\n", key, err)
return
}
atomic.AddUint64(&net_counter, 1)
//todo: flushfile after getfile? not yet
}()
}
### 将内存缓存数据写RRD文件
// CommitByKey pops all cached items for key and writes them into the
// corresponding rrd file. Unparseable keys and empty caches are no-ops.
func CommitByKey(key string) {
md5, dsType, step, err := g.SplitRrdCacheKey(key) // parse the cache key
if err != nil {
return
}
filename := g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step) // build the file path
items := store.GraphItems.PopAll(key) // drain the cached items
if len(items) == 0 {
return
}
FlushFile(filename, md5, items) // write them to the rrd file
}
#### 写RRD文件(构造IO Flush任务)
// FlushFile enqueues an IO_TASK_M_FLUSH task on the md5-sharded IO
// worker and blocks until the worker reports the result.
func FlushFile(filename, md5 string, items []*cmodel.GraphItem) error {
done := make(chan error, 1)
io_task_chans[getIndex(md5)] <- &io_task_t{ // FLUSH task for this shard's worker
method: IO_TASK_M_FLUSH,
args: &flushfile_t{
filename: filename,
items: items,
},
done: done,
}
atomic.AddUint64(&disk_counter, 1)
return <-done // wait for and return the worker's result
}
注: 此时可以看到仅以IO任务的方式提交至IO任务Channel,其真正功能实现部分将在接下来展开分析。
- ioWorker IO工作线程监听与处理IO_TASK_M_XXX任务↗
// ioWorker starts IOWorkerNum goroutines; worker i serializes all disk
// operations arriving on io_task_chans[i] (read / write / flush /
// fetch), so tasks for one md5 shard never race on the same rrd file.
func ioWorker() {
	ioWorkerNum := g.Config().IOWorkerNum
	for i := 0; i < ioWorkerNum; i++ {
		go func(i int) {
			var err error
			for {
				select {
				case task := <-io_task_chans[i]:
					// IO task type IO_TASK_M_READ: read a whole file
					if task.method == IO_TASK_M_READ {
						if args, ok := task.args.(*readfile_t); ok {
							args.data, err = ioutil.ReadFile(args.filename)
							task.done <- err
						}
						// IO task type IO_TASK_M_WRITE: create a new file
					} else if task.method == IO_TASK_M_WRITE {
						//filename must not exist (writeFile opens with O_EXCL)
						if args, ok := task.args.(*g.File); ok {
							baseDir := file.Dir(args.Filename)
							if err = file.InsureDir(baseDir); err != nil {
								task.done <- err
								// Fix: skip the write after a dir-creation
								// failure; the original fell through and
								// sent on task.done a second time.
								continue
							}
							task.done <- writeFile(args.Filename, args.Body, 0644)
						}
						// IO task type IO_TASK_M_FLUSH: items -> rrd file
					} else if task.method == IO_TASK_M_FLUSH {
						if args, ok := task.args.(*flushfile_t); ok {
							task.done <- flushrrd(args.filename, args.items)
						}
						// IO task type IO_TASK_M_FETCH: datapoints from rrd
					} else if task.method == IO_TASK_M_FETCH {
						if args, ok := task.args.(*fetch_t); ok {
							args.data, err = fetch(args.filename, args.cf, args.start, args.end, args.step)
							task.done <- err
						}
					}
				}
			}
		}(i)
	}
}
## 写文件内容
// writeFile creates filename exclusively (O_EXCL: fails if the file
// already exists) and writes data to it, reporting short writes and
// close errors.
func writeFile(filename string, data []byte, perm os.FileMode) error {
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, perm)
	if err != nil {
		return err
	}
	n, werr := f.Write(data)
	cerr := f.Close()
	if werr == nil && n < len(data) {
		werr = io.ErrShortWrite // partial write with no explicit error
	}
	if werr != nil {
		return werr
	}
	return cerr // surface the close error only when the write succeeded
}
## 获取RRD数据
// fetch reads datapoints from an rrd file via rrdlite.Fetch for the
// given consolidation function (cf) and [start, end] window, and
// reshapes them into cmodel.RRDData values. Note the returned
// timestamps start one step after fetchRes.Start.
func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RRDData, error) {
start_t := time.Unix(start, 0)
end_t := time.Unix(end, 0)
step_t := time.Duration(step) * time.Second
fetchRes, err := rrdlite.Fetch(filename, cf, start_t, end_t, step_t) // read the requested window from the rrd file
if err != nil {
return []*cmodel.RRDData{}, err
}
defer fetchRes.FreeValues()
values := fetchRes.Values()
size := len(values)
ret := make([]*cmodel.RRDData, size)
start_ts := fetchRes.Start.Unix()
step_s := fetchRes.Step.Seconds()
// convert raw values into the response datapoint format
for i, val := range values {
ts := start_ts + int64(i+1)*int64(step_s)
d := &cmodel.RRDData{
Timestamp: ts,
Value: cmodel.JsonFloat(val),
}
ret[i] = d
}
return ret, nil
}
RRD 存储(创建rrd与更新数据)
任务类型IO_TASK_M_FLUSH将执行对缓存数据刷入RRD文件进行持久化存储(
ioworker线程监听FLUSH任务并调用flushrrd
)
func flushrrd(filename string, items []*cmodel.GraphItem) error {
if items == nil || len(items) == 0 {
return errors.New("empty items")
}
if !g.IsRrdFileExist(filename) { //如果RRD文件不存在
baseDir := file.Dir(filename)
err := file.InsureDir(baseDir)
if err != nil {
return err
}
err = create(filename, items[0]) //创建RRD
if err != nil {
return err
}
}
return update(filename, items) //如果存在RRD文件则更新RRD数据
}
}
- create() 创建RRD文件 ↗
// RRA.Point.Size
// RRA archive sizes: points kept per consolidation resolution.
const (
RRA1PointCnt = 720 // 1 point per 1m, keeps 12h
RRA5PointCnt = 576 // 1 point per 5m, keeps 2d
RRA20PointCnt = 504 // 1 point per 20m, keeps 7d
RRA180PointCnt = 766 // 1 point per 3h, keeps 3 months
RRA720PointCnt = 730 // 1 point per 12h, keeps 1 year
)
// create builds a new rrd file for item with the fixed archive layout
// below (AVERAGE at every resolution, MAX/MIN from 5m upward), with a
// start time 24h in the past so backfilled points are accepted.
func create(filename string, item *cmodel.GraphItem) error {
now := time.Now()
start := now.Add(time.Duration(-24) * time.Hour)
step := uint(item.Step)
c := rrdlite.NewCreator(filename, start, step) // rrd creator for this file
c.DS("metric", item.DsType, item.Heartbeat, item.Min, item.Max)
// configure the archive (RRA) policies:
// 1 point per minute, kept 12h
c.RRA("AVERAGE", 0, 1, RRA1PointCnt)
// 1 point per 5m, kept 2d
c.RRA("AVERAGE", 0, 5, RRA5PointCnt)
c.RRA("MAX", 0, 5, RRA5PointCnt)
c.RRA("MIN", 0, 5, RRA5PointCnt)
// 1 point per 20m, kept 7d
c.RRA("AVERAGE", 0, 20, RRA20PointCnt)
c.RRA("MAX", 0, 20, RRA20PointCnt)
c.RRA("MIN", 0, 20, RRA20PointCnt)
// 1 point per 3h, kept 3 months
c.RRA("AVERAGE", 0, 180, RRA180PointCnt)
c.RRA("MAX", 0, 180, RRA180PointCnt)
c.RRA("MIN", 0, 180, RRA180PointCnt)
// 1 point per 12h, kept 1 year
c.RRA("AVERAGE", 0, 720, RRA720PointCnt)
c.RRA("MAX", 0, 720, RRA720PointCnt)
c.RRA("MIN", 0, 720, RRA720PointCnt)
return c.Create(true)
}
- update() 更新RRD文件数据 ↗
// update appends items to an existing rrd file, skipping values whose
// magnitude is outside rrd's representable range. DERIVE/COUNTER
// values are truncated to integers (rrd counter semantics).
func update(filename string, items []*cmodel.GraphItem) error {
u := rrdlite.NewUpdater(filename)
for _, item := range items {
v := math.Abs(item.Value)
if v > 1e+300 || (v < 1e-300 && v > 0) {
continue
}
if item.DsType == "DERIVE" || item.DsType == "COUNTER" {
u.Cache(item.Timestamp, int(item.Value))
} else {
u.Cache(item.Timestamp, item.Value)
}
}
return u.Update()
}
Graph索引管理分析
index.Start()
// 初始化索引功能模块
// Start boots the index module: initializes its caches and launches the
// incremental index-update background task.
func Start() {
InitCache()
go StartIndexUpdateIncrTask()
log.Debug("index.Start ok")
}
// 初始化cache
// InitCache starts the background task that refreshes cache statistics.
func InitCache() {
go startCacheProcUpdateTask()
}
// 更新 cache的统计信息
// startCacheProcUpdateTask periodically copies the sizes of the index
// caches into the proc counters (for monitoring endpoints).
func startCacheProcUpdateTask() {
for {
time.Sleep(DefaultCacheProcUpdateTaskSleepInterval)
proc.IndexedItemCacheCnt.SetCnt(int64(IndexedItemCache.Size()))
proc.UnIndexedItemCacheCnt.SetCnt(int64(unIndexedItemCache.Size()))
proc.EndpointCacheCnt.SetCnt(int64(dbEndpointCache.Size()))
proc.CounterCacheCnt.SetCnt(int64(dbEndpointCounterCache.Size()))
}
}
// 启动索引的 异步、增量更新 任务, 每隔一定时间,刷新cache中的数据到数据库中
// StartIndexUpdateIncrTask runs the incremental index update forever:
// every interval it flushes unindexed cache entries to the database and
// records timing/count statistics.
func StartIndexUpdateIncrTask() {
for {
time.Sleep(IndexUpdateIncrTaskSleepInterval)
startTs := time.Now().Unix()
cnt := updateIndexIncr()
endTs := time.Now().Unix()
// statistics
proc.IndexUpdateIncrCnt.SetCnt(int64(cnt))
proc.IndexUpdateIncr.Incr()
proc.IndexUpdateIncr.PutOther("lastStartTs", ntime.FormatTs(startTs))
proc.IndexUpdateIncr.PutOther("lastTimeConsumingInSec", endTs-startTs)
}
}
// 进行一次增量更新
// updateIndexIncr drains unIndexedItemCache, writing each entry to
// MySQL concurrently (bounded by semaUpdateIndexIncr); successfully
// indexed entries move into IndexedItemCache. Returns the number of
// entries dispatched (not the number that succeeded).
func updateIndexIncr() int {
ret := 0
if unIndexedItemCache == nil || unIndexedItemCache.Size() <= 0 {
return ret
}
dbConn, err := g.GetDbConn("UpdateIndexIncrTask")
if err != nil {
log.Error("[ERROR] get dbConn fail", err)
return ret
}
keys := unIndexedItemCache.Keys()
for _, key := range keys {
icitem := unIndexedItemCache.Get(key)
unIndexedItemCache.Remove(key)
if icitem != nil {
// update MySQL concurrently, bounded by the semaphore
semaUpdateIndexIncr.Acquire()
go func(key string, icitem *IndexCacheItem, dbConn *sql.DB) {
defer semaUpdateIndexIncr.Release()
err := updateIndexFromOneItem(icitem.Item, dbConn)
if err != nil {
proc.IndexUpdateIncrErrorCnt.Incr()
} else {
IndexedItemCache.Put(key, icitem)
}
}(key, icitem.(*IndexCacheItem), dbConn)
ret++
}
}
return ret
}
RPC 服务分析
- api.Start() RPC服务监听与处理 ↗
// Start runs the RPC service: listens on the configured TCP address,
// serves each connection in its own goroutine (with exponential
// accept-failure backoff capped at 1s), and on shutdown closes the
// listener and every tracked connection.
func Start() {
if !g.Config().Rpc.Enabled { // RPC disabled in config
log.Println("rpc.Start warning, not enabled")
return
}
addr := g.Config().Rpc.Listen // listen address
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
log.Fatalf("rpc.Start error, net.ResolveTCPAddr failed, %s", err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Fatalf("rpc.Start error, listen %s failed, %s", addr, err)
} else {
log.Println("rpc.Start ok, listening on", addr)
}
rpc.Register(new(Graph)) // register the Graph RPC methods
go func() {
var tempDelay time.Duration // how long to sleep on accept failure
for {
conn, err := listener.Accept() // accept a tcp connection
if err != nil {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
tempDelay = 0
go func() {
e := connects.insert(conn) // track the conn for shutdown
defer connects.remove(e)
rpc.ServeConn(conn) // serve RPC on the conn
}()
}
}()
select {
case <-Close_chan: // shutdown signal
log.Println("rpc, recv sigout and exiting...")
listener.Close()
Close_done_chan <- 1
connects.Lock()
for e := connects.list.Front(); e != nil; e = e.Next() {
e.Value.(net.Conn).Close() // close each tracked conn
}
connects.Unlock()
return
}
}
- RPC -> graph.Send 接收采集数据 ↗
// Send is the Graph.Send RPC entry point: it acknowledges immediately
// and processes the pushed items asynchronously.
func (this *Graph) Send(items []*cmodel.GraphItem, resp *cmodel.SimpleRpcResponse) error {
go handleItems(items) // process asynchronously
return nil
}
// handleItems validates each pushed item and fans it out to the three
// caches: the unflushed datapoint cache (GraphItems), the index, and
// the last-value history cache. Out-of-order items (timestamp not newer
// than the cached front) are dropped.
func handleItems(items []*cmodel.GraphItem) {
if items == nil {
return
}
count := len(items) // number of pushed items
if count == 0 {
return
}
cfg := g.Config()
// iterate over the batch
for i := 0; i < count; i++ {
if items[i] == nil {
continue
}
endpoint := items[i].Endpoint
if !g.IsValidString(endpoint) {
if cfg.Debug {
log.Printf("invalid endpoint: %s", endpoint)
}
pfc.Meter("invalidEnpoint", 1)
continue
}
counter := cutils.Counter(items[i].Metric, items[i].Tags)
if !g.IsValidString(counter) {
if cfg.Debug {
log.Printf("invalid counter: %s/%s", endpoint, counter)
}
pfc.Meter("invalidCounter", 1)
continue
}
dsType := items[i].DsType
step := items[i].Step
checksum := items[i].Checksum()
key := g.FormRrdCacheKey(checksum, dsType, step)
// statistics
proc.GraphRpcRecvCnt.Incr()
// cache in memory; drop items older than the cached front
first := store.GraphItems.First(key)
if first != nil && items[i].Timestamp <= first.Timestamp {
continue
}
store.GraphItems.PushFront(key, items[i], checksum, cfg)
// feed the index
index.ReceiveItem(items[i], checksum)
// feed the last-value history cache
store.AddItem(checksum, items[i])
}
}
- RPC -> graph.Query 查询rrd数据 ↗
// Query is the Graph.Query RPC entry point: it returns datapoints for
// one counter in [param.Start, param.End], merging rrd-file data with
// the unflushed in-memory cache. When migration is on and the key is
// flagged MISS, data is fetched from the owning node instead of the
// local rrd file. The result is a step-aligned, gap-filled (NaN) series.
func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryResponse) error {
var (
datas []*cmodel.RRDData
datas_size int
)
// statistics
proc.GraphQueryCnt.Incr()
cfg := g.Config()
// form empty response
resp.Values = []*cmodel.RRDData{}
resp.Endpoint = param.Endpoint
resp.Counter = param.Counter
dsType, step, exists := index.GetTypeAndStep(param.Endpoint, param.Counter) // complete dsType and step
if !exists {
return nil
}
resp.DsType = dsType // counter type
resp.Step = step // datapoint period
// align start/end timestamps to the step
start_ts := param.Start - param.Start%int64(step)
end_ts := param.End - param.End%int64(step) + int64(step)
if end_ts-start_ts-int64(step) < 1 {
return nil
}
md5 := cutils.Md5(param.Endpoint + "/" + param.Counter)
key := g.FormRrdCacheKey(md5, dsType, step)
filename := g.RrdFileName(cfg.RRD.Storage, md5, dsType, step)
// read the unflushed cached items
items, flag := store.GraphItems.FetchAll(key)
items_size := len(items)
// data owned by another node: issue a NET_TASK_M_QUERY
if cfg.Migrate.Enabled && flag&g.GRAPH_F_MISS != 0 {
node, _ := rrdtool.Consistent.Get(param.Endpoint + "/" + param.Counter) // owning node on the hash ring
done := make(chan error, 1)
res := &cmodel.GraphAccurateQueryResponse{}
rrdtool.Net_task_ch[node] <- &rrdtool.Net_task_t{
Method: rrdtool.NET_TASK_M_QUERY,
Done: done,
Args: param,
Reply: res,
}
<-done
// fetch data from remote
datas = res.Values
datas_size = len(datas)
} else {
// read data from rrd file
// note: rrd data excludes the start timestamp itself
// e.g. start_ts=1484651400, step=60 -> first datapoint is 1484651460
datas, _ = rrdtool.Fetch(filename, md5, param.ConsolFun, start_ts-int64(step), end_ts, step) // fetch the requested window
datas_size = len(datas)
}
nowTs := time.Now().Unix()
lastUpTs := nowTs - nowTs%int64(step)
rra1StartTs := lastUpTs - int64(rrdtool.RRA1PointCnt*step)
// consolidated, do not merge
if start_ts < rra1StartTs {
resp.Values = datas
goto _RETURN_OK
}
// no cached items, do not merge
if items_size < 1 {
resp.Values = datas
goto _RETURN_OK
}
// merge the cached items into the rrd data
{
// fmt cached items
var val cmodel.JsonFloat
cache := make([]*cmodel.RRDData, 0)
ts := items[0].Timestamp
itemEndTs := items[items_size-1].Timestamp
itemIdx := 0
if dsType == g.DERIVE || dsType == g.COUNTER {
for ts < itemEndTs {
if itemIdx < items_size-1 && ts == items[itemIdx].Timestamp {
if ts == items[itemIdx+1].Timestamp-int64(step) && items[itemIdx+1].Value >= items[itemIdx].Value {
val = cmodel.JsonFloat(items[itemIdx+1].Value-items[itemIdx].Value) / cmodel.JsonFloat(step)
} else {
val = cmodel.JsonFloat(math.NaN())
}
itemIdx++
} else {
// missing
val = cmodel.JsonFloat(math.NaN())
}
if ts >= start_ts && ts <= end_ts {
cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val})
}
ts = ts + int64(step)
}
} else if dsType == g.GAUGE {
for ts <= itemEndTs {
if itemIdx < items_size && ts == items[itemIdx].Timestamp {
val = cmodel.JsonFloat(items[itemIdx].Value)
itemIdx++
} else {
// missing
val = cmodel.JsonFloat(math.NaN())
}
if ts >= start_ts && ts <= end_ts {
cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val})
}
ts = ts + int64(step)
}
}
cache_size := len(cache)
// do merging
merged := make([]*cmodel.RRDData, 0)
if datas_size > 0 {
for _, val := range datas {
if val.Timestamp >= start_ts && val.Timestamp <= end_ts {
merged = append(merged, val) // rrdtool data has contiguous timestamps, no gaps
}
}
}
if cache_size > 0 {
rrdDataSize := len(merged)
lastTs := cache[0].Timestamp
// find junction
rrdDataIdx := 0
for rrdDataIdx = rrdDataSize - 1; rrdDataIdx >= 0; rrdDataIdx-- {
if merged[rrdDataIdx].Timestamp < cache[0].Timestamp {
lastTs = merged[rrdDataIdx].Timestamp
break
}
}
// fix missing
for ts := lastTs + int64(step); ts < cache[0].Timestamp; ts += int64(step) {
merged = append(merged, &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())})
}
// merge cached items to result
rrdDataIdx += 1
for cacheIdx := 0; cacheIdx < cache_size; cacheIdx++ {
if rrdDataIdx < rrdDataSize {
if !math.IsNaN(float64(cache[cacheIdx].Value)) {
merged[rrdDataIdx] = cache[cacheIdx]
}
} else {
merged = append(merged, cache[cacheIdx])
}
rrdDataIdx++
}
}
mergedSize := len(merged)
// fmt result
ret_size := int((end_ts - start_ts) / int64(step))
if dsType == g.GAUGE {
ret_size += 1
}
ret := make([]*cmodel.RRDData, ret_size, ret_size)
mergedIdx := 0
ts = start_ts
for i := 0; i < ret_size; i++ {
if mergedIdx < mergedSize && ts == merged[mergedIdx].Timestamp {
ret[i] = merged[mergedIdx]
mergedIdx++
} else {
ret[i] = &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())}
}
ts += int64(step)
}
resp.Values = ret
}
_RETURN_OK:
// statistics
proc.GraphQueryItemCnt.IncrBy(int64(len(resp.Values)))
return nil
}
- RPC -> graph.Delete 删除rrd ↗
//从内存索引、MySQL中删除counter,并从磁盘上删除对应rrd文件
// Delete removes each requested counter: its in-memory index entries, its
// cached points, and (via index.RemoveItem) the rrd file on disk.
// A parameter with an unparsable tag string is logged and skipped so one
// bad entry does not abort the whole batch. Always returns nil.
func (this *Graph) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error {
	// NOTE: the original assigned `resp = &cmodel.GraphDeleteResp{}` here.
	// That only rebinds the local pointer and is never seen by the RPC
	// caller, so the dead assignment has been removed.
	for _, param := range params {
		err, tags := cutils.SplitTagsString(param.Tags)
		if err != nil {
			log.Error("invalid tags:", param.Tags, "error:", err)
			continue
		}

		item := &cmodel.GraphItem{
			Endpoint: param.Endpoint,
			Metric:   param.Metric,
			Tags:     tags,
			DsType:   param.DsType,
			Step:     param.Step,
		}
		index.RemoveItem(item) // drop index, cache and rrd file
	}

	return nil
}
//从graph cache中删除掉某个item, 并删除指定的counter对应的rrd文件
// RemoveItem erases one graph item completely: its entries in both index
// caches, any points still buffered in memory, and its rrd file on disk.
func RemoveItem(item *cmodel.GraphItem) {
	// item.Checksum() was previously computed twice (as `md5` and
	// `checksum`, identical values); compute it once.
	checksum := item.Checksum()

	// remove from both the indexed and the not-yet-indexed cache
	IndexedItemCache.Remove(checksum)
	unIndexedItemCache.Remove(checksum)

	// discard points still buffered in memory
	key := g.FormRrdCacheKey(checksum, item.DsType, item.Step)
	poppedItems := store.GraphItems.PopAll(key)
	log.Debugf("discard data of item:%v, size:%d", item, len(poppedItems))

	// delete the backing rrd file
	rrdFileName := g.RrdFileName(g.Config().RRD.Storage, checksum, item.DsType, item.Step)
	file.Remove(rrdFileName)
	log.Debug("remove rrdfile:", rrdFileName)
}
- RPC -> graph.GetRrd 获取rrd数据 ↗
// GetRrd returns the rrd file (path + raw bytes) for the cache key
// "md5_dsType_step". Points still buffered in memory are flushed into the
// file first, so the returned body contains all data received so far.
func (this *Graph) GetRrd(key string, rrdfile *g.File) (err error) {
	md5, dsType, step, err := g.SplitRrdCacheKey(key)
	if err != nil {
		return err
	}
	rrdfile.Filename = g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step)

	// flush buffered points before reading so the file is complete
	if items := store.GraphItems.PopAll(key); len(items) > 0 {
		rrdtool.FlushFile(rrdfile.Filename, md5, items)
	}

	rrdfile.Body, err = rrdtool.ReadFile(rrdfile.Filename, md5)
	return
}
# 创建io任务,读取RRD文件数据
func ReadFile(filename, md5 string) ([]byte, error) {
done := make(chan error, 1)
task := &io_task_t{ //构造IO Read任务
method: IO_TASK_M_READ,
args: &readfile_t{filename: filename},
done: done,
}
io_task_chans[getIndex(md5)] <- task
err := <-done
return task.args.(*readfile_t).data, err
}
# 创建io任务,缓存数据刷入RRD文件
// FlushFile hands a batch of cached graph items to an io worker, which
// writes them into the given rrd file. Blocks until the flush finishes and
// returns the worker's error. disk_counter counts issued flush tasks.
func FlushFile(filename, md5 string, items []*cmodel.GraphItem) error {
	ack := make(chan error, 1)
	task := &io_task_t{
		method: IO_TASK_M_FLUSH,
		args: &flushfile_t{
			filename: filename,
			items:    items,
		},
		done: ack,
	}

	io_task_chans[getIndex(md5)] <- task
	atomic.AddUint64(&disk_counter, 1)

	return <-ack
}
- RPC -> graph.Info RRD基础信息 ↗
// Info reports rrd metadata (ds type, step, on-disk file path) for one
// endpoint/counter. If the counter is unknown to the index, resp is left
// at its zero value. Always returns nil.
func (this *Graph) Info(param cmodel.GraphInfoParam, resp *cmodel.GraphInfoResp) error {
	// statistics
	proc.GraphInfoCnt.Incr()

	dsType, step, exists := index.GetTypeAndStep(param.Endpoint, param.Counter) // index lookup
	if !exists {
		return nil
	}

	md5 := cutils.Md5(param.Endpoint + "/" + param.Counter)
	// build the path with the shared helper instead of a hand-rolled
	// Sprintf, keeping it consistent with Query/Delete/GetRrd
	resp.Filename = g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step)
	resp.ConsolFun = dsType // ds type (GAUGE/COUNTER/DERIVE)
	resp.Step = step        // sampling period in seconds
	return nil
}
- RPC -> graph.Last最后一次信息 ↗
// Last echoes the endpoint/counter back to the caller and fills in the
// most recent value for it via GetLast (rate for COUNTER/DERIVE, raw
// point for GAUGE). Always returns nil.
func (this *Graph) Last(param cmodel.GraphLastParam, resp *cmodel.GraphLastResp) error {
	// statistics
	proc.GraphLastCnt.Incr()

	resp.Endpoint, resp.Counter = param.Endpoint, param.Counter
	resp.Value = GetLast(param.Endpoint, param.Counter)
	return nil
}
// 非法值: ts=0,value无意义
// GetLast returns the latest value for endpoint/counter. GAUGE counters
// return the newest raw point; COUNTER/DERIVE return the rate between the
// two newest cached points. An invalid result is signalled by ts=0 (the
// value is then meaningless).
func GetLast(endpoint, counter string) *cmodel.RRDData {
	invalid := func() *cmodel.RRDData { return cmodel.NewRRDData(0, 0.0) }

	dsType, step, exists := index.GetTypeAndStep(endpoint, counter)
	if !exists {
		return invalid()
	}

	switch dsType {
	case g.GAUGE:
		return GetLastRaw(endpoint, counter)

	case g.COUNTER, g.DERIVE:
		items := store.GetAllItems(cutils.Md5(endpoint + "/" + counter))
		if len(items) < 2 {
			return invalid()
		}

		newer, older := items[0], items[1]
		deltaTs := newer.Timestamp - older.Timestamp
		deltaV := newer.Value - older.Value

		// the two points must be exactly one step apart
		if deltaTs != int64(step) || deltaTs <= 0 {
			return invalid()
		}
		if deltaV < 0 {
			// counter reset (e.g. process restart): report zero rate
			deltaV = 0
		}
		return cmodel.NewRRDData(newer.Timestamp, deltaV/float64(deltaTs))
	}

	return invalid()
}
- RPC -> graph.LastRaw最后一次原始数据 ↗
// LastRaw echoes the endpoint/counter back to the caller and fills in the
// most recent raw (unprocessed) point stored for it. Always returns nil.
func (this *Graph) LastRaw(param cmodel.GraphLastParam, resp *cmodel.GraphLastResp) error {
	// statistics
	proc.GraphLastRawCnt.Incr()

	resp.Endpoint, resp.Counter = param.Endpoint, param.Counter
	resp.Value = GetLastRaw(param.Endpoint, param.Counter)
	return nil
}
// 非法值: ts=0,value无意义
// GetLastRaw fetches the newest raw point cached for endpoint/counter.
// An invalid result is signalled by ts=0 (the value is then meaningless).
// NOTE(review): assumes store.GetLastItem returns a usable zero item for
// unknown keys — confirm it cannot yield a nil pointer.
func GetLastRaw(endpoint, counter string) *cmodel.RRDData {
	last := store.GetLastItem(cutils.Md5(endpoint + "/" + counter))
	return cmodel.NewRRDData(last.Timestamp, last.Value)
}
HTTP API服务分析
http.Start() ↗
func init() {
router = gin.Default()
configCommonRoutes() //公共接口
configProcRoutes() //统计接口
configIndexRoutes() //索引操作系统
Close_chan = make(chan int, 1)
Close_done_chan = make(chan int, 1)
}
// Start registers the migrate endpoints, launches the HTTP listener, and
// then blocks until a shutdown request arrives on Close_chan, acking it on
// Close_done_chan so start_signal can complete a graceful exit.
func Start() {
	if !g.Config().Http.Enabled {
		log.Println("http.Start warning, not enabled")
		return
	}

	// migrate progress, v2 format
	router.GET("/api/v2/counter/migrate", func(c *gin.Context) {
		counter := rrdtool.GetCounterV2()
		log.Debug("migrating counter v2:", fmt.Sprintf("%+v", counter))
		c.JSON(200, counter)
	})

	// compatible with open-falcon v0.1
	router.GET("/counter/migrate", func(c *gin.Context) {
		cnt := rrdtool.GetCounter()
		log.Debug("migrating counter:", cnt)
		c.JSON(200, gin.H{"msg": "ok", "counter": cnt})
	})

	addr := g.Config().Http.Listen
	if addr != "" {
		go router.Run(addr)
	} else {
		// BUGFIX: previously we returned here, so during shutdown
		// start_signal sent to the buffered Close_chan and then blocked
		// forever on Close_done_chan. With no listen address there is
		// nothing to serve, but we must still join the handshake below.
		log.Println("http.Start warning, empty listen address")
	}

	// a single-case select is just a blocking receive
	<-Close_chan
	log.Info("http recv sigout and exit...")
	Close_done_chan <- 1
}
# 公共接口
func configCommonRoutes() {
//服务健康状态
router.GET("/health", func(c *gin.Context) {...})
//接口健康状态
router.GET("/api/v2/health", func(c *gin.Context) {...})
//版本信息
router.GET("/api/v2/version", func(c *gin.Context) {...})
//工作目录
router.GET("/api/v2/workdir", func(c *gin.Context) {...})
//查看全局配置信息
router.GET("/api/v2/config", func(c *gin.Context) {...})
//重载全局配置
router.POST("/api/v2/config/reload", func(c *gin.Context) {...})
//队列长度信息
router.GET("/api/v2/stats/graph-queue-size", func(c *gin.Context) {...})
}
# 统计接口
func configProcRoutes() {
// counter
router.GET("/counter/all", func(c *gin.Context) {...})
// compatible with falcon task monitor
router.GET("/statistics/all", func(c *gin.Context) {...})
}
# 索引接口
func configIndexRoutes() {
// 触发索引全量更新, 同步操作
router.GET("/index/updateAll", func(c *gin.Context) {...})
// 获取索引全量更新的并行数
router.GET("/index/updateAll/concurrent", func(c *gin.Context) {...})
// 更新一条索引数据,用于手动建立索引 endpoint metric step dstype tags
router.POST("/api/v2/index", func(c *gin.Context) {...})
}
扩展学习
经验借鉴 ↗
- 优雅的关闭系统代码实现;
- 熟练的运用数据结构list、map与并发安全机制sync互斥锁结合场景如SafeListLimited、SafeLinkedList、GraphItemMap、SafeMap;
- rrd存储、查询、删除等操作实现。
- RPC Send数据处理(内存缓存、RRD存储、Index索引)
- 集群数据迁移的处理与实现
扩展学习 ↗
- github.com/yubo/rrdlite rrd for golang
- github.com/toolkits/cache/localcache/timedcache 带过期时间内存Cache实现