读写锁是基于互斥锁Mutex
实现的读写互斥锁,一个goroutine可以持有多个读锁或一个写锁,同一时刻只能同时持有读锁或写锁。
RWMutex结构体
1
2
3
4
5
6
7
8
9
10
11
12
|
type RWMutex struct {
w Mutex // held if there are pending writers // 互斥锁
// 写和读锁信号
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
// 读锁计数器
readerCount int32 // number of pending readers
// 获取写锁是需要等待的读锁释放数量
readerWait int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30 // 最多支持2^30个锁
|
读加锁RLock
1
2
3
4
5
6
7
8
9
10
|
// 读加锁,不应该用于递归读锁定
func (rw *RWMutex) RLock() {
// 每次goroutine获取读锁时,readerCount+1
// 如果写锁已经被获取,那么readerCount在[-rwmutexMaxReaders,0),这时挂起读锁的goroutine
// 如果写锁没有被获取,那么readerCount>=0,获取读锁,不阻塞
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 将goroutine排到队列尾部,挂起goroutine,监听readerSem信号量
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
|
读加锁时并没有用到互斥锁,而是readerCount
+1,而小于0的情况是因为写加锁了
读解锁RUnlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// 读解锁,只会撤销对应的RLock调用,不会影响其他读锁
func (rw *RWMutex) RUnlock() {
// 读计数器readerCount-1
// 场景一:有读锁,没有写锁被挂起 r=readerCount-1>=0
// 场景二:有读锁,有写锁被挂起 r<0
// 场景三:没有读锁,没有写锁被挂起 r=-1
// 场景四:没有读锁,有写锁被挂起 r=-(1<<30)-1<0
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// 场景三和场景四是异常情况
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果写锁的goroutine被阻塞,需要将读锁的goroutine全部释放,才会唤醒写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
|
因为写加锁时是把readerCount
+-rwmutexMaxReaders
,所以在已经有写加锁时,readerCount
一定是负数。
而对没有读锁没有写锁这种异常情况,readerCount
-1后为-1
而对没有读锁,有写锁时,此时readerCount
+1=-rwmutexMaxReaders
这两种情况都会报错。
对于有读锁,有写锁时,如果是最后一个读解锁,那么唤醒写锁。
写加锁Lock
1
2
3
4
5
6
7
8
9
10
11
|
func (rw *RWMutex) Lock() {
// 首先,获取互斥锁,与其他来获取写锁的goroutine互斥
rw.w.Lock()
// 告诉其他来获取读锁的goroutine,现在已经有人获取了写锁
// 减去最大读锁的数量,用负数来表示,写锁已经被获取
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 设置需要等待释放的读锁数量,如果有挂起写锁
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
|
这里先用互斥锁保护下面的操作:
设置读等待的数量readerWait
,如果不为0,就挂起写锁
写解锁Unlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (rw *RWMutex) Unlock() {
// 向读锁的goroutine发出通知,没有写锁了
// 还原加锁时的readerCount
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
|
写解锁时,首先将readerCount
数量还原,然后去唤醒读锁
总结
源码中可以看到,读写锁首先内置了一个互斥锁,再加上各种计数器来实现读写锁。
- 读锁不能阻塞读锁,所以添加readerCount
- 读锁需要阻塞写锁,直到所有的读锁释放,引入writerSem
- 写锁需要阻塞读锁,直到所有写锁释放,引入readerSem
- 写锁需要阻塞写锁,引入mutex实现