falcon agent模块源码解析

2019-08-27  本文已影响0人  蒋植玉

导读

falcon是小米开源的监控平台,广泛用于许多互联网公司。agent模块是metric采集服务,它就像搬运工,将各种metric输送到transfer服务。如果说这些是它的主要工作,那么插件安装,同步信任ip名单等便是它的副业。本着抓主要矛盾的原则,本文不包含副业解读。

main函数

main函数主要做一些初始化工作,并启动http服务。

 func main() {
      //入参cfg := flag.String("c","cfg.json","configuration file")
        version := flag.Bool("v", false,"show version")
        check := flag.Bool("check", false,"check collector")
        //入参解析
      flag.Parse()
      if *version {
          fmt.Println(g.VERSION)
         os.Exit(0)
}
//check collect功能是否正常
if *check {
    funcs.CheckCollector()
    os.Exit(0)
}
//解析配置
g.ParseConfig(*cfg)
if g.Config().Debug {
g.InitLog("debug")
}else {
g.InitLog("info")
}

//初始化工作目录
g.InitRootDir()
//初始化本地ip,通过向heartbeat服务发起一次连接获取本地ip
g.InitLocalIp()
//初始化rpc client,new一个结构体出来
g.InitRpcClients()
//初始化一系列Mapper,内含采集metric的各种函数
funcs.BuildMappers()
//初始化更新cpu 硬盘统计协程
go cron.InitDataHistory()
//启动同步agent状态定时器
cron.ReportAgentStatus()
//启动同步插件定时器,插件安装异步实现
cron.SyncMinePlugins()
//启动配置的buildin  metric采集项定时器
cron.SyncBuiltinMetrics()
//启动同步信任ip定时器
cron.SyncTrustableIps()
//启动采集metric定时器
cron.Collect()
//启动http服务
go http.Start()
//阻塞
select {}
}

长连接

为了回避创建连接销毁连接的开销,agent与transfer通过长连接通信。调用下面的call函数上报metric到transfer。

func (this *SingleConnRpcClient) Call(method string, argsinterface{}, replyinterface{}) error {
    //上锁,协程安全
    this.Lock()
    defer this.Unlock()
    / /长连接生成函数
      err := this.serverConn()
      if err != nil {
        return err
}
timeout := time.Duration(10 * time.Second)
done := make(chan error,1)
go func() {
//发起调用
err := this.rpcClient.Call(method, args, reply)
done <- err
}()
select {
case <-time.After(timeout):
//超时,连接可能已经不可用,需要关闭连接
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
this.close()
return errors.New(this.RpcServer +" rpc call timeout")
case err := <-done:
if err != nil {
//如果出错也关闭连接,不管什么错误,有点粗暴
this.close()
return err
  }
}
return nil
}

生成连接。

func (this *SingleConnRpcClient) serverConn() error {
if this.rpcClient != nil {
return nil
}
var err error
var retry int =1
      for {
//出错的时候,调用close函数会把rpcClient 置为nil,从而触发重建连接
if this.rpcClient != nil {
     return nil
}
//发起连接
this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
if err != nil {
    log.Printf("dial %s fail: %v", this.RpcServer, err)
    if retry >3 {
        return err
}
//重试3次,每次休眠2的retry幂次方
time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
retry++
continue
      }
return err
}
}

metric采集上传

采集metric主要入口

func Collect() {

//开关检测

if !g.Config().Transfer.Enabled {
      return  }
//没有配置transfer地址
if len(g.Config().Transfer.Addrs) ==0 {
return  
 }

//开始执行各个采集函数,funcs.Mappers是在main函数里初始化好的

for _, v :=range funcs.Mappers {
go collect(int64(v.Interval), v.Fs)
}
}

具体调用采集函数的入口

func collect(sec int64, fns []func() []*model.MetricValue) {
t := time.NewTicker(time.Second * time.Duration(sec))
defer t.Stop()
for {
<-t.C
hostname, err := g.Hostname()
if err != nil {
continue
              }
mvs := []*model.MetricValue{}
ignoreMetrics := g.Config().IgnoreMetrics
for _, fn :=range fns {
//调用采集函数
items := fn()
if items == nil {
continue
 }
if len(items) ==0 {
        continue
}

for _, mv :=range items {
  if b, ok := ignoreMetrics[mv.Metric]; ok && b {
      continue
 }else {
mvs = append(mvs, mv)
}
}
}
now := time.Now().Unix()
for j :=0; j < len(mvs); j++ {
mvs[j].Step = sec
mvs[j].Endpoint = hostname
mvs[j].Timestamp = now
}
//通过长连接将metrics send到transfer
g.SendToTransfer(mvs)
}
}

以上是agent采集metric的主要流程。各个采集功能函数都在funcs.Mappers里面,如果需要crud采集功能,只需修改funcs.Mappers相关逻辑即可。用一个采集内存的功能函数做例,其他的异曲同工。

//返回值连同其他采集结果调用SendToTransfer发送到transfer
func MemMetrics() []*model.MetricValue {
//获取内存信息
m, err := nux.MemInfo()
if err != nil {
log.Println(err)
return nil
}
memFree := m.MemFree + m.Buffers + m.Cached
if m.MemAvailable >0 {
memFree = m.MemAvailable
}
memUsed := m.MemTotal - memFree
pmemFree :=0.0
      pmemUsed :=0.0
      if m.MemTotal !=0 {
pmemFree = float64(memFree) *100.0 / float64(m.MemTotal)
pmemUsed = float64(memUsed) *100.0 / float64(m.MemTotal)
}
pswapFree :=0.0
      pswapUsed :=0.0
      if m.SwapTotal !=0 {
pswapFree = float64(m.SwapFree) *100.0 / float64(m.SwapTotal)
pswapUsed = float64(m.SwapUsed) *100.0 / float64(m.SwapTotal)
}
return []*model.MetricValue{
//总内存
              GaugeValue("mem.memtotal", m.MemTotal),
//used的内存
              GaugeValue("mem.memused", memUsed),
//free的内存
              GaugeValue("mem.memfree", memFree),
//swap总内存
              GaugeValue("mem.swaptotal", m.SwapTotal),
//swap used内存
              GaugeValue("mem.swapused", m.SwapUsed),
//swap free内存
              GaugeValue("mem.swapfree", m.SwapFree),
//内存free比例
              GaugeValue("mem.memfree.percent", pmemFree),
//内存使用比例
              GaugeValue("mem.memused.percent", pmemUsed),
//swap free比例
              GaugeValue("mem.swapfree.percent", pswapFree),
//swap used比例
              GaugeValue("mem.swapused.percent", pswapUsed),
      }
}

SendToTransfer函数核心还是调用SendMetrics函数

func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
rand.Seed(time.Now().UnixNano())
//随机选取transfer节点,负载均衡
for _, i :=range rand.Perm(len(Config().Transfer.Addrs)) {
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}

//在这个函数里面会调用长连接那一节的call函数,来初始化或者复用连接

if updateMetrics(c, metrics, resp) {
break
              }
}
}

上面说的是agent自己采集的metric,用户也可以调用push接口主动上传。比如mysql,mongdb等这些外部的进程的采集。这是push接口的主要实现,

func configPushRoutes() {
http.HandleFunc("/v1/push",func(w http.ResponseWriter, req *http.Request) {
    if req.ContentLength ==0 {
        http.Error(w,"body is blank", http.StatusBadRequest)
        return
       }
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics)
if err != nil {
          http.Error(w,"connot decode body", http.StatusBadRequest)
return
         }

//同样调用的SendToTransfer
g.SendToTransfer(metrics)
w.Write([]byte("success"))
})
}

结束

纸上得来终觉浅,绝知此事要躬行。

上一篇下一篇

猜你喜欢

热点阅读