目录

sync之RWMutex

在go中,每次读写时都需要加互斥锁,这个对程序的影响还是比较大的。所以我们在sync包中能够找到另外一个锁—读写锁。当然,读写锁适用于读次数远远多于写次数的场景。

那么读写锁和互斥锁有什么联系和不同呢?

一、数据结构

1
2
3
4
5
6
7
type RWMutex struct {
	w Mutex
	writerSem uint32 
	readerSem uint32 
	readerCount int32 
	readerWait int32 
}

可以看到w是互斥锁,writerSemreaderSem都是等待者,readerCount是读计数器,readerWait是获取写锁需要等待的读锁释放数量。

而最多支持rwmutexMaxReaders(2^30^个读计数器)

二、整体流程

这里读写锁做了个很精妙的方法区分读写锁,如果有写锁进来,将readerCount-wmutexMaxReaders,因为readerCount最大数量是小于wmutexMaxReaders,所以在加锁结果过程中,如果发现readerCount<0,那么就知道有写锁加进来了。

1、读加锁

每次goroutine获取读锁时,readerCount+1,然后分两种情况:

  • 如果写锁已经被获取,那么readerCount在区间[-rwmutexMaxReaders,0),此时挂起读锁的goroutine
  • 如果写锁没有被获取,那么readerCount>=0,直接获取。
1
2
3
4
5
6
func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 将goroutine排到队列尾部,挂起goroutine,监听readerSem信号量
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

2、读解锁

读解锁只会撤销对应的RLock调用,不会影响其他读锁

readerCount-1,此时分为以下几种情况:

  • 有读锁,没有写锁被挂起,r = readerCount-1>=0
  • 有读锁,有写锁被挂起,r < 0
  • 没有读锁,没有写锁被挂起,r=-1
  • 没有读锁,有写锁被挂起,r=-(1«30)-1<0

如果r>0,那么直接解锁,而对于r<0的情况,第三种和第四种是异常情况,不能用RUnlock解写锁,只能将readerCount-1,并唤醒等待的读锁,只有将所有读锁的goroutine全部释放,才会唤醒写锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		throw("sync: RUnlock of unlocked RWMutex")
	}
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

3、写加锁

写操作是互斥的,所以写操作是需要添加互斥锁,然后通知其他读锁,如果有读锁,就挂起写锁

1
2
3
4
5
6
7
func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

4、写解锁

同样,写解锁向读锁发出通知,还原加锁的readerCount

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (rw *RWMutex) Unlock() {
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	rw.w.Unlock()
}

三、nocopy

因为是需要加锁解锁操作,所以在goroutine中是不能使用拷贝的。注释中也明确指定了这一点:

A RWMutex must not be copied after first use.

如果我们在代码中使用会出现异常情况。

如果结构体对象包含指针字段,当该对象被拷贝时,会使得两个对象中的指针字段变得不再安全。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type S struct {
	f1 int
	f2 *s
}

type s struct {
	name string
}

func main() {
	mOld := S{
		f1: 0,
		f2: &s{name: "mike"},
	}
	mNew := mOld //拷贝
	mNew.f1 = 1
	mNew.f2.name = "jane"

	fmt.Println(mOld.f1, mOld.f2) //输出:0 &{jane}
}

这时修改mNew的字段值会把mOld字段值修改掉,这就可能会引发安全问题。

1、copy检查

1
2
3
4
5
6
func main() {
	var a strings.Builder
	a.Write([]byte("a"))
	b := a
	b.Write([]byte("b"))
}

这段代码运行会报错

1
panic: strings: illegal use of non-zero Builder copied by value

这时因为它在内部实现了copyCheck方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Do not copy a non-zero Builder.
type Builder struct {
	addr *Builder // of receiver, to detect copies by value
	buf  []byte
}

func (b *Builder) Write(p []byte) (int, error) {
	b.copyCheck()
	b.buf = append(b.buf, p...)
	return len(p), nil
}

func (b *Builder) copyCheck() {
	if b.addr == nil {
		b.addr = (*Builder)(noescape(unsafe.Pointer(b)))
	} else if b.addr != b {
		panic("strings: illegal use of non-zero Builder copied by value")
	}
}

实现了逻辑也比较简单,b.addr指向了自身的指针,如果将a赋值给b,那么ab本身是不同的对象,因此b.addr实际会指向a导致panic。

这里的noescape里面就有关于逃逸分析的内容

2、nocopy

那有没有更简单的方式呢?有,就是互斥锁的接口

1
2
3
type noCopy struct{}
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

sync包中都存在nocopy检查,通过go vet进行copy检查,都是添加这种类型。

也就是说,我们在代码中也使用这种方式,可以进行nocopy检查。

四、总结

  • 读锁不能阻塞读锁,所以会添加readerCount
  • 读锁能够阻塞写锁,直到所有的读锁释放,所以引入writerSem
  • 写锁要能够阻塞读锁,直到所有写锁释放,所以引入readerSem
  • 写锁需要能够阻塞写锁,所以使用互斥锁
  • 可以通过LockUnlock方式,实现nocopy