目录

并发编程之WaitGroup

目录

waitgroup的使用场景:

一个waitgroup对象可以等到一组协程结束,也就是等待一组goroutine返回。

首先我们来看看sync.WaitGroup的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	// 64bit(8bytes)的值分成两段,高32位是计数值,低32位是waiter的计数
	// 64位值的原子操作需要在64位编译器上,但32位不支持,所以使用12bytes,8bytes表示状态,4bytes表示信号量
	state1 [3]uint32
}

这里总共就2个字段,一个是nocopy是为了保证该结构不会被拷贝,是一种保护机制。一个是state1主要是存储状态和信号量,这里使用的8字节对齐。

这里的state1一共被分配了12个字节,被设定成3种状态:

  • 其中对齐的8个字节作为状态,高32位为计数的数量,低32位为等待的goroutine数量
  • 其中的4个字节作为信号量存储
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// 如果是64位,数组前两个元素做state,后一个元素做信号量
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		// 如果是32位,数组后两个元素做state,第一个元素做信号量
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

为了保证waitgroup在32位平台上使用,就不能分成2个字段来写,考虑到字段的顺序平台的不同,内存对齐的方式不同,因此这里判断数组的首地址是否处于8字节对齐的位置上。

当数组的首地址是处于一个8字节对齐的位置上时,那么就将这个数组的前8个字节作为64位值使用表示状态,后4个字节作为32位值表示信号量(semaphore)。同理如果首地址没有处于8字节对齐的位置上时,那么就将前4个字节作为semaphore,后8个字节作为64位数值。画个图表示一下:

https://cdn.jsdelivr.net/gh/betterfor/cloudImage/images/2021/07/16/1085709093.png

waitgroup提供了Add方法增加一个计数器,Done方法减掉一个计数器,而Done方法实际上就是Add(-1)

所以我们来看看Add()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Add主要操作state的计数值部分(高32位)
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	// 将delta添加到计数值上
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// 当前的计数值
	v := int32(state >> 32)
	// 当前的waiter
	w := uint32(state)
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// Add(1)必须在Wait之前
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	// 如果计数值v=0且waiter>0,那么state的值就是waiter的数量
	// 将waiter设置为0,唤醒所有的waiter
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

Wait方法会阻塞goroutine知道waitgroup的计数器变为0。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 不断检查state的值,如果其中计数值为0,说明所有的子goroutine已全部执行完毕,调用者不必等待,全部返回。
// 如果计数值>0,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32) // 计数值(Add)
		w := uint32(state)      // waiter
		if v == 0 {
			// Counter is 0, no need to wait.
			return
		}
		// Increment waiters count.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semacquire(semap)
			// 阻塞完成,所有的Add已经完成
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

小结

  • Add方法与Wait方法不可以并发同时调用,Add方法必须在Wait方法之前调用
  • Add方法必须与实际等待的goroutine数量一致,否则会panic
  • 调用Wait方法后,必须等待Wait方法返回后才能重新使用waitgroup,也就是不能再wait没有返回前调用Add
  • waitgroup对象只能有一份,不可以拷贝给其他变量。