golang实现定时任务调度

2019-01-14  本文已影响0人  yushu_bd

在定时任务中,有这样一个场景,定时给某些用户发送消息,或者定时给某些数据进行对账,而这些场景有一个要求是,下次处理是上次处理的定时间隔(比如下次的1分钟)等,这样就会出现,每秒处理的数据和用户是不一样的。比如13:13:13 对数据A处理失败了(处理失败常见,非常规意义的失败),那下一分钟接着处理A(13:14:13),13:13:14 对数据B处理失败了那下一分钟接着处理B(13:14:14)。这样就会发现,每分钟的60秒都在处理一批批数据。类似于时间轮一样的轮循。

image.png

当然,也可以使用消息队列的延时队列,但这种情况会造成,一个消息的堆积,而且无法处理这种清空,比如A,B的任务相差一分钟,那在A的2分钟后是发送A的第2次,而B是第1次,这两次是同时发送的,消息队列比较难处理这种情况,同时对于消息的标记也比较麻烦。

方案:可以使用go里的定时器,每秒触发一次任务,对任务进行轮循处理,简单的demo如下:(这里忽略了任务的进入,具体可以使用kafka结合到达的时间点来标记队列的轮片)

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

type Task struct {
    msg string   //任务的内容, 具体可以是一个复杂的结构体对象
    pri int   // 任务的优先级,在对同一个bucket的数据,可以按照优先级来处理
    idx int   //  bucket 的标识
    status bool // 任务标识,标识任务是否执行成功,是否需要删除
}

func (t *Task) runTask() {  //简单的执行任务
    fmt.Println("run message", t.msg)
    t.status = true
}

var taskList = map[int][]Task{}

func sendTask(idx int) {
    msg := fmt.Sprintf("task message %d", idx)
    pri := idx / 60
    idx = idx % 60

    task := Task{
        msg,
        pri,
        idx,
            false,
    }
    taskList[idx] = append(taskList[idx], task)
}

/**
 * 假设 i是任务的id号,表示有一个150个任务要进如队列审核
 */
func initTask() {
    for i := 0; i < 150; i++ {
        sendTask(i)
    }
}

var ticker = time.NewTicker(1 * time.Second)
var cc = 0 //轮片指针

func main() {
    c := make(chan os.Signal)
    status:=true
    signal.Notify(c,
        syscall.SIGKILL,
        syscall.SIGHUP,
        syscall.SIGINT,
        syscall.SIGQUIT,
        os.Interrupt,
        os.Kill,
    )
    initTask()
    go func() {
        for {
            select {
            case <-ticker.C:
                for _, t := range taskList[cc] {
                    if t.status == false {
                        t.runTask()
                    }
                }
                cc += 1
                cc = cc%60  //循环轮询
            case <-c: //监听 信号
                ticker.Stop()
                fmt.Println("kill task")
                status = false
                break

            }
        }
    }()
    for {// 常驻
        time.Sleep(1*time.Second)
        if status == false {
            break
        }
    }
}

如有不对的地方,请指正

上一篇 下一篇

猜你喜欢

热点阅读