目录

并发编程之读写锁

读写锁是基于互斥锁Mutex实现的读写互斥锁,一个goroutine可以持有多个读锁或一个写锁,同一时刻只能同时持有读锁或写锁。

RWMutex结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type RWMutex struct {
	w Mutex // held if there are pending writers	// 互斥锁
	// 写和读锁信号
	writerSem uint32 // semaphore for writers to wait for completing readers
	readerSem uint32 // semaphore for readers to wait for completing writers
	// 读锁计数器
	readerCount int32 // number of pending readers
	// 获取写锁是需要等待的读锁释放数量
	readerWait int32 // number of departing readers
}

const rwmutexMaxReaders = 1 << 30 // 最多支持2^30个锁

读加锁RLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 读加锁,不应该用于递归读锁定
func (rw *RWMutex) RLock() {
	// 每次goroutine获取读锁时,readerCount+1
	// 如果写锁已经被获取,那么readerCount在[-rwmutexMaxReaders,0),这时挂起读锁的goroutine
	// 如果写锁没有被获取,那么readerCount>=0,获取读锁,不阻塞
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 将goroutine排到队列尾部,挂起goroutine,监听readerSem信号量
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

读加锁时并没有用到互斥锁,而是readerCount+1,而小于0的情况是因为写加锁了

读解锁RUnlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 读解锁,只会撤销对应的RLock调用,不会影响其他读锁
func (rw *RWMutex) RUnlock() {
	// 读计数器readerCount-1
	// 场景一:有读锁,没有写锁被挂起 r=readerCount-1>=0
	// 场景二:有读锁,有写锁被挂起 r<0
	// 场景三:没有读锁,没有写锁被挂起 r=-1
	// 场景四:没有读锁,有写锁被挂起 r=-(1<<30)-1<0
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	// 场景三和场景四是异常情况
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// 如果写锁的goroutine被阻塞,需要将读锁的goroutine全部释放,才会唤醒写锁
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

因为写加锁时是把readerCount+-rwmutexMaxReaders,所以在已经有写加锁时,readerCount一定是负数。

而对没有读锁没有写锁这种异常情况,readerCount-1后为-1

而对没有读锁,有写锁时,此时readerCount+1=-rwmutexMaxReaders

这两种情况都会报错。

对于有读锁,有写锁时,如果是最后一个读解锁,那么唤醒写锁。

写加锁Lock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (rw *RWMutex) Lock() {
	// 首先,获取互斥锁,与其他来获取写锁的goroutine互斥
	rw.w.Lock()
	// 告诉其他来获取读锁的goroutine,现在已经有人获取了写锁
	// 减去最大读锁的数量,用负数来表示,写锁已经被获取
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 设置需要等待释放的读锁数量,如果有挂起写锁
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

这里先用互斥锁保护下面的操作:

设置读等待的数量readerWait,如果不为0,就挂起写锁

写解锁Unlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (rw *RWMutex) Unlock() {
	// 向读锁的goroutine发出通知,没有写锁了
	// 还原加锁时的readerCount
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
}

写解锁时,首先将readerCount数量还原,然后去唤醒读锁

总结

源码中可以看到,读写锁首先内置了一个互斥锁,再加上各种计数器来实现读写锁。

  • 读锁不能阻塞读锁,所以添加readerCount
  • 读锁需要阻塞写锁,直到所有的读锁释放,引入writerSem
  • 写锁需要阻塞读锁,直到所有写锁释放,引入readerSem
  • 写锁需要阻塞写锁,引入mutex实现