Kubersphere Executor 模块源码学习总结

2021-03-09  本文已影响0人  七齐起器
整体框架图.png

Executor总述:

Executor 是对告警进行增删查改和触发告警的最小执行单元;

Executor 构成:

整体由功能划分为四个模块

Executor监控模块

从 /cmd/alert/main.go 的 main 函数开始

/cmd/alert/main.go

func mainFuncExecutor() {

    host, err := os.Hostname()
    if err != nil {
        logger.Error(nil, "Get Host Name error: %s", err)
        return
    }

    e = executor.Init(host)

    //if e.CheckExist() {
    //  logger.Error(nil, "Executor ID already used, exiting...")
    //  syscall.Kill(os.Getpid(), syscall.SIGHUP)
    //}
    e.HeartBoot()
    e.Serve()
}

func main() {
    exitHandler()
    cfg := config.GetInstance().LoadConf()
    switch cfg.App.RunMode {
    case "executor":
        mainFuncExecutor()
    case "watcher":
        mainFuncWatcher()
    case "manager":
        mainFuncManager()
    case "client":
        mainFuncClient()
    default:
        logger.Error(nil, "Run mode error, exiting...")
    }
}

/pkg/services/executor/executor.go

func Init(name string) *Executor {
    alertReceiver := NewAlertReceiver()
    aliveReporter := NewAliveReporter()
    broadcastReceiver := NewBroadcastReceiver()
    healthChecker := NewHealthChecker()
    executor := NewExecutor(name, alertReceiver, aliveReporter, broadcastReceiver, healthChecker)

    alertReceiver.SetExecutor(executor)
    aliveReporter.SetExecutor(executor)
    broadcastReceiver.SetExecutor(executor)
    healthChecker.SetExecutor(executor)

    return executor
}

func (e *Executor) HeartBoot() {
    e.aliveReporter.HeartBoot()
}

func (e *Executor) Serve() {
    go e.alertReceiver.Serve()
    go e.broadcastReceiver.WatchBroadcast()
    go e.healthChecker.HealthCheck()
    e.aliveReporter.HeartBeat()
}

/pkg/services/executor/alive_reporter.go

type AliveReporter struct {
    executor *Executor
}

func NewAliveReporter() *AliveReporter {
    ar := &AliveReporter{}

    return ar
}

func (ar *AliveReporter) SetExecutor(executor *Executor) {
    ar.executor = executor
}

func (ar *AliveReporter) putKey(expireTime int64) error {
    ctx := context.Background()
    e := global.GetInstance().GetEtcd()

    key := "alert-executors/" + ar.executor.GetName()

    info := &ExecutorInfo{
        Name:      ar.executor.GetName(),
        TaskCount: ar.executor.GetTaskCount(),
    }

    value, _ := json.Marshal(info)

    resp, err := e.Grant(ctx, expireTime)
    if err != nil {
        logger.Error(nil, "Grant TTL from etcd failed: %+v", err)
        return err
    }

    _, err = e.Put(ctx, key, string(value), clientv3.WithLease(resp.ID))

    if err != nil {
        logger.Error(nil, "AliveReporter putKey [%s] [%s] to etcd failed: %+v", key, string(value), err)
        return err
    }

    return err
}

func (ar *AliveReporter) HeartBoot() {
    err := ar.putKey(30)

    if err != nil {
        logger.Error(nil, "AliveReporter HeartBoot failed: %+v", err)
    }
}
上一篇 下一篇

猜你喜欢

热点阅读