19-定时任务库“robfig/cron”核心源码解读

2021-06-07  本文已影响0人  欢乐毅城

定时任务是项目中一个比较常见的情景。那么,今天我们就来介绍一个github上还不错的定时任务库“github.com/robfig/cron.git/v3”。

拉取库文件:

  go get github.com/robfig/cron/v3@v3.0.0

demo示例:

package main

import (
    "fmt"
    "time"
    "github.com/robfig/cron/v3"
)

//定时任务
func timedTask() {
    fmt.Printf("执行定时任务: %s \n", time.Now().Format("2006-01-02 15:04:05"))
}

func main() {
    //初始化一个cron对象
    c := cron.New()

    //方法一:通过AddFunc注册,任务调度
    spec := "50,51 17 * * *"  //每天17点50分,51分执行
    //spec := "@every 3s"   //每3秒执行一次(秒数可以超过60s)
    //spec := "CRON_TZ=Asia/Tokyo 30 04 * * *"
    
    //参数一:调度的时间策略,参数二:到时间后执行的方法。
    enterId, err := c.AddFunc(spec, timedTask)
    if err != nil {
        panic(err)
    }
    fmt.Printf("任务id是 %d \n", enterId)

    //启动
    c.Start()

    //用于阻塞 后面可以使用 select {} 阻塞
    time.Sleep(time.Hour * 9)

    //关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
    c.Stop()
}
 //方法二:通过AddJob注册
   type myType int
   func (c myType) Run() {
      fmt.Println("myType 实现了 Run方法")
      return
   }
   
   var dataNew myType = 10
   c.AddJob("@every 5s", dataNew)
   //调用方法AddJob(spec string, cmd Job)也可以实现AddFunc注册的功能,
   //Job是interface,需要入参类型实现方法:Run()。实际上,
   //方法AddFunc内部将参数cmd 进行了包装(wrapper),然后也是调用方法AddJob进行注册。

源码分析:

  //Cron数据结构
  type Cron struct {
    entries   []*Entry  //调度执行实体列表(或job的指针对象)
    chain     Chain     //chain用来定义Entry里的warppedJob使用的逻辑
    stop      chan struct{}  //停止所有调度任务
    add       chan *Entry    //添加一个调度任务
    remove    chan EntryID   //移除一个调度任务
    snapshot  chan chan []Entry  //运行中的调度任务
    running   bool  //代表是否已经在执行,用于操作整个cron对象只启动一次
    logger    Logger  //记录日志信息
    runningMu sync.Mutex //协程锁,确保运行数据安全,比如增加或移除entry
    location  *time.Location   // 时区
    parser    ScheduleParser  //对时间格式进行解析
    nextID    EntryID   //下一个任务的id
    jobWaiter sync.WaitGroup //run task时进行add(1),结束时done(),以此保证所有job都能退出
  }
  
  // Entry 数据结构,每一个被调度实体一个
  type Entry struct {
    // 唯一id,用于查询和删除,默认是自增的
       ID EntryID 
    //本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
       Schedule Schedule
    // 本entry下次需要执行的绝对时间,会一直被更新
    // 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
    // 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
       Next time.Time 
    //上一次被执行时间,主要用来查询
       Prev time.Time 
    //WrappedJob 是真实执行的Job实体(执行的任务)
       WrappedJob Job
    //Job主要给用户查询
       Job Job
  }
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
    c.logger.Info("start")
  
    // Figure out the next activation times for each entry.
    //获取当前时间
    now := c.now()
    //循环调入任务,计算下一次任务的执行时间
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    }
      
      //第一层死循环,无限循环
    for {
        // Determine the next entry to run.
        // 按时间先后排队调度任务
        sort.Sort(byTime(c.entries))
  
        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            // 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,
            //用于阻塞下面的 select{} ,因为`select`是多路复用,
            //其他channel能返回数据时,select就回执行不会阻塞。
            // 所以当没有任务时,启动Start()程序 就会被这个阻塞
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            //如果有调度信息,就 sleep 调度任务中第一个的 循环时间 
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }
  
          // 第二层死循环,内部使用select{}阻塞
        for {
            select {
            case now = <-timer.C:  //上一步中的 timer sleep时间如果到了就执行
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)
  
                // Run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }
  
            case newEntry := <-c.add: //向Cron中添加了 一个调度任务就会执行
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
  
            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue
                   
            case <-c.stop:   // 停止定时任务
                timer.Stop()
                c.logger.Info("stop")
                return
  
            case id := <-c.remove: // 移除任务
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }
               //当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
            break
        }
    }
}
  
CRON Expression Format (CRON表达式格式):
Field name Mandatory? Allowed values Allowed special characters
Minutes Yes 0-59 * / , -
Hours Yes 0-23 * / , -
Day of month Yes 1-31 * / , - ?
Month Yes 1-12 or JAN-DEC * / , -
Day of week Yes 0-6 or SUN-SAT * / , - ?
Predefined schedules( 预定义的时间表):
Entry(输入) Description(描述) Equivalent To(等于)
@yearly (or @annually) Run once a year, midnight, Jan. 1st 0 0 1 1 *
@monthly Run once a month, midnight, first of month 0 0 1 * *
@weekly Run once a week, midnight between Sat/Sun 0 0 * * 0
@daily (or @midnight) Run once a day, midnight 0 0 * * *
@hourly Run once an hour, beginning of hour 0 * * * *

欠缺的地方:

上一篇下一篇

猜你喜欢

热点阅读