KubeSphere Executor 模块之 Alert Runner
2021-03-10 本文已影响0人
七齐起器

Executor 模块调用 alertReceiver.Serve() 之后,Alert Runner 便开始工作,主要流程如下:
- 首先,ar.ExtractAlerts() 在单独的协程中不断从告警队列(alertQueue)中取出 Alert ID,放入带缓冲(容量 1000)的通道 runningAlertIds;
- 随后,constants.MaxWorkingAlerts 个 ar.HandleAlert 工作协程从 runningAlertIds 中取出 Alert ID,并调用 ar.executor.AddAlert 添加新的 Alert;
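- 在 executor 中,AddAlert 直接调用 startRunner。这里的 e.runner 并不是一个单纯的互斥锁,而是一个内嵌读写锁(sync.RWMutex)的 Runner 结构体,其 Map 字段保存 alertId 到 AlertRunner 的映射,读写锁用于保护该 Map;
- startRunner 先检查该 alert 是否已存在于 Map 中。若不存在,则查询数据库,确认其状态为 adding 或 migrating 且尚未分配 executor。然后将数据库中的状态更新为 running,创建 AlertRunner 并登记到 Map 中,最后通过 go runner.Run(initStatus) 启动它。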
/pkg/services/executor/alert_receiver.go
// AlertReceiver moves alert IDs from the alert queue topic to a pool of
// HandleAlert workers, which forward each ID to the Executor.
type AlertReceiver struct {
	// alertQueue is the topic alert IDs are dequeued from. ExtractAlerts
	// refuses to run while it is nil.
	alertQueue lib.Topic
	// runningAlertIds buffers dequeued alert IDs between ExtractAlerts and
	// the HandleAlert workers.
	runningAlertIds chan string
	// executor receives every alert ID via AddAlert; set with SetExecutor.
	executor *Executor
}
// NewAlertReceiver builds an AlertReceiver that reads alert IDs from the
// queue described by the global config (cfg.Queue.Type / cfg.Queue.Addr),
// subscribed to the constants.AlertTopicPrefix topic.
//
// If the queue client or the topic cannot be created, the error is logged
// and the receiver is returned with a nil alertQueue. ExtractAlerts then
// reports "AlertQueue not initialized" instead of this constructor
// dereferencing an unusable client.
func NewAlertReceiver() *AlertReceiver {
	ar := &AlertReceiver{
		// Buffer between the single dequeuing goroutine and the handler workers.
		runningAlertIds: make(chan string, 1000),
	}

	cfg := config.GetInstance()
	queueConnStr := cfg.Queue.Addr
	queueType := cfg.Queue.Type
	queueConfigMap := map[string]interface{}{
		"connStr": queueConnStr,
	}

	c, err := q.New(queueType, queueConfigMap)
	if err != nil {
		// The queue type is configurable, so do not hard-code the backend name.
		logger.Error(nil, "Failed to connect %v queue: %+v.", queueType, err)
		return ar
	}

	alertQueue, err := c.SetTopic(constants.AlertTopicPrefix)
	if err != nil {
		logger.Error(nil, "Failed to set queue topic [%s]: %+v.", constants.AlertTopicPrefix, err)
		return ar
	}
	ar.alertQueue = alertQueue
	return ar
}
// SetExecutor attaches the Executor that HandleAlert workers forward alert
// IDs to. Call it before Serve: HandleAlert invokes ar.executor.AddAlert
// without a nil check.
func (ar *AlertReceiver) SetExecutor(executor *Executor) {
	ar.executor = executor
}
// Serve starts the receiver in the background and returns immediately. It
// launches one goroutine that runs ExtractAlerts, plus
// constants.MaxWorkingAlerts HandleAlert workers numbered "0", "1", ...
func (ar *AlertReceiver) Serve() {
	go func() {
		// ExtractAlerts only returns when it cannot run at all (e.g. the
		// queue was never initialised). Surface that instead of dropping the
		// error, otherwise the workers below block forever with no trace.
		if err := ar.ExtractAlerts(); err != nil {
			logger.Error(nil, "AlertReceiver stopped extracting alerts: %+v", err)
		}
	}()
	for i := 0; i < constants.MaxWorkingAlerts; i++ {
		go ar.HandleAlert(strconv.Itoa(i))
	}
}
// ExtractAlerts loops forever: it dequeues alert IDs from alertQueue and
// pushes them onto runningAlertIds for the HandleAlert workers. Dequeue
// errors are logged and retried after a 3 second pause. It returns (with an
// error) only when alertQueue is nil.
func (ar *AlertReceiver) ExtractAlerts() error {
	if ar.alertQueue == nil {
		return errors.New("AlertQueue not initialized")
	}
	for {
		alertId, err := ar.alertQueue.Dequeue()
		if err != nil {
			logger.Error(nil, "AlertReceiver failed to dequeue alert from etcd queue: %+v", err)
			// Back off so a persistently failing queue does not spin the loop.
			time.Sleep(3 * time.Second)
			continue
		}
		logger.Debug(nil, "AlertReceiver dequeue alert [%s] from etcd queue succeed", alertId)
		// Blocks while the channel buffer is full, throttling dequeues to the
		// pace of the HandleAlert workers.
		ar.runningAlertIds <- alertId
	}
}
// HandleAlert is a worker loop. It receives alert IDs from runningAlertIds
// and passes each one to the executor's AddAlert. handlerNum identifies the
// worker in debug logs. The loop ends if runningAlertIds is closed.
func (ar *AlertReceiver) HandleAlert(handlerNum string) {
	// range terminates on close. A bare receive would instead yield "" forever
	// and spin, dispatching empty alert IDs to the executor.
	for alertId := range ar.runningAlertIds {
		logger.Debug(nil, "AlertReceiver handler [%s] handle alert [%s] from etcd queue", handlerNum, alertId)
		ar.executor.AddAlert(alertId)
	}
}
/pkg/services/executor/executor.go
// Runner is the Executor's registry of live AlertRunners keyed by alert ID.
// The embedded RWMutex must be held while reading or writing Map.
type Runner struct {
	sync.RWMutex

	// Map holds one AlertRunner per alert ID started by this executor.
	Map map[string]*AlertRunner
}
// NewExecutor assembles an Executor called name from its collaborators and
// gives it an empty runner registry. It only constructs the value; no
// goroutines are started here.
func NewExecutor(name string, alertReceiver *AlertReceiver, aliveReporter *AliveReporter, broadcastReceiver *BroadcastReceiver, healthChecker *HealthChecker) *Executor {
	registry := &Runner{Map: make(map[string]*AlertRunner)}
	return &Executor{
		name:              name,
		alertReceiver:     alertReceiver,
		runner:            registry,
		aliveReporter:     aliveReporter,
		broadcastReceiver: broadcastReceiver,
		healthChecker:     healthChecker,
	}
}
// AddAlert asks the executor to start running alertId. Whether a runner was
// actually started is not reported to the caller; startRunner logs each
// failure reason itself.
func (e *Executor) AddAlert(alertId string) {
	_ = e.startRunner(alertId)
}
// startRunner creates and launches an AlertRunner for alertId if this
// executor may take the alert. It reports whether a runner was started.
//
// The alert is taken only when both of the following hold:
//   - it is not already registered in e.runner.Map;
//   - the DB reports it as "adding" or "migrating" with no executor assigned.
//
// The DB record is then marked "running" under e.name, the runner is
// registered, and runner.Run is started in a new goroutine with the status
// the alert had before the update.
func (e *Executor) startRunner(alertId string) bool {
	// Cheap early exit before touching the DB; a read lock is sufficient.
	e.runner.RLock()
	_, ok := e.runner.Map[alertId]
	e.runner.RUnlock()
	if ok {
		logger.Error(nil, "Executor startRunner error: alert %s already exists", alertId)
		return false
	}

	// Query DB: only alerts that are being added or migrated and that no
	// executor owns yet may be dispatched here.
	alert := rs.GetAlertInfo(alertId)
	if !((alert.RunningStatus == "adding" || alert.RunningStatus == "migrating") && alert.ExecutorId == "") {
		logger.Error(nil, "Executor startRunner error: alert %s should not be dispatched", alertId)
		return false
	}
	initStatus := alert.RunningStatus

	// Update DB: claim the alert for this executor and mark it running.
	if err := rs.UpdateAlertInfo(alertId, e.name, "running"); err != nil {
		logger.Error(nil, "Executor startRunner error: update alert %s error: %+v", alertId, err)
		return false
	}

	// Re-check under the write lock. The DB read/update above is not atomic,
	// so another worker handling the same ID may have registered a runner in
	// the meantime. Overwriting it would orphan that runner's goroutine.
	// NOTE(review): NewAlertRunner is called while the lock is held; this
	// assumes it is cheap — confirm.
	e.runner.Lock()
	if _, exists := e.runner.Map[alertId]; exists {
		e.runner.Unlock()
		logger.Error(nil, "Executor startRunner error: alert %s already exists", alertId)
		return false
	}
	runner := NewAlertRunner(alertId, e.healthChecker.UpdateCh)
	e.runner.Map[alertId] = runner
	e.runner.Unlock()

	go runner.Run(initStatus)
	logger.Debug(nil, "Executor startRunner "+alertId+" success")
	return true
}
/pkg/services/executor/runner.go
// loadAlertInfo (re)loads this runner's configuration from the DB using
// rs.QueryAlertDetail(ar.AlertConfig.AlertId). It copies the resource type and
// filter fields into ar.AlertConfig, then delegates notification, policy,
// rule and alert-status parsing to the parse* helpers.
// NOTE(review): the steps are kept in this order on purpose; later helpers
// (e.g. parseRules) may depend on state set by earlier ones — confirm before
// reordering.
func (ar *AlertRunner) loadAlertInfo() {
	alertDetail := rs.QueryAlertDetail(ar.AlertConfig.AlertId)
	// 1. Parse resource type and filter.
	ar.AlertConfig.RsTypeName = alertDetail.RsTypeName
	ar.AlertConfig.RsTypeParam = alertDetail.RsTypeParam
	ar.AlertConfig.RsFilterName = alertDetail.RsFilterName
	ar.AlertConfig.RsFilterParam = alertDetail.RsFilterParam
	// 2. Parse notification address list.
	ar.parseNotification(alertDetail.NfAddressListId)
	// 3. Parse policy config.
	ar.parsePolicyConfig(alertDetail)
	// 4. Parse rules config; takes no argument, so it reads previously loaded state.
	ar.parseRules()
	// 5. Parse alert status.
	ar.parseAlertConfigStatus(alertDetail)
	logger.Debug(nil, "loadAlertInfo alert: %v", ar)
}
// Run is the AlertRunner's main loop; startRunner launches it with
// "go runner.Run(initStatus)". It first loads the alert config and records
// the update time. It then resets the alert status only when initStatus is
// "adding"; a "migrating" alert keeps its existing status.
//
// It then loops until it receives "Stop" on SignalCh:
//   - every TickPeriodSecond seconds: evaluate the alert rules and refresh the
//     update time;
//   - "Update": reload config, reset the alert status, refresh the update time;
//   - "Comment <arg>" (exactly two space-separated fields): call
//     commentAlert(arg) and refresh the update time;
//   - any other signal is ignored.
func (ar *AlertRunner) Run(initStatus string) {
	// Despite the name, this is a periodic ticker, stopped when Run returns.
	timer := time.NewTicker(time.Second * TickPeriodSecond)
	defer timer.Stop()
	ar.loadAlertInfo()
	ar.updateAlertUpdateTime()
	// An alert taken over from migration continues with its current status.
	// Status is reset only when the alert is newly added here (and, below,
	// when an "Update" signal arrives).
	if initStatus == "adding" {
		ar.AlertStatus.Lock()
		ar.resetAlertStatus()
		ar.AlertStatus.Unlock()
	}
	for {
		select {
		case <-timer.C:
			ar.runAlertRules()
			logger.Debug(nil, "AlertRunner alert %s run", ar.AlertConfig.AlertId)
			ar.updateAlertUpdateTime()
		case operation := <-ar.SignalCh:
			switch operation {
			case "Stop":
				// Drain SignalCh: discard any signals still buffered behind Stop.
				for len(ar.SignalCh) > 0 {
					<-ar.SignalCh
				}
				logger.Debug(nil, "AlertRunner alert %s stop", ar.AlertConfig.AlertId)
				return
			case "Update":
				ar.loadAlertInfo()
				ar.AlertStatus.Lock()
				ar.resetAlertStatus()
				ar.AlertStatus.Unlock()
				ar.updateAlertUpdateTime()
				logger.Debug(nil, "AlertRunner alert %s update", ar.AlertConfig.AlertId)
			default:
				// Parameterised signals have the form "<Command> <arg>".
				param := strings.Split(operation, " ")
				if len(param) != 2 {
					// Leaves the switch only (not the for loop): malformed signals are ignored.
					break
				}
				switch param[0] {
				case "Comment":
					ar.commentAlert(param[1])
					ar.updateAlertUpdateTime()
					logger.Debug(nil, "AlertRunner alert %s comment", ar.AlertConfig.AlertId)
				}
			}
		}
	}
}