Go Cron 相关项目源码阅读

介绍

这段时间一直想把之前写的 monitor 用 go 重写一遍,由于自己是 go 的初学者, 自然要参考一些 go 的项目的 source code 来深入学习 go。

Monitor 项目中 cron 占了很大的比重, 所以本次代码阅读主要集中在 github 上两个 Star 数比较多的项目上。

rk/go-cron

这个项目比较小巧,整个项目不到100行代码,简单浏览了下很简单。

首先项目定义了job的数据结构

1
2
3
4
5
type job struct {
  Month, Day, Weekday  int8
  Hour, Minute, Second int8
  Task                 func(time.Time)
}

然后通过这个函数来调度

1
2
3
4
5
6
7
8
9
10
11
12
func processJobs() {
  for {
    now := time.Now()
    for _, j := range jobs {
      // execute all our cron tasks asynchronously
      if j.Matches(now) { //Matches 用来判断当前job是否运行
        go j.Task(now) 
      }
    }
    time.Sleep(time.Second)
  }
}

项目灰常简单,可惜作者自己定义了一套 cron 的语法。

robfig/cron

这个项目就显得专业很多,从 test 到注释,再到文档都很全。

我们还是从项目的数据结构入手,以下是cron项目定义数据结构的关系

1
2
3
4
5
6
7
8
9
10
11
12
- cron
  - entries  []*Entry
    - Schedule (interface)
      - Next(time.Time) time.Time  # 返回给定时间之后的最近job启动时间  
    - Next (time.Time) #下次运行时间
    - Prev (time.Time) #上次运行时间
    - Job (interface)
      - Run() 
  - stop     chan struct{}
  - add      chan *Entry
  - snapshot chan []*Entry 
  - running  bool

然后我们从项目的 sample code 入手。

Demo code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
c := cron.New()
c.AddFunc("0 5 * * * *",  func() { fmt.Println("Every 5 minutes") })
c.AddFunc("@hourly",      func() { fmt.Println("Every hour") })
c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") })
c.Start()
..
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily", func() { fmt.Println("Every day") })
..
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
..
c.Stop()  // Stop the scheduler (does not stop any jobs already running).

首先是 new 函数 很简单就是初始化一下, 返回cron的指针

1
2
3
4
5
6
7
8
9
10
// New returns a new Cron job runner.
func New() *Cron {
  return &Cron{
    entries:  nil,
    add:      make(chan *Entry),
    stop:     make(chan struct{}),
    snapshot: make(chan []*Entry),
    running:  false,
  }
}

接下来是 AddFunc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
  return c.AddJob(spec, FuncJob(cmd))
}

// AddFunc adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
  schedule, err := Parse(spec) //格式cron的字符,这个项目还做了下扩展,支持@hourly的形式
  if err != nil {
    return err
  }
  c.Schedule(schedule, cmd) //把schedule 加入到 cron 的 entries 中
  return nil
}

最后是 Start

这个项目关键点就是用 channel 来驱动各项事件,比如结束,开始,运行等等。

这样使得代码的结构清晰,方便阅读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Start the cron scheduler in its own go-routine.
func (c *Cron) Start() {
  c.running = true
  go c.run()
}

// Run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
  // Figure out the next activation times for each entry.
  now := time.Now().Local()
  
  //遍历 entries 获取每个entry的下次运行时间
  for _, entry := range c.entries { 
    entry.Next = entry.Schedule.Next(now)
  }

  for {
    // Determine the next entry to run.
    //把entries按照时间排序
    sort.Sort(byTime(c.entries))

    var effective time.Time
    if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
      // If there are no entries yet, just sleep - it still handles new entries
      // and stop requests.
      effective = now.AddDate(10, 0, 0)
    } else {
      effective = c.entries[0].Next
    }
       

    select {
    case now = <-time.After(effective.Sub(now)):
      // Run every entry whose next time was this effective time.
      for _, e := range c.entries {
        if e.Next != effective {
          break
        }
        go e.Job.Run()
        e.Prev = e.Next
        e.Next = e.Schedule.Next(effective)
      }
      continue
    
    //添加 新的 job 时
    case newEntry := <-c.add:
      c.entries = append(c.entries, newEntry)
      newEntry.Next = newEntry.Schedule.Next(now)
        
    //导出当前运行的snapshot
    case <-c.snapshot:
      c.snapshot <- c.entrySnapshot()
        
    //停止
    case <-c.stop:
      return
    }

    // 'now' should be updated after newEntry and snapshot cases.
    now = time.Now().Local()
  }
}

其他的代码逻辑都是在 parse cron 的样式。

结束

最后来个 robfig/cron 的 demo 结束 本篇blog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
  "fmt"
  "github.com/robfig/cron"
  "time"
)

func main() {
  fmt.Println("Go cron test")
  c := cron.New()
  c.AddFunc("*/5 * * * * *", func() { fmt.Println("Every 5 Seconds") })
  c.Start()
  time.Sleep(900 * time.Second)

}