目录

并发编程之原子操作

原子操作

我们先给原子操作下一个定义:

原子(atom):在化学反应中不可再分的基本微粒。

原子操作(atomic operation):不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何的上下文切换。

简单来说,就是多个线程对同一块内存的操作是串行的,不会因为并发操作而同时读写内存。

原子性

在处理器层面,基于缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。通过加锁保证从系统内存中读取或写入一个字节是原子的,也就是当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址。

  • 总线锁

如果多个处理器同时对共享变量进行读写操作(i++),那么共享变量就会被多个处理器同时进行操作,这样读写操作就不是原子的,操作完之后共享变量的值会和期望的不一致。

总线锁其实就是处理器提供一个LOCK#信号,当一个处理器在总线上输出信号时,其他处理器的请求将被阻塞,那么改处理器就能独占共享内存。

在同一时刻,只需保证对某个内存地址的操作是原子性即可,但总线锁把CPU和内存之间的通信锁住了,使得其他处理器不能操作其他内存地址的数据,所以总线锁的开销比较大,缓存锁可以在某些场合代替总线锁进行优化。

  • 缓存锁

内存区域如果被缓存在处理器的缓存行中,并且在LOCK#操作期间,那么当它执行操作回写到内存时,处理器不能在总线上声明LOCK#信号,而是修改内部的内存地址,允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性会阻止同时修改两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行数据时,会使缓存行无效。

但有两种情况下处理器不会使用缓存锁定:

1、当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line),则处理器会调用总线锁

2、有些处理器不支持缓存锁定。

锁机制虽然能保证原子性,但是锁机制最主要的问题:多线程竞争的情况下,会出现线程阻塞和唤醒锁带来的性能问题,互斥同步(阻塞同步)。

锁机制采用的是悲观锁策略,并不是一种特别高效的解决方案。可以采用乐观锁,每次不加锁,而是假设没有冲突去完成某项操作,如果有冲突就重试,知道成功为止。这就是无锁操作CAS(Compare and swap)。

CAS

CAS是一条原子指令,CAS(V,O,N),包含三个值分别为:V内存地址存放的实际值,O预期的值(旧值),N更新的值,作用是让CPU先比较旧值O和内存实际值V,如果相等就表明没有被其他线程修改过,就会把新值N赋值给V。反之,V和O不相等,不能把N赋值给V,返回V即可。

伪代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func CompareAndSwap(addr *int, oldValue,newValue int) bool {
    if addr == nil {
        return false
    }
    if *addr == oldValue {
        *addr = newValue
        return true
    }
    return false
}

不过上面的代码可能会发生一个问题,也就是ABA问题。因为CAS需要在操作值的时候检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变回了A,那么使用CAS检查时会发现它的值没有发生变化,但实际上发生了变化。ABA问题的解决思路就是使用版本号,在遍历前面追加版本号,每次更新的时候都会把版本号加1,那么A-B-A就会变成1A-2B-3A

go包中的原子操作

src/sync/atomic/doc.go下,把底层硬件提供的原子操作封装成了Go的函数,分为5个系列:

1、SwapXXX(addr *int32, new int32) (old int32):原子性的将new的值保存到*addr并返回旧值

2、CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool):原子性比较*addrold的值,如果相同则将new赋值给*addr并返回true

3、AddInt32(addr *int32, delta int32) (new int32):原子性的将delta的值加到*addr并返回新值

4、LoadInt32(addr *int32) (val int32):原子性的获取*addr的值

5、StoreInt32(addr *int32, val int32):原子性的将val的值保存到*addr

源码解析

原子操作是基于汇编实现的,基于plan9的。


我们可以看一下value.go文件的源码。

1
2
3
type Value struct {
	v interface{}
}

虽然这里是interface类型,但是这里其实是分解了类型和值的。

1
2
3
4
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

Value的写入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
		typ := LoadPointer(&vp.typ)
		if typ == nil {
			// Attempt to start first store.
			// Disable preemption so that other goroutines can use
			// active spin wait to wait for completion; and so that
			// GC does not see the fake type accidentally.
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// Complete first store.
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
		if uintptr(typ) == ^uintptr(0) {
			// First store in progress. Wait.
			// Since we disable preemption around the first store,
			// we can wait with active spinning.
			continue
		}
		// First store completed. Check type and overwrite data.
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		StorePointer(&vp.data, xp.data)
		return
	}
}

// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()

通过报错信息和注释我们知道,存入的值不能为nil,类型必须与原类型相同。

写入步骤:

1、判断写入值不能为nil,否则触发panic

2、将oldValuenewValue转换成ifaceWords类型,方便获取类型和值

3、为了保证原子性,循环处理,当已经有Store正在写入时,会进行等待。

4、如果还没有写入数据,类型为空,那么会开始第一次写入操作,会先调用runtime_procPin方法禁止调度器对当前goroutine的抢占

5、调用CAS方法来判断当前地址是否有被抢占,如果失败,就会解除抢占锁,解除禁止调度器,循环等待

6、设置中间值成功后,可以安全的把v设为传入的新值了,写入值和类型。

7、第一次写入没有完成,通过uintptr(typ) == ^uintptr(0)来判断,因为还是第一次放入的中间类型,会继续等待第一次完成

8、如果第一次写入完成,会检查类型是否一致,然后写入数据

Value的读取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (v *Value) Load() (x interface{}) {
	vp := (*ifaceWords)(unsafe.Pointer(v))
	typ := LoadPointer(&vp.typ)
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		// First store not yet completed.
		return nil
	}
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}

先转换oldValue,然后根据类型判断是否有数据或第一次写入有没有完成,通过检查后,获取值。

总结

golang包中的原子操作可以看成是乐观锁,而互斥锁可以看成是悲观锁。

原子锁操作更加轻量,可以在不形成临界区和创建互斥量的情况下并发安全的值替换操作,可以大大减少同步对程序性能的损耗。

参考资料

原子操作