目录

Go定时器源码分析

虽然golang的定时器经过几版的改进优化,但是仍然是性能的大杀手。

golang1.13和1.14的区别

golang在1.10版本之前是由一个独立的timerproc通过小顶堆和futexsleep来管理定时任务。1.10版本之后是把独立的timerproc和小顶堆分成最多64个timerproc协程和四叉堆,用来休眠的方式还是 futexsleep

而1.14版的timer是把存放定时事件的四叉堆放到了P结构中,同时取消了timerproc协程,转而使用netpollepoll wait来做就近时间的休眠等待。

函数签名

对于NewTimer函数,我们可以找到实现 time/sleep.go#L82。其实我们可以发现,NewTimerNewTickerAfter其实都是调用addTimer来新增定时任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type Timer struct {
	C <-chan Time
	r runtimeTimer
}

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}

func sendTime(c interface{}, seq uintptr) {
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			period: int64(d),
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}

这里主要分成两步:

1、创建一个Timer对象,包含一个具有缓冲区channel的c,用来接收Timer消息的,包含的runtimeTimer结构体,when是代表timer触发的绝对时间(当前时间+d),ftimer触发时的回调函数,arg是传给f的参数。

2、调用startTimer,实际上是调用runtime包下的addtimer函数。

3、NewTicker调用的是相同的函数,只是多了一个字段period,表示计时器再次被唤醒的时间,做轮询触发。

golang1.13的定时器原理

首先会初始化一个长度为64的timers数组,通过协程的pid取模来分配timersBucket,如果发现新的定时任务比较新,那么调用notewakeup来激活唤醒timerprocfutex等待。如果发现没有实例化timerproc,则启动。

1、添加定时器

1
2
3
4
5
6
7
8
9
func addtimer(t *timer) {
	tb := t.assignBucket()
	lock(&tb.lock)
	ok := tb.addtimerLocked(t)
	unlock(&tb.lock)
	if !ok {
		badTimer()
	}
}

可以看到addtimer做了两件事:

1、assignBucket找到可以被插入的bucket

2、addtimerLockedtimer插入到bucket

2、timersBucket

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
const timersLen = 64

// timer包含每个P的堆,timer进入队列中关联当前的P,所以每个P中timer都是独立于其他P的
// 如果GOMAXPROCS > timersLen,那么timersBucket可能会管理多个P
var timers [timersLen]struct {
	timersBucket

	// 内存对齐
	pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte
}

type timersBucket struct {
	lock         mutex
	gp           *g
	created      bool
	sleeping     bool
	rescheduling bool
	sleepUntil   int64
	waitnote     note
	t            []*timer
}

runtime中,有64个全局定义的timer bucket。每个bucket负责管理timertimer的整个生命周期包括创建、销毁、唤醒、睡眠等都是由timer bucket管理和调度。

问:为什么是64个timer bucket?

答:在1.10版本之前,只有1个timers对象,在添加定时器任务时都需要对timers进行加锁和解锁操作,影响性能;当timer过多,timers中的t很多,添加进四叉堆操作可能耗时比较长,可能会导致timer的延迟。因此引入全局64个分桶的策略,将timer分散到桶中,每个桶只负责自己的timer,有效降低了锁的粒度和timer调度的负担。

而根据最优的情况下,应该是分桶的数量应该要和GOMAXPROCS数量一致,有多少个P就有多少个timer bucket。但是,这就涉及到P的动态分配问题,所以在性能的权衡下,使用64 能够覆盖大多数的场景。

3、分配桶

1
2
3
4
5
func (t *timer) assignBucket() *timersBucket {
	id := uint8(getg().m.p.ptr().id) % timersLen
	t.tb = &timers[id].timersBucket
	return t.tb
}

4、添加timer到四叉堆

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (tb *timersBucket) addtimerLocked(t *timer) bool {
	// 此时when应该是当前时间+duration
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
	// 将timer添加到四叉堆中
	t.i = len(tb.t)
	tb.t = append(tb.t, t)
	if !siftupTimer(tb.t, t.i) {
		return false
	}
	// 首次添加
	if t.i == 0 {
		// 如果timerproc在sleep,唤醒它
		if tb.sleeping && tb.sleepUntil > t.when {
			tb.sleeping = false
			notewakeup(&tb.waitnote)
		}
		// 如果timerproc被挂起了,重新调度
		if tb.rescheduling {
			tb.rescheduling = false
			goready(tb.gp, 0)
		}
		// 如果timer的桶还没有创建,创建并开始timerproc
		if !tb.created {
			tb.created = true
			go timerproc(tb)
		}
	}
	return true
}

问:为什么是四叉堆?

答:上推节点的操作更快;对缓存更友好。

5、timerproc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// timerproc 外层循环不会退出
func timerproc(tb *timersBucket) {
	tb.gp = getg()
	for {
		lock(&tb.lock)
		// 修改睡眠标识
		tb.sleeping = false
		// 当前时间
		now := nanotime()
		delta := int64(-1)
		for {
			// 如果桶内没有timer,退出
			if len(tb.t) == 0 {
				delta = -1
				break
			}
			// 获取最早触发的timer
			t := tb.t[0]
			delta = t.when - now
			// 还没有到达触发时间,退出
			if delta > 0 {
				break
			}
			ok := true
			if t.period > 0 {
				// 需要周期性触发定时器,需要修改timer的触发时间,重新添加到最小堆中
				// leave in heap but adjust next time to fire
				t.when += t.period * (1 + -delta/t.period)
				if !siftdownTimer(tb.t, 0) {
					ok = false
				}
			} else {
				// 从最小堆中移除
				last := len(tb.t) - 1
				if last > 0 {
					tb.t[0] = tb.t[last]
					tb.t[0].i = 0
				}
				tb.t[last] = nil
				tb.t = tb.t[:last]
				if last > 0 {
					if !siftdownTimer(tb.t, 0) {
						ok = false
					}
				}
				t.i = -1 // 下标标记为-1,deltimer发现下标为-1时就不删除了
			}
			f := t.f
			arg := t.arg
			seq := t.seq
			unlock(&tb.lock)
			if !ok {
				badTimer()
			}
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			f(arg, seq)
			lock(&tb.lock)
		}
		if delta < 0 || faketime > 0 {
			// 如果桶中没有timer,把协程挂起
			tb.rescheduling = true
			goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
			continue
		}
		// 如果还有timer,睡眠到桶内最早触发的时间点后唤醒
		tb.sleeping = true
		tb.sleepUntil = now + delta
		noteclear(&tb.waitnote)
		unlock(&tb.lock)
		notetsleepg(&tb.waitnote, delta)
	}
}

6、小结

1、首选预分配64个的timer buckettimer bucket里面是一个四叉堆存放timer

2、每次新增的timer,添加到四叉堆中,会尝试唤醒和调度bucket

3、第一次新增的bucket会运行协程timerproctimerproc是一个死循环,周期性地检查定时器状态。

4、每次从最小堆中取出timer,如果是计时器,则重新加入到bucket中。如果bucket没有timer,则将timerproc挂起。如果还有timer,则睡眠到bucket中堆顶唤醒的时间。

深度分析golang1.14定时器

1、timer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type timer struct {
	// If this timer is on a heap, which P's heap it is on.
	// puintptr rather than *p to match uintptr in the versions
	// of this struct defined in other packages.
	pp puintptr	// 计时器所在的处理器P的指针地址

	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
	// each time calling f(arg, now) in the timer goroutine, so f must be
	// a well-behaved function and not block.
	when   int64		// 计时器被唤醒的时间
	period int64		// 计时器再次被唤醒的时间(周期)
	f      func(interface{}, uintptr)	// 回调函数,每次在计时器被唤醒时都会调用
	arg    interface{}	// 回调函数的参数
	seq    uintptr		// 回调函数的参数,仅在netpoll的应用场景下使用

	// What to set the when field to in timerModifiedXX status.
	nextwhen int64	// 当计时器状态为timerModifiedXX时,将会使用nextwhen设置到where字段上

	// The status field holds one of the values below.
	status uint32	// 计时器当前的状态值
}

2、p

在添加方式上,go1.14发生了变更,改为将每个timer存储在处理器p上。这也是我们之前提到的优化结构,64只能泛指大多数情况,实际都是需要p进行处理。所以go1.14里的p结构中有了timers字段。

1
2
3
4
5
6
type p struct {
    ...
    timersLock mutex
    timers []*timer
    ...
}

同样,在timers数组仍是一个最小四叉堆。

3、定时器状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Values for the timer status field.
const (
	// timer尚未设置状态
	timerNoStatus = iota

	// 等待timer启动
	timerWaiting

	// 运行timer的回调方法
	timerRunning

	// timer已经被删除,但仍然在某些p的堆中
	timerDeleted

	// timer即将被删除
	timerRemoving

	// timer已经停止,且不存在任何p的堆中
	timerRemoved

	// timer正在被修改
	timerModifying

	// timer已被修改为更早的时间,新的时间被设置在nextwhen字段中,
	timerModifiedEarlier

	// timer已被修改为更迟的时间,新的时间被设置在nextwhen字段中,
	timerModifiedLater
	
	// timer已经被修改,正在被移动
	timerMoving
)

因为涉及到p的管理,所以新增了10个timer的状态管理。

4、启动定时器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func addtimer(t *timer) {
	// when must never be negative; otherwise runtimer will overflow
	// during its delta calculation and never expire other runtime timers.
	// 边界条件判断
	if t.when < 0 {
		t.when = maxWhen
	}
	// timer的状态为timerNoStatus
	if t.status != timerNoStatus {
		throw("addtimer called with initialized timer")
	}
	t.status = timerWaiting

	when := t.when

	pp := getg().m.p.ptr()
	lock(&pp.timersLock)
	// 清除处理器p中的计时器队列,可以加快创建和删除计时器的程序的速度
	cleantimers(pp)		
	// 将当前所新创建的timer新增到p的堆中
	doaddtimer(pp, t)
	unlock(&pp.timersLock)

	// 唤醒网络轮询器中休眠的线程,检查timer被唤醒的时间(when)是否在当前轮询预期运行的时间(pollerPollUntil)内,若是则唤醒
	wakeNetPoller(when)
}

添加timer到当前的p上,这应该只在一个新创建的timer中调用,这避免了更改某些p的最小堆timerwhen字段的风险,因为这可能导致最小堆乱序。

5、停止定时器

在定时器的运行中,一般会调用timer.Stop方法来停止/删除定时器,其实就是让这个timer从处理器p的堆中移除。

  • timerWaiting/timerModifiedLater:修改timer状态为timerDeleted,删除数量+1
  • timerModifiedEarlier:修改timer状态为timerDeleted,删除数量+1,adjustTimers+1
  • timerDeleted/timerRemoving/timerRemoved:无需变更,已经满足条件
  • timerRunning/timerMoving/timerModifying:正在执行、移动中,无法停止,等待下一次状态检查再处理
  • timerNoStatus:无法停止,不满足条件

6、修改/重置定时器

在程序调度中,有些因为逻辑改变,需要重置定时器。一般会调用timer.Reset()来重设Duration值。

1
2
3
func resettimer(t *timer, when int64) {
	modtimer(t, when, t.period, t.f, t.arg, t.seq)
}

实际调用modtimer方法。

  • timerRunning/timerRemoving/timerMoving/timerModifying:等待状态改变
  • timerDeleted->timerModifying->timerModifiedXXX
  • timerNoStatus/timerRemoved->timerModifying->timerWaiting
  • timerWaiting/timerModifiedXXX->timerModifying->timerModifiedXXX

在处理完处理器的状态后,会分为两种情况进行处理:

1、待修改的定时器已经被删除:由于原定时器没有了,所以会调用doaddtimer方法创建一个定时器,并赋值原先的timer,再调用wakeNetPoller在预定的时间唤醒网络轮询器

2、正常逻辑处理:如果修改后的定时器的触发时间小于原本的触发是按,则修改定时器状态为timerModifiedEalier,并调用wakeNetPoller在预定的时间唤醒网络轮询器

7、触发定时器

前面提到过,timers已经归属到p中去了,所以定时器的触发分成两个部分:

  • 通过调度器在调度时进行定时器的触发
  • 通过系统监控检查并触发定时器(到期未执行)

1、调度器触发

调度器触发一般分为两种情况。

一种是调度循环中调用checkTimers方法进行计时器的触发

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func schedule() {
    _g_ := getg()
    ...
top:
	pp := _g_.m.p.ptr()
	pp.preempt = false
    ...
    checkTimers(pp, 0)
    ...
    execute(gp, inheritTime)
}

另一种是当前处理器p没有可执行的timer,且没有可执行的G。那么按照调度模型,就会去窃取其他定时器和G:

1
2
3
4
5
6
7
8
9
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
    ...
top:
	_p_ := _g_.m.p.ptr()
    ...
    now, pollUntil, _ := checkTimers(_p_, 0)
    ...
}

我们来进一步分析checkTimers方法:

1、检查处理器p上是否有需要处理的timer

2、如果没有需要执行的timer,则直接返回;否则,判断标记为删除的timer数量如果小于p上的timer数量则直接返回

3、对需要处理的timer,根据时间将timers重新排序

4、在调整完timers后,调用runtimer方法真正执行timer,触发定时器

5、在最后的阶段,如果被标记为删除的timer数量如果大于p上的timer数量,则对标记为删除的timer进行清理。

2、系统监控触发

通过每次调度器调度和窃取的是否触发,还是有一定的随机性。

因此需要一个系统监控来触发定时器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func sysmon() {
	...
	for {
		...
		next, _ := timeSleepUntil()
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				if next > now {
					atomic.Store(&sched.sysmonwait, 1)
					unlock(&sched.lock)
					// Make wake-up period small enough
					// for the sampling to be correct.
					sleep := forcegcperiod / 2
					if next-now < sleep {
						sleep = next - now
					}
					shouldRelax := sleep >= osRelaxMinNS
					if shouldRelax {
						osRelax(true)
					}
					notetsleep(&sched.sysmonnote, sleep)
					if shouldRelax {
						osRelax(false)
					}
					now = nanotime()
					next, _ = timeSleepUntil()
					lock(&sched.lock)
					atomic.Store(&sched.sysmonwait, 0)
					noteclear(&sched.sysmonnote)
				}
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		...
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // non-blocking - returns list of goroutines
			if !list.empty() {
				incidlelocked(-1)
				injectglist(&list)
				incidlelocked(1)
			}
		}
        if next < now {
			startm(nil, false)
		}
		...
	}
}

1、在每次系统监控时,都会在流程上调用timeSleepUntil方法去获取下一个定时器应触发的时间,以及保存改定时器已经打开的定时器堆的p.

2、检查当前是否存在GC,若正在STW则获取调度互斥锁。若发现下一个timer触发时间已经过去,则重新调用timeSleepUntil获取下一个定时器的时间和相应的p

3、如果发现超过10ms没有进行netpoll网络轮询,则主动调用netpoll方法触发轮询

8、运行定时器

这里来分析一下runtimer方法:

只有被标记为timerWaiting状态的定时器才能运行,尝试将状态更新为timerRunning,然后执行runOneTimer方法。

标记为timerDeleted状态的定时器会去删除定时器,标记为timerModifiedXXX状态的定时器会去重新添加定时器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func runOneTimer(pp *p, t *timer, now int64) {
	f := t.f
	arg := t.arg
	seq := t.seq

	if t.period > 0 {
		// ticker,需要再次触发
		// 重新计算下一次的触发时间,并且更新其在最小堆
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
		siftdownTimer(pp.timers, 0)
		// 将状态修改为timerWaiting
		if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
			badTimer()
		}
		// 设置p的下一次触发时间
		updateTimer0When(pp)
	} else {
		// 移除timer
		dodeltimer0(pp)
		if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
			badTimer()
		}
	}

	unlock(&pp.timersLock)

	// 回调方法
	f(arg, seq)

	lock(&pp.timersLock)
}

9、小结

通过大致的go1.14源码分析,可以看出有以下改变:

  • 在每个处理器p中,timers以最小四叉堆方式存储
  • 在调度器的每轮跳读中都会对定时器进行触发和检查
  • 在系统监听netpoll会定时进行定时器的触发和检查
  • 在定时器的处理中,10个状态的流转和处理变化

总结

go1.13最多可以开到GOMAXPROCS数量的timerproc协程,当然不超过64。但我们要知道timerproc自身就是协程,也需要runtime pmg的调度。反而go 1.14把检查到期定时任务的工作交给了runtime.schedule,不需要额外的调度,每次runtime.schedule和findrunable时直接运行到期的定时任务。

线程上下文切换开销?新添加的定时任务的到期时间更小时,不管是使用futex还是epoll_wait系统调用都会被唤醒重新休眠,被唤醒的线程会产生上下文切换。但由于go1.14没有timerproc的存在,新定时任务可直接插入或多次插入后再考虑是否休眠。

结论,golang 1.13的定时器在任务繁多时,必然会造成更多的上线文切换及runtime pmg调度,而golang 1.14做了更好的优化。