19-定时任务库“robfig/cron”核心源码解读
2021-06-07 本文已影响0人
欢乐毅城
定时任务是项目中一个比较常见的情景。那么,今天我们就来介绍一个github上还不错的定时任务库“github.com/robfig/cron.git/v3”。
拉取库文件:
go get github.com/robfig/cron/v3@v3.0.0
demo示例:
package main
import (
"fmt"
"time"
"github.com/robfig/cron/v3"
)
//定时任务
func timedTask() {
fmt.Printf("执行定时任务: %s \n", time.Now().Format("2006-01-02 15:04:05"))
}
func main() {
//初始化一个cron对象
c := cron.New()
//方法一:通过AddFunc注册,任务调度
spec := "50,51 17 * * *" //每天17点50分,51分执行
//spec := "@every 3s" //每3秒执行一次(秒数可以超过60s)
//spec := "CRON_TZ=Asia/Tokyo 30 04 * * *"
//参数一:调度的时间策略,参数二:到时间后执行的方法。
enterId, err := c.AddFunc(spec, timedTask)
if err != nil {
panic(err)
}
fmt.Printf("任务id是 %d \n", enterId)
//启动
c.Start()
//用于阻塞 后面可以使用 select {} 阻塞
time.Sleep(time.Hour * 9)
//关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
c.Stop()
}
//方法二:通过AddJob注册
type myType int
func (c myType) Run() {
fmt.Println("myType 实现了 Run方法")
return
}
var dataNew myType = 10
c.AddJob("@every 5s", dataNew)
//调用方法AddJob(spec string, cmd Job)也可以实现AddFunc注册的功能,
//Job是interface,需要入参类型实现方法:Run()。实际上,
//方法AddFunc内部将参数cmd 进行了包装(wrapper),然后也是调用方法AddJob进行注册。
源码分析:
- 基本数据结构体:(cron.go)
//Cron数据结构
type Cron struct {
entries []*Entry //调度执行实体列表(或job的指针对象)
chain Chain //chain用来定义Entry里的warppedJob使用的逻辑
stop chan struct{} //停止所有调度任务
add chan *Entry //添加一个调度任务
remove chan EntryID //移除一个调度任务
snapshot chan chan []Entry //运行中的调度任务
running bool //代表是否已经在执行,用于操作整个cron对象只启动一次
logger Logger //记录日志信息
runningMu sync.Mutex //协程锁,确保运行数据安全,比如增加或移除entry
location *time.Location // 时区
parser ScheduleParser //对时间格式进行解析
nextID EntryID //下一个任务的id
jobWaiter sync.WaitGroup //run task时进行add(1),结束时done(),以此保证所有job都能退出
}
// Entry 数据结构,每一个被调度实体一个
type Entry struct {
// 唯一id,用于查询和删除,默认是自增的
ID EntryID
//本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
Schedule Schedule
// 本entry下次需要执行的绝对时间,会一直被更新
// 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
// 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
Next time.Time
//上一次被执行时间,主要用来查询
Prev time.Time
//WrappedJob 是真实执行的Job实体(执行的任务)
WrappedJob Job
//Job主要给用户查询
Job Job
}
- 核心函数run() :
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
c.logger.Info("start")
// Figure out the next activation times for each entry.
//获取当前时间
now := c.now()
//循环调入任务,计算下一次任务的执行时间
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
//第一层死循环,无限循环
for {
// Determine the next entry to run.
// 按时间先后排队调度任务
sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
// 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,
//用于阻塞下面的 select{} ,因为`select`是多路复用,
//其他channel能返回数据时,select就回执行不会阻塞。
// 所以当没有任务时,启动Start()程序 就会被这个阻塞
timer = time.NewTimer(100000 * time.Hour)
} else {
//如果有调度信息,就 sleep 调度任务中第一个的 循环时间
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
// 第二层死循环,内部使用select{}阻塞
for {
select {
case now = <-timer.C: //上一步中的 timer sleep时间如果到了就执行
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
case newEntry := <-c.add: //向Cron中添加了 一个调度任务就会执行
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
case <-c.stop: // 停止定时任务
timer.Stop()
c.logger.Info("stop")
return
case id := <-c.remove: // 移除任务
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
//当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
break
}
}
}
- 时间书写格式:
CRON Expression Format (CRON表达式格式):
Field name | Mandatory? | Allowed values | Allowed special characters |
---|---|---|---|
Minutes | Yes | 0-59 | * / , - |
Hours | Yes | 0-23 | * / , - |
Day of month | Yes | 1-31 | * / , - ? |
Month | Yes | 1-12 or JAN-DEC | * / , - |
Day of week | Yes | 0-6 or SUN-SAT | * / , - ? |
Predefined schedules( 预定义的时间表):
Entry(输入) | Description(描述) | Equivalent To(等于) |
---|---|---|
@yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 1 1 * |
@monthly | Run once a month, midnight, first of month | 0 0 1 * * |
@weekly | Run once a week, midnight between Sat/Sun | 0 0 * * 0 |
@daily (or @midnight) | Run once a day, midnight | 0 0 * * * |
@hourly | Run once an hour, beginning of hour | 0 * * * * |
欠缺的地方:
- 不支持持久化,重启一下服务,调度任务信息就没了,需要自己存储调度信息。
- 不支持一次定时,若想实现此功能,调用一次之后再调用移除可以实现。