golang cron包源码解析

第三方golang代码库源码阅读

Posted by KL on July 27, 2018

GO Cron库源代码阅读

项目中用到了GO的一个第三方库,内置实现了cron调度的功能,学习一下其设计与实现 项目地址: https://github.com/robfig/cron.git 首先看下官方文档的 https://godoc.org/github.com/robfig/cron 的简单用例

c := cron.New()
c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("@hourly",      func() { fmt.Println("Every hour") })
c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") })
c.Start()
..
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily", func() { fmt.Println("Every day") })
..
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
..
c.Stop()  // Stop the scheduler (does not stop any jobs already running).

我们首先从cron.New()函数开始看

cron.go 文件

New()方法

从下面能看出函数New()初始化了一个空的Cron结构体

// New returns a new Cron job runner, in the Local time zone.
func New() *Cron {
	return NewWithLocation(time.Now().Location()) // 默认应该UTC timezone
}

// NewWithLocation returns a new Cron job runner.
func NewWithLocation(location *time.Location) *Cron {
	return &Cron{
		entries:  nil,  //entries  []*Entry Entry是cron的核心组件
		add:      make(chan *Entry),
		stop:     make(chan struct{}),
		snapshot: make(chan []*Entry),
		running:  false,
		ErrorLog: nil,
		location: location,
	}
}

// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
	// The schedule on which this job should be run.
	Schedule Schedule // 实现了Next方法,可以返回下一次启动cron的时间

	// The next time the job will run. This is the zero time if Cron has not been
	// started or this entry's schedule is unsatisfiable
	Next time.Time

	// The last time this job was run. This is the zero time if the job has never
	// been run.
	Prev time.Time

	// The Job to run.
	Job Job //一个接口实现了Run(),用来存储真正执行的任务是什么
}

AddFunc方法

主要代码如下

// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
	return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
	schedule, err := Parse(spec)
	if err != nil {
		return err
	}
	c.Schedule(schedule, cmd)
	return nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
func (c *Cron) Schedule(schedule Schedule, cmd Job) {
	entry := &Entry{
		Schedule: schedule,
		Job:      cmd,
	}
	if !c.running {
		c.entries = append(c.entries, entry)
		return
	}

	c.add <- entry
}

AddFunc第一参数spec是一个string,用来接收cron 时间的定义,从例子上可以看到"0 30 * * * *" 类似于这样的,或者@daily这样的用法,具体cron package可以支持哪些用法,一会可以看下另外一个文件parser.go。第二个参数cmd是一个函数类型,是说明时间到了之后真正要做的事情。 从上面一大段代码中能看到的AddFunc的做的事情主要是

解析cron时间点的定义,把解析之后的结果,放到cron的add chan里面

Start()方法

cron提供了两种启动方法,一种是带Recovery保护的(runWithRecovery),一种不带。两者都是用go routine新启动了一个进程做cron的调度。 runWithRecovery是在调用cmd函数的时候使用的,可以保证用户自定义函数的panic不会导致cron本身程序的崩溃,是一种保护机制。 然后看下run()真正做了什么。

// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

// Run the scheduler. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now) // 这里用到了schedule包的Next方法,得到了下次job的启动时间
	}

	for {
		// Determine the next entry to run.
		sort.Sort(byTime(c.entries)) // cron 包定义了自己的排序方法,按照下一次job的启动时间早晚就行排序

		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // 没有任务或者第一个任务没有schedule time,睡眠
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

		for {
			select {
			case now = <-timer.C: //其实还是使用了time.Timer来触发任务
				now = now.In(c.location)
				// Run every entry whose next time was less than now
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					go c.runWithRecovery(e.Job) // 启动任务函数
					e.Prev = e.Next // 记录下了每次任务调度时间的链路,双链表
					e.Next = e.Schedule.Next(now)
				}

			case newEntry := <-c.add: //AddFunc会在c.add channel添加任务
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now) //Schedule.Next才是cron包的主要工作函数,定义了每次任务的触发时间
				c.entries = append(c.entries, newEntry)

			case <-c.snapshot: //可以调用c.Entries()返回一个现有任务列表的snapshot
				c.snapshot <- c.entrySnapshot() 
				continue

			case <-c.stop:
				timer.Stop()
				return
			}

			break
		}
	}
}

paser.go文件

在AddFunc的时候,有一段如下cron string参数的解析过程,可以看下Parse函数的工作原理 schedule, err := Parse(spec) 主要函数如下

var defaultParser = NewParser(
	Second | Minute | Hour | Dom | Month | DowOptional | Descriptor,
)

func (p Parser) Parse(spec string) (Schedule, error) {
	if spec[0] == '@' && p.options&Descriptor > 0 { //@every 语言描述用法的支持
		return parseDescriptor(spec)
	}
	// Fill in missing fields 
	fields = expandFields(fields, p.options) // 因为支持*,0 之类的cron传统写法,所以需要把* 之类的值填成0

	var err error
	field := func(field string, r bounds) uint64 { // 这种定义函数的用法,之前比较少见,学习了
		...
	}

	var (
		second     = field(fields[0], seconds)
		minute     = field(fields[1], minutes)
		...
	)

	return &SpecSchedule{
		Second: second,
		Minute: minute,
		...
	}, nil
}

// parseDescriptor returns a predefined schedule for the expression, or error if none matches.
func parseDescriptor(descriptor string) (Schedule, error) {
	switch descriptor {
	case "@yearly", "@annually":
		return &SpecSchedule{
			Second: 1 << seconds.min,
			Minute: 1 << minutes.min,
			...
	const every = "@every "
	if strings.HasPrefix(descriptor, every) {
		duration, err := time.ParseDuration(descriptor[len(every):])
}

Parser最终返回一个spec.go里面的结构体, schedule 会根据这个结构体的定义判断下一个运行时间是什么时候

type SpecSchedule struct {
	Second, Minute, Hour, Dom, Month, Dow uint64
}

parseDescriptor基本功能就是一个转换器,把语言描述的cron设置信息,转换成真正的schedule,其中every的解析使用了time.ParseDuration()函数,此函数的comment有如下说明,最大支持为h,最小可到ns // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". cron调度本身不太需要小于1s的定时调度,所以cron package支持的最小时间间隔也是1s,可看下面代码

// Every returns a crontab Schedule that activates once every duration.
// Delays of less than a second are not supported (will round up to 1 second).
// Any fields less than a Second are truncated.
func Every(duration time.Duration) ConstantDelaySchedule {
	if duration < time.Second {
		duration = time.Second
	}
	return ConstantDelaySchedule{
		Delay: duration - time.Duration(duration.Nanoseconds())%time.Second,
	}
}

spec.go文件

最后一个文件,主要功能是产生下一次job运行的时间。代码并不长,主要就是下面的函数

   // Next returns the next time this schedule is activated, greater than the given
	// Start at the earliest possible time (the upcoming second).
	t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)

	// This flag indicates whether a field has been incremented.
	added := false

	// If no time is found within five years, return zero.
	yearLimit := t.Year() + 5

WRAP:
	if t.Year() > yearLimit {
		return time.Time{}
	}

	// Find the first applicable month.
	// If it's this month, then do nothing.
	for 1<<uint(t.Month())&s.Month == 0 { // 0 表示每个月都会运行
		// If we have to add a month, reset the other parts to 0.
		if !added {
			added = true
			// Otherwise, set the date at the beginning (since the current time is irrelevant).
			t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location())
		}
		t = t.AddDate(0, 1, 0)

		// Wrapped around.
		if t.Month() == time.January {
			goto WRAP
		}
	}

	// Now get a day in that month.
	for !dayMatches(s, t) {
		if !added {
			added = true
			t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
		}
		t = t.AddDate(0, 0, 1)

		if t.Day() == 1 {
			goto WRAP
		}
	}
	...

	return t
}

这里函数用到我之前项目组从没用过的goto用法,在golang的内存分配,go routinue管理的代码中可看到不少类似的用法,业务代码中不太常见。 从comment中能看到如下描述

While incrementing the field, a wrap-around brings it back to the beginning of the field list (since it is necessary to re-verify previous field values)

上面代码中有一个地方需要注意,cron有个validation去判断,如果你定义的cron任务5年之后才会第一次运行,程序会认为是非法的。正常情况下也不会真有人这么用😄

总结

cron packge代码结构拆分比较合理,代码阅读起来也不费事,调用流程也很容易可以走通。闲暇时间,review学习下他人写的代码也是对自己的一种提高。