OpenFalcon源码分析(Alarm组件)

2018-09-23  本文已影响0人  Xiao_Yang

Alarm版本

VERSION = "0.2.0"

Alarm组件功能

judge把报警event写入redis,alarm从redis读取event,作相应处理 ( 发报警短信、邮件或callback http地址)。

Alarm组件逻辑图

alarm逻辑图

Alarm配置操作

Alt text

main主入口分析

// main is the entry point of the alarm component. It parses the command-line
// flags, loads the global configuration, initialises logging, the Redis
// connection pool, the database ORM and the sender worker channels, starts all
// background loops, and then blocks until SIGINT or SIGTERM is received.
func main() {
    cfgPath := flag.String("c", "cfg.json", "configuration file")
    showVersion := flag.Bool("v", false, "show version")
    showHelp := flag.Bool("h", false, "help")
    flag.Parse()

    // Both flags terminate the process; -v is checked before -h.
    switch {
    case *showVersion:
        fmt.Println(g.VERSION)
        os.Exit(0)
    case *showHelp:
        flag.Usage()
        os.Exit(0)
    }

    // Parse the global configuration file.
    g.ParseConfig(*cfgPath)

    g.InitLog(g.Config().LogLevel)
    if g.Config().LogLevel != "debug" {
        gin.SetMode(gin.ReleaseMode)
    }

    g.InitRedisConnPool()   // Redis connection pool
    model.InitDatabase()    // database ORM
    cron.InitSenderWorker() // sender worker channels

    // Background loops, started in the listed order, one goroutine each.
    background := []func(){
        http.Start,             // HTTP API server
        cron.ReadHighEvent,     // high-priority event queues
        cron.ReadLowEvent,      // low-priority event queues
        cron.CombineSms,        // merge SMS content
        cron.CombineMail,       // merge mail content
        cron.CombineIM,         // merge IM content
        cron.ConsumeIM,         // send IM notifications
        cron.ConsumeSms,        // send SMS notifications
        cron.ConsumeMail,       // send mail notifications
        cron.CleanExpiredEvent, // purge expired events
    }
    for _, task := range background {
        go task()
    }

    // Wait for SIGINT/SIGTERM, then release the Redis pool and exit.
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    fmt.Println()
    g.RedisConnPool.Close()
    os.Exit(0)
}

g.ParseConfig(*cfg) 全局配置文件解析与其它模块相关(参考Judge模块)

// GlobalConfig is the top-level configuration of the alarm component. Each
// field is filled from the JSON key named in its struct tag.
type GlobalConfig struct {
    // LogLevel is passed to g.InitLog; the value "debug" also enables gin
    // debug mode and ORM debug output.
    LogLevel     string              `json:"log_level"`
    // FalconPortal holds the database settings (Addr, Idle, Max) used by
    // InitDatabase.
    FalconPortal *FalconPortalConfig `json:"falcon_portal"`
    // Http holds the HTTP API settings (Enabled, Listen) read by http.Start.
    Http         *HttpConfig         `json:"http"`
    // Redis holds the queue names (HighQueues, LowQueues, UserSmsQueue, ...).
    Redis        *RedisConfig        `json:"redis"`
    // Api holds the endpoints of external services (PlusApi, PlusApiToken,
    // Dashboard, Sms, IM, Mail).
    Api          *ApiConfig          `json:"api"`
    // Worker holds the buffer sizes of the IM/Sms/Mail worker channels.
    Worker       *WorkerConfig       `json:"worker"`
    // Housekeeper holds the event clean-up settings (EventRetentionDays,
    // EventDeleteBatch). Note that its JSON key is capitalised.
    Housekeeper  *HousekeeperConfig  `json:"Housekeeper"`
}

model.InitDatabase() 初始化数据库ORM


// InitDatabase registers the "default" database (mysql driver) with the ORM
// using the falcon_portal settings, registers the event models, and turns on
// ORM debug output when the configured log level is "debug".
func InitDatabase() {
    cfg := g.Config()
    portal := cfg.FalconPortal

    // Alias "default", driver "mysql", followed by the data source address
    // and the Idle/Max values from the configuration.
    orm.RegisterDataBase("default", "mysql", portal.Addr, portal.Idle, portal.Max)

    // Object-relational mappings for events and event cases.
    orm.RegisterModel(new(event.Events), new(event.EventCases))

    if cfg.LogLevel == "debug" {
        orm.Debug = true
    }
}

// Events is the ORM model of a single alarm event record. It is one of the
// two models registered via orm.RegisterModel.
type Events struct {
    // Id is the primary key (orm:"pk").
    Id          int         `json:"id" orm:"pk"`
    Step        int         `json:"step"`
    Cond        string      `json:"cond"`
    Status      int         `json:"status"`
    Timestamp   time.Time   `json:"timestamp"`
    // EventCaseId is a foreign-key relation (orm:"rel(fk)") to the
    // EventCases record this event belongs to.
    EventCaseId *EventCases `json:"event_caseId" orm:"rel(fk)"`
}

// EventCases is the ORM model of an event case: the record that individual
// Events rows point to through their foreign key.
type EventCases struct {
    // Id is the unique identifier and primary key (orm:"pk").
    Id       string `json:"id" orm:"pk"`
    Endpoint string `json:"endpoint"`
    Metric   string `json:"metric"`
    Func     string `json:"func"`
    // Cond has the form: leftValue + operator + rightValue.
    Cond          string    `json:"cond"`
    Note          string    `json:"note"`
    MaxStep       int       `json:"max_step"`
    CurrentStep   int       `json:"current_step"`
    Priority      int       `json:"priority"`
    Status        string    `json:"status"`
    // Timestamp is serialised under the JSON key "start_at".
    Timestamp     time.Time `json:"start_at"`
    UpdateAt      time.Time `json:"update_at"`
    ProcessNote   int       `json:"process_note"`
    ProcessStatus string    `json:"process_status"`
    TplCreator    string    `json:"tpl_creator"`
    ExpressionId  int       `json:"expression_id"`
    StrategyId    int       `json:"strategy_id"`
    TemplateId    int       `json:"template_id"`
    // Events is the reverse side (orm:"reverse(many)") of Events.EventCaseId.
    // NOTE(review): the JSON key is spelled "evevnts", presumably a typo for
    // "events"; it is left untouched because it is part of the serialised
    // format.
    Events        []*Events `json:"evevnts" orm:"reverse(many)"`
}

cron.InitSenderWorker() 初始化发送Channel

// InitSenderWorker creates the three buffered channels that bound how many
// IM, SMS and mail sender goroutines may be in flight at once. Each buffer
// size comes from the "worker" section of the configuration.
func InitSenderWorker() {
    limits := g.Config().Worker

    // A sender puts a token into its channel before starting a goroutine and
    // the goroutine takes it out when done, so the capacity is the limit.
    IMWorkerChan = make(chan int, limits.IM)
    SmsWorkerChan = make(chan int, limits.Sms)
    MailWorkerChan = make(chan int, limits.Mail)
}

http.Start() http API服务监听与处理

// Start runs the HTTP API server of the alarm component. It returns
// immediately when the HTTP section is disabled or has no listen address;
// otherwise it registers the routes and blocks serving requests.
//
// The listening address is logged BEFORE calling Run, because Run blocks
// until the server stops; the error it returns at that point is logged
// instead of being silently discarded.
func Start() {
    httpCfg := g.Config().Http
    if !httpCfg.Enabled {
        return
    }
    addr := httpCfg.Listen
    if addr == "" {
        return
    }

    r := gin.Default()
    r.GET("/version", Version) // component version
    r.GET("/health", Health)   // health check
    r.GET("/workdir", Workdir) // working directory

    log.Println("http listening", addr)
    if err := r.Run(addr); err != nil {
        log.Println("http server stopped:", err)
    }
}

// Version handles GET /version and replies with the component version string.
func Version(c *gin.Context) {
    const statusOK = 200
    c.String(statusOK, g.VERSION)
}

// Health handles GET /health and always replies with the plain text "ok".
func Health(c *gin.Context) {
    const statusOK = 200
    c.String(statusOK, "ok")
}

// Workdir handles GET /workdir and replies with the value of file.SelfDir().
func Workdir(c *gin.Context) {
    const statusOK = 200
    dir := file.SelfDir()
    c.String(statusOK, dir)
}

cron.ReadHighEvent() 处理高优先级事件队列


#highQueues中配置的几个event队列中的事件是不会做报警合并的,因为那些是高优先级的报警,报警合并只是针对lowQueues中的事件。如果所有的事件都不想做报警合并,就把所有的event队列都配置到highQueues中即可

#我们在配置报警策略的时候配置了报警级别,比如P0/P1/P2等等,每个级别的报警都会对应不同的redis队列。alarm去读取这些数据的时候,我们希望先读取P0的数据,再读取P1的数据,最后读取P5的数据,因为我们希望先处理优先级高的。于是用了redis的brpop指令。

# 从Redis读取高优先级队列事件信息
# 解析事件信息(action/callback/teams/user:phone、im、mail等)
# 根据事件信息生成IM、SMS、MAIL内容存入Redis队列
// ReadHighEvent loops forever over the configured high-priority Redis queues,
// popping one event at a time and handing it to consume with isHigh=true.
// It returns immediately when no high-priority queue is configured.
func ReadHighEvent() {
    highQueues := g.Config().Redis.HighQueues
    if len(highQueues) == 0 {
        return
    }

    for {
        ev, err := popEvent(highQueues)
        if err == nil {
            consume(ev, true)
            continue
        }
        // Back off for a second after a failed pop before retrying.
        time.Sleep(time.Second)
    }
}

## 事件出列并将处理事件入库保存
// popEvent pops one event from the given Redis queues with BRPOP, decodes it
// from JSON, stores it through eventmodel.InsertEvent and returns it.
//
// It returns a nil event and the error when the Redis call or the JSON
// decoding fails; both failures are logged here.
func popEvent(queues []string) (*cmodel.Event, error) {
    // BRPOP arguments: every queue name followed by the timeout (0).
    args := make([]interface{}, 0, len(queues)+1)
    for _, name := range queues {
        args = append(args, name)
    }
    args = append(args, 0)

    conn := g.RedisConnPool.Get()
    defer conn.Close()

    reply, err := redis.Strings(conn.Do("BRPOP", args...))
    if err != nil {
        log.Errorf("get alarm event from redis fail: %v", err)
        return nil, err
    }

    // The payload is read from the second element of the reply.
    event := new(cmodel.Event)
    if err = json.Unmarshal([]byte(reply[1]), event); err != nil {
        log.Errorf("parse alarm event fail: %v", err)
        return nil, err
    }

    log.Debugf("pop event: %s", event.String())

    // Persist the decoded event in the database.
    eventmodel.InsertEvent(event)

    return event, nil
}

## 处理事件 
## 
// consume processes one popped event. It looks up the action bound to the
// event, runs the callback handling when the action enables it, and then
// dispatches the event to the high- or low-priority handler depending on
// isHigh. Events without a positive action id or without a resolvable action
// are dropped.
func consume(event *cmodel.Event, isHigh bool) {
    id := event.ActionId()
    if id <= 0 {
        return
    }

    // The action is fetched through the api package.
    action := api.GetAction(id)
    if action == nil {
        return
    }

    if action.Callback == 1 {
        HandleCallback(event, action)
    }

    if !isHigh {
        consumeLowEvents(event, action)
        return
    }
    consumeHighEvents(event, action)
}

### Callback配置处理
// HandleCallback executes the callback of an action and, when the action
// names receiver teams (action.Uic), queues notifications before and/or after
// the callback according to the action's Before*/After* switches.
//
// Before the callback the generated event content is sent; after the callback
// the callback result message is sent as both subject and body.
func HandleCallback(event *model.Event, action *api.Action) {
    hasTeams := action.Uic != ""

    // Receivers are only resolved, and only used, when teams are configured.
    var phones, mails, ims []string

    if hasTeams {
        phones, mails, ims = api.ParseTeams(action.Uic)
        smsContent := GenerateSmsContent(event)
        mailContent := GenerateMailContent(event)
        imContent := GenerateIMContent(event)

        // The SMS switch also controls the IM notification.
        if action.BeforeCallbackSms == 1 {
            redi.WriteSms(phones, smsContent)
            redi.WriteIM(ims, imContent)
        }
        // The SMS content doubles as the mail subject.
        if action.BeforeCallbackMail == 1 {
            redi.WriteMail(mails, smsContent, mailContent)
        }
    }

    // Invoke the configured callback URL.
    result := Callback(event, action)

    if !hasTeams {
        return
    }

    // Notifications sent after the callback carry its result message.
    if action.AfterCallbackSms == 1 {
        redi.WriteSms(phones, result)
        redi.WriteIM(ims, result)
    }
    if action.AfterCallbackMail == 1 {
        redi.WriteMail(mails, result, result)
    }
}

#### 事件触发回调HTTPAPI URL处理
// Callback issues an HTTP GET to action.Url with the event's details as query
// parameters and returns a human-readable summary of the outcome. When the
// URL is empty no request is made and a fixed message is returned.
func Callback(event *model.Event, action *api.Action) string {
    if action.Url == "" {
        return "callback url is blank"
    }

    // Flatten the pushed tags into "k:v" pairs joined by commas; an empty
    // tag map yields the empty string.
    pairs := make([]string, 0)
    for key, val := range event.PushedTags {
        pairs = append(pairs, fmt.Sprintf("%s:%s", key, val))
    }
    tags := strings.Join(pairs, ",")

    // Build the request and its parameters.
    req := httplib.Get(action.Url).SetTimeout(3*time.Second, 20*time.Second)

    req.Param("endpoint", event.Endpoint)
    req.Param("metric", event.Metric())
    req.Param("status", event.Status)
    req.Param("step", fmt.Sprintf("%d", event.CurrentStep))
    req.Param("priority", fmt.Sprintf("%d", event.Priority()))
    req.Param("time", event.FormattedTime())
    req.Param("tpl_id", fmt.Sprintf("%d", event.TplId()))
    req.Param("exp_id", fmt.Sprintf("%d", event.ExpressionId()))
    req.Param("stra_id", fmt.Sprintf("%d", event.StrategyId()))
    req.Param("left_value", utils.ReadableFloat(event.LeftValue))
    req.Param("tags", tags)

    // Execute the request and read the response body as a string.
    resp, err := req.String()

    outcome := "success"
    if err != nil {
        log.Errorf("callback fail, action:%v, event:%s, error:%s", action, event.String(), err.Error())
        outcome = fmt.Sprintf("fail:%s", err.Error())
    }
    log.Debugf("callback to url:%s, event:%s, resp:%s", action.Url, event.String(), resp)

    return fmt.Sprintf("curl %s %s. resp: %s", action.Url, outcome, resp)
}


### 消费高优先级队列事件
### 高优先级的不做报警合并
### 低优先级处理逻辑相同
// consumeHighEvents queues the notifications of a high-priority event for the
// receivers of action.Uic. It writes straight to the sender queues via redi.
// IM and mail are always queued; SMS only for priorities below 3. Nothing is
// done when the action has no receiver teams.
func consumeHighEvents(event *cmodel.Event, action *api.Action) {
    teams := action.Uic
    if teams == "" {
        return
    }

    // Resolve the contact details of the team members.
    phones, mails, ims := api.ParseTeams(teams)

    smsContent := GenerateSmsContent(event)
    mailContent := GenerateMailContent(event)
    imContent := GenerateIMContent(event)

    // SMS is reserved for P0..P2.
    if sendSms := event.Priority() < 3; sendSms {
        redi.WriteSms(phones, smsContent)
    }

    redi.WriteIM(ims, imContent)
    // The SMS content is reused as the mail subject.
    redi.WriteMail(mails, smsContent, mailContent)
}


#### 通过API查询解析维护人员组成员phones, emails, IM
// ParseTeams resolves a comma-separated list of team names into the
// de-duplicated phone numbers, e-mail addresses and IM ids of their members,
// returned in that order. Empty contact fields are skipped. An empty teams
// string yields three empty slices.
func ParseTeams(teams string) ([]string, []string, []string) {
    if teams == "" {
        return []string{}, []string{}, []string{}
    }

    members := GetUsers(teams)

    phones := set.NewStringSet()
    mails := set.NewStringSet()
    ims := set.NewStringSet()

    for _, member := range members {
        if phone := member.Phone; phone != "" {
            phones.Add(phone)
        }
        if mail := member.Email; mail != "" {
            mails.Add(mail)
        }
        if im := member.IM; im != "" {
            ims.Add(im)
        }
    }

    return phones.ToSlice(), mails.ToSlice(), ims.ToSlice()
}
###### 通过成员组信息查询用户
// GetUsers returns the members of every team named in the comma-separated
// teams string, keyed by user name. A user that belongs to several teams
// appears once. Empty team names are skipped.
func GetUsers(teams string) map[string]*uic.User {
    byName := make(map[string]*uic.User)

    for _, team := range strings.Split(teams, ",") {
        if team == "" {
            continue
        }
        // A nil result from UsersOf simply contributes nothing.
        for _, member := range UsersOf(team) {
            byName[member.Name] = member
        }
    }

    return byName
}
// UsersOf returns the members of a team. A successful remote lookup is
// stored in the Users cache; when the lookup returns nil the value held by
// the cache for that team is returned instead.
func UsersOf(team string) []*uic.User {
    if fetched := CurlUic(team); fetched != nil {
        Users.Set(team, fetched)
        return fetched
    }
    return Users.Get(team)
}
###### API组件接口,CURL HTTP访问与响应数据处理
// CurlUic asks the API component for the members of a team with an HTTP GET
// on <PlusApi>/api/v1/team/name/<team>, authenticating through the Apitoken
// header. It returns an empty slice for an empty team name and nil when the
// request or the decoding of the JSON response fails.
func CurlUic(team string) []*uic.User {
    if team == "" {
        return []*uic.User{}
    }

    uri := fmt.Sprintf("%s/api/v1/team/name/%s", g.Config().Api.PlusApi, team)
    req := httplib.Get(uri).SetTimeout(2*time.Second, 10*time.Second)

    // The token header is a JSON object carrying the caller name and the
    // configured signature. The Marshal error is ignored, as in the
    // original code.
    credentials := map[string]string{
        "name": "falcon-alarm",
        "sig":  g.Config().Api.PlusApiToken,
    }
    token, _ := json.Marshal(credentials)
    req.Header("Apitoken", string(token))

    // Execute the request and decode the JSON body.
    var out APIGetTeamOutput
    if err := req.ToJson(&out); err != nil {
        log.Errorf("curl %s fail: %v", uri, err)
        return nil
    }

    return out.Users
}


####依据事件信息生成SMS内容
#### IM/MAIL处理逻辑相同
// GenerateSmsContent returns the SMS text for an event; it delegates to
// BuildCommonSMSContent.
func GenerateSmsContent(event *model.Event) string {
    content := BuildCommonSMSContent(event)
    return content
}
// BuildCommonSMSContent renders an event as a bracketed one-line summary:
//
//    [P<priority>][<status>][<endpoint>][][<note> <func> <metric> <tags> <left><op><right>][O<step> <time>]
func BuildCommonSMSContent(event *model.Event) string {
    const layout = "[P%d][%s][%s][][%s %s %s %s %s%s%s][O%d %s]"

    priority := event.Priority()
    note := event.Note()
    fn := event.Func()
    metric := event.Metric()
    tags := utils.SortedTags(event.PushedTags)
    left := utils.ReadableFloat(event.LeftValue)
    op := event.Operator()
    right := utils.ReadableFloat(event.RightValue())
    when := event.FormattedTime()

    return fmt.Sprintf(layout,
        priority, event.Status, event.Endpoint,
        note, fn, metric, tags,
        left, op, right,
        event.CurrentStep, when,
    )
}
##### WriteIM WriteSms
func WriteIM(tos []string, content string) {
    if len(tos) == 0 {
        return
    }
    im := &model.IM{Tos: strings.Join(tos, ","), Content: content}
    WriteIMModel(im)   //写入IM队列
}
######
// WriteIMModel serialises an IM message to JSON and pushes it onto the Redis
// list IM_QUEUE_NAME. A nil message is ignored; a marshalling error is logged
// and the message is dropped.
func WriteIMModel(im *model.IM) {
    if im == nil {
        return
    }

    payload, err := json.Marshal(im)
    if err != nil {
        log.Error(err)
        return
    }

    log.Debugf("write im to queue, im:%v, queue:%s", im, IM_QUEUE_NAME)
    lpush(IM_QUEUE_NAME, string(payload))

    // Per the article's note, the mail and SMS writers do the same with
    // MAIL_QUEUE_NAME and SMS_QUEUE_NAME respectively.
}

cron.ReadLowEvent() 同ReadHighEvent,处理低优先级队列(Redis keys)

// ReadLowEvent loops forever over the configured low-priority Redis queues,
// popping one event at a time and handing it to consume with isHigh=false.
// It returns immediately when no low-priority queue is configured.
func ReadLowEvent() {
    lowQueues := g.Config().Redis.LowQueues
    if len(lowQueues) == 0 {
        return
    }

    for {
        ev, err := popEvent(lowQueues)
        if err == nil {
            consume(ev, false)
            continue
        }
        // Back off for a second after a failed pop before retrying.
        time.Sleep(time.Second)
    }
}

cron.CombineSms() 合并sms内容

// CombineSms runs forever, sleeping one minute and then merging the pending
// SMS messages with combineSms.
func CombineSms() {
    const interval = time.Minute
    for {
        time.Sleep(interval)
        combineSms()
    }
}

## 合并SMS信息
func combineSms() {
    dtos := popAllSmsDto()   //SMS队列"/queue/user/sms" RPOP所有事件短信内容信息
    count := len(dtos)
    if count == 0 {
        return
    }

    //整理事件信息至内存map
    //key:Priority Status Phone Metric
    //value: pop原事件信息
    dtoMap := make(map[string][]*SmsDto) 
    for i := 0; i < count; i++ {
        key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].Phone, dtos[i].Metric)
        if _, ok := dtoMap[key]; ok {
            dtoMap[key] = append(dtoMap[key], dtos[i])
        } else {
            dtoMap[key] = []*SmsDto{dtos[i]}
        }
    }

    //如果同一个KEY,有多条SMS内容则合并成一条提供link链接
    for _, arr := range dtoMap {
        size := len(arr)  
        if size == 1 {  
            redi.WriteSms([]string{arr[0].Phone}, arr[0].Content)
            continue
        }

        // 如果有多条,把多个sms内容写入数据库,只给用户提供一个链接
        contentArr := make([]string, size)
        for i := 0; i < size; i++ {
            contentArr[i] = arr[i].Content
        }
        content := strings.Join(contentArr, ",,")

        first := arr[0].Content
        t := strings.Split(first, "][")
        eg := ""
        if len(t) >= 3 {
            eg = t[2]
        }

        path, err := api.LinkToSMS(content) //API组件调用links
        sms := ""
        if err != nil || path == "" {
            sms = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
            log.Error("get short link fail", err)
        } else {
            sms = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
                arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path)  //多条合并带连接地址
            log.Debugf("combined sms is:%s", sms)
        }

        redi.WriteSms([]string{arr[0].Phone}, sms)  //SMS重入队"/sms"  }
}

### 所有SMS内容出队
// popAllSmsDto pops entries from the configured user SMS queue with RPOP
// until the first error (redis.ErrNil included) and returns the decoded
// entries. Replies that are empty, equal to "nil" or not valid JSON are
// skipped.
func popAllSmsDto() []*SmsDto {
    dtos := make([]*SmsDto, 0)
    queue := g.Config().Redis.UserSmsQueue

    conn := g.RedisConnPool.Get()
    defer conn.Close()

    for {
        reply, err := redis.String(conn.Do("RPOP", queue))
        if err != nil {
            // redis.ErrNil is not logged; any error stops the loop.
            if err != redis.ErrNil {
                log.Error("get SmsDto fail", err)
            }
            return dtos
        }

        switch reply {
        case "", "nil":
            continue
        }

        dto := new(SmsDto)
        if err = json.Unmarshal([]byte(reply), dto); err != nil {
            log.Errorf("json unmarshal SmsDto: %s fail: %v", reply, err)
            continue
        }
        dtos = append(dtos, dto)
    }
}


cron.CombineMail() 合并MAIL内容(同上SMS)

// CombineMail runs forever, sleeping one minute and then merging the pending
// mails with combineMail.
func CombineMail() {
    const interval = time.Minute
    for {
        time.Sleep(interval)
        combineMail()
    }
}

// combineMail drains the pending mail queue and merges mails that share the
// same priority, status, e-mail address and metric. A single mail is
// forwarded unchanged; a larger group becomes one mail whose subject carries
// the count and whose body is the individual bodies joined by CRLF.
func combineMail() {
    dtos := popAllMailDto()
    if len(dtos) == 0 {
        return
    }

    // Group by priority + status + email + metric.
    grouped := make(map[string][]*MailDto)
    for _, dto := range dtos {
        key := fmt.Sprintf("%d%s%s%s", dto.Priority, dto.Status, dto.Email, dto.Metric)
        grouped[key] = append(grouped[key], dto)
    }

    // Results are written back to Redis rather than sent from here,
    // otherwise restarting alarm could easily lose data.
    for _, group := range grouped {
        head := group[0]
        to := []string{head.Email}

        if len(group) == 1 {
            redi.WriteMail(to, head.Subject, head.Content)
            continue
        }

        subject := fmt.Sprintf("[P%d][%s] %d %s", head.Priority, head.Status, len(group), head.Metric)

        bodies := make([]string, len(group))
        for i, dto := range group {
            bodies[i] = dto.Content
        }
        content := strings.Join(bodies, "\r\n")

        log.Debugf("combined mail subject:%s, content:%s", subject, content)
        // Re-queue the merged mail on the outgoing mail queue.
        redi.WriteMail(to, subject, content)
    }
}

cron.CombineIM() 合并IM内容(同上SMS)

// CombineIM runs forever, sleeping one minute and then merging the pending
// IM messages with combineIM.
func CombineIM() {
    const interval = time.Minute
    for {
        time.Sleep(interval)
        combineIM()
    }
}

// combineIM drains the pending IM queue and merges messages that share the
// same priority, status, IM id and metric. A single message is forwarded
// unchanged; a larger group is replaced by one summary message with the
// count, an example and, when api.LinkToSMS succeeds, a link to the full
// content.
func combineIM() {
    dtos := popAllImDto()
    if len(dtos) == 0 {
        return
    }

    // Group by priority + status + IM id + metric.
    grouped := make(map[string][]*ImDto)
    for _, dto := range dtos {
        key := fmt.Sprintf("%d%s%s%s", dto.Priority, dto.Status, dto.IM, dto.Metric)
        grouped[key] = append(grouped[key], dto)
    }

    for _, group := range grouped {
        size := len(group)
        head := group[0]

        if size == 1 {
            redi.WriteIM([]string{head.IM}, head.Content)
            continue
        }

        // Several messages: hand the joined content to the link API so the
        // user only receives a single link.
        contents := make([]string, size)
        for i, dto := range group {
            contents[i] = dto.Content
        }
        content := strings.Join(contents, ",,")

        // The third "]["-separated field of the first message serves as the
        // example shown in the summary.
        eg := ""
        if parts := strings.Split(head.Content, "]["); len(parts) >= 3 {
            eg = parts[2]
        }

        path, err := api.LinkToSMS(content)
        chat := ""
        if err != nil || path == "" {
            chat = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", head.Priority, head.Status, size, head.Metric, eg)
            log.Error("create short link fail", err)
        } else {
            chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
                head.Priority, head.Status, size, head.Metric, eg, g.Config().Api.Dashboard, path)
            log.Debugf("combined im is:%s", chat)
        }

        // Re-queue the merged message on the outgoing IM queue.
        redi.WriteIM([]string{head.IM}, chat)
    }
}

cron.ConsumeSms() 发送SMS事件通知

# 发送SMS事件通知入口函数
// ConsumeSms runs forever: it drains the outgoing SMS queue and sends every
// message found, sleeping 200ms whenever the queue is empty.
func ConsumeSms() {
    for {
        batch := redi.PopAllSms()
        if len(batch) > 0 {
            SendSmsList(batch)
            continue
        }
        time.Sleep(time.Millisecond * 200)
    }
}
## Pop所有'/sms'队列信息
// PopAllSms pops entries from the Redis list SMS_QUEUE_NAME with RPOP until
// the first error (redis.ErrNil included) and returns the decoded messages.
// Replies that are empty, equal to "nil" or not valid JSON are skipped.
func PopAllSms() []*model.Sms {
    messages := make([]*model.Sms, 0)
    queue := SMS_QUEUE_NAME

    conn := g.RedisConnPool.Get()
    defer conn.Close()

    for {
        reply, err := redis.String(conn.Do("RPOP", queue))
        if err != nil {
            // redis.ErrNil is not logged; any error stops the loop.
            if err != redis.ErrNil {
                log.Error(err)
            }
            return messages
        }

        switch reply {
        case "", "nil":
            continue
        }

        sms := new(model.Sms)
        if err = json.Unmarshal([]byte(reply), sms); err != nil {
            log.Error(err, reply)
            continue
        }
        messages = append(messages, sms)
    }
}

## 处理SMS发送
// SendSmsList sends every message in L on its own goroutine. Before each
// goroutine is started a token is pushed into SmsWorkerChan, so the call
// blocks while the channel is full.
func SendSmsList(L []*model.Sms) {
    for i := range L {
        SmsWorkerChan <- 1 // acquire a worker slot
        go SendSms(L[i])
    }
}


###发送SMS实现函数
// SendSms posts one SMS (parameters "tos" and "content") to the configured
// SMS endpoint and releases its worker slot when done. A failed request is
// logged; nothing is returned to the caller.
func SendSms(sms *model.Sms) {
    // Give the worker slot back no matter how the send ends.
    defer func() { <-SmsWorkerChan }()

    endpoint := g.Config().Api.Sms

    req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
    req.Param("tos", sms.Tos)
    req.Param("content", sms.Content)

    resp, err := req.String()
    if err != nil {
        log.Errorf("send sms fail, tos:%s, cotent:%s, error:%v", sms.Tos, sms.Content, err)
    }

    log.Debugf("send sms:%v, resp:%v, url:%s", sms, resp, endpoint)
}

cron.ConsumeIM() 发送IM事件通知(同上SMS)

// ConsumeIM runs forever: it drains the outgoing IM queue and sends every
// message found, sleeping 200ms whenever the queue is empty.
func ConsumeIM() {
    for {
        batch := redi.PopAllIM()
        if len(batch) > 0 {
            SendIMList(batch)
            continue
        }
        time.Sleep(time.Millisecond * 200)
    }
}

// SendIMList sends every message in L on its own goroutine. Before each
// goroutine is started a token is pushed into IMWorkerChan, so the call
// blocks while the channel is full.
func SendIMList(L []*model.IM) {
    for i := range L {
        IMWorkerChan <- 1 // acquire a worker slot
        go SendIM(L[i])
    }
}

// SendIM posts one IM message (parameters "tos" and "content") to the
// configured IM endpoint and releases its worker slot when done. A failed
// request is logged; nothing is returned to the caller.
func SendIM(im *model.IM) {
    // Give the worker slot back no matter how the send ends.
    defer func() { <-IMWorkerChan }()

    endpoint := g.Config().Api.IM

    req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
    req.Param("tos", im.Tos)
    req.Param("content", im.Content)

    resp, err := req.String()
    if err != nil {
        log.Errorf("send im fail, tos:%s, cotent:%s, error:%v", im.Tos, im.Content, err)
    }

    log.Debugf("send im:%v, resp:%v, url:%s", im, resp, endpoint)
}

cron.ConsumeMail() 发送MAIL事件通知(同上SMS)

// ConsumeMail runs forever: it drains the outgoing mail queue and sends every
// mail found, sleeping 200ms whenever the queue is empty.
func ConsumeMail() {
    for {
        batch := redi.PopAllMail()
        if len(batch) > 0 {
            SendMailList(batch)
            continue
        }
        time.Sleep(time.Millisecond * 200)
    }
}

// SendMailList sends every mail in L on its own goroutine. Before each
// goroutine is started a token is pushed into MailWorkerChan, so the call
// blocks while the channel is full.
func SendMailList(L []*model.Mail) {
    for i := range L {
        MailWorkerChan <- 1 // acquire a worker slot
        go SendMail(L[i])
    }
}

// SendMail posts one mail (parameters "tos", "subject" and "content") to the
// configured mail endpoint and releases its worker slot when done. A failed
// request is logged; nothing is returned to the caller.
func SendMail(mail *model.Mail) {
    // Give the worker slot back no matter how the send ends.
    defer func() { <-MailWorkerChan }()

    endpoint := g.Config().Api.Mail

    req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
    req.Param("tos", mail.Tos)
    req.Param("subject", mail.Subject)
    req.Param("content", mail.Content)

    resp, err := req.String()
    if err != nil {
        log.Errorf("send mail fail, receiver:%s, subject:%s, cotent:%s, error:%v", mail.Tos, mail.Subject, mail.Content, err)
    }

    log.Debugf("send mail:%v, resp:%v, url:%s", mail, resp, endpoint)
}

cron.CleanExpiredEvent() //清理过旧的事件信息

// CleanExpiredEvent runs forever. Every 60 seconds it deletes one batch of
// events older than the configured retention period. Both settings are
// re-read from the Housekeeper configuration on each round.
func CleanExpiredEvent() {
    for {
        retentionDays := g.Config().Housekeeper.EventRetentionDays // days to keep events
        batchSize := g.Config().Housekeeper.EventDeleteBatch       // max rows per round

        // Cutoff = now minus retentionDays * 24 hours.
        cutoff := time.Now().Add(time.Duration(-retentionDays*24) * time.Hour)
        eventmodel.DeleteEventOlder(cutoff, batchSize)

        time.Sleep(time.Second * 60)
    }
}
// DeleteEventOlder deletes at most limit rows from the events table whose
// timestamp is earlier than before. The outcome is only logged: an error on
// failure, the number of affected rows otherwise.
func DeleteEventOlder(before time.Time, limit int) {
    const stmt = `delete from events where timestamp<? limit ?`

    cutoff := before.Format(timeLayout)

    result, err := orm.NewOrm().Raw(stmt, cutoff, limit).Exec()
    if err != nil {
        log.Errorf("delete event older than %v fail, error:%v", cutoff, err)
        return
    }

    // The RowsAffected error is ignored, as in the original code.
    affected, _ := result.RowsAffected()
    log.Debugf("delete event older than %v, rows affected:%v", cutoff, affected)
}

技术经验借鉴

扩展学习

上一篇 下一篇

猜你喜欢

热点阅读