之前的文章httprouter路由框架为什么高性能提到过一点高性能的原因就是它减少了内存分配。因为分配内存是在堆上分配的,调用mallocgc函数,是有性能消耗的。
而httprouter中使用sync.pool来减少内存分配,减少GC消耗。
那么sync.Pool为什么能做到这一点?做出来哪些减少性能消耗的工作?
一、初识sync.Pool
sync.Pool是一组可以单独保存和检索的临时对象。存储在Pool中的任意对象可能会自动删除,这个删除过程是不会通知到用户的。
Pool的目的是缓存已分配但没有使用的对象以供之后复用,减轻垃圾回收的压力。而Pool是并发安全的,也就是说,它可以轻松构建高效、线程安全的存储池。
一个例子就是在fmt包中,Pool维护了一个动态大小的临时输出缓冲区存储,存储在负载的时候扩展(多goroutine打印),在静止时缩小。
二、使用方法
1
2
3
4
5
6
7
8
|
bufferpool := &sync.Pool{
New: func() interface{} {
println("Create new instance")
return struct{}{}
},
}
buffer := bufferpool.Get()
bufferpool.Put(buffer)
|
首先初始化Pool,声明一个创建Pool元素的方法。
然后当使用时申请对象,Get方法会返回Pool已经存在的对象,如果没有,就走New方法来初始化一个对象。
当使用完成后,调用Put方法把对象放回Pool中。
Pool就3个接口,针对所有的对象类型都可以使用。
那么我们来思考一个问题,使用sync.Pool有什么好处?
下面我们来看一个例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
var count int32
func newBuffer() interface{} {
atomic.AddInt32(&count, 1)
buffer := make([]byte, 1024)
return &buffer
}
func main() {
bufferPool := &sync.Pool{
New: newBuffer,
}
workers := 1024 * 1024
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buffer := bufferPool.Get()
_ = buffer.(*[]byte)
defer bufferPool.Put(buffer)
}()
}
wg.Wait()
fmt.Printf("%d buffer created", count)
}
|
最终打印结果
1
2
|
11 buffer created
10 buffer created
|
多次运行可能会出现不同的结果,但是次数count都不大。如果不是使用Pool来申请,而是直接使用buffer := make([]byte, 1024)来申请内存,那么就会申请 1024 * 1024 个对象,造成极大的浪费。而Pool就是对这类对象的复用。
三、原理
既然已经知道了复用对象的好处,那么sync.Pool到底是如何实现这一功能的呢?
首先我们来看看Pool的结构
1
2
3
4
5
6
7
8
9
10
11
|
type Pool struct {
noCopy noCopy
local unsafe.Pointer
localSize uintptr
victim unsafe.Pointer
victimSize uintptr
New func() interface{}
}
|
nocopy就是标识不能拷贝,具体原理在读写锁时提到
localSize就是cpu处理器的个数,local指向了 [localSize]poolLocal
victim和victimSize就是在GC后接管local和localSize
New函数就是初始化对象的方法
poolLocal管理Pool池里的cache元素的关键结构
1
2
3
4
5
6
7
8
|
type poolLocalInternal struct {
private interface{}
shared poolChain
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
|
private就是声明的对象
shared是双链表结构,用于挂载cache元素
pad就是用来填充字符到128字节,用于内存对齐
1、获取对象
1.1、获取本地pool
这个pin函数hold住了当前goroutine在P上,不允许调度,并且返回P的本地pool和P的id。
1
2
3
4
5
6
7
8
9
|
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
|
这个indexLocal其实就是个索引到本地pool的指针
1
2
3
4
|
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
|
通常只有第一次执行的时候,p.localSize为0,才会执行p.pinSlow,其他都直接走if返回本地pool了。
pinSlow就是把Pool注册进allPools数组中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}
|
首先先解锁,然后加全局互斥锁
1
2
3
4
5
|
var (
allPoolsMu Mutex
allPools []*Pool
oldPools []*Pool
)
|
allPools就是全局的pool,oldPool就是victim使用的pool。
然后再重新hold住goroutine在P上,二次判断是否直接返回本地pool。
而使用runtime.GOMAXPROCS(0)来获取cpu的数量,也就是P的数量。
这里深入扩展一下,runtime_procPin其实是runtime包下的procPin的一层封装。
1
2
3
4
5
6
7
|
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
|
procPin的目的就是为了当前G被抢占了执行权限,也就是说G在M上不走了,而实际核心是mp.locks++,在newstack函数里,有这么段代码
1
2
3
4
5
6
7
8
9
10
|
if preempt {
if !canPreemptM(thisg.m) {
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched)
}
}
func canPreemptM(mp *m) bool {
return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
|
这里mp.locks>0,所以就只能让G一直执行
而runtime_procUnpin函数可以猜想的到,就是让mp.lock--。
1.2、取出对象
1
2
3
4
5
6
7
8
|
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
|
从private中取出对象,如果取出的对象为nil,那么就尝试从share队列中获取,如果还是nil,就从其他P的队列中取,或者从victim中取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
// 从其他P中取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 从victim中取
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
|
1.3、初始化对象
如果上面几个地方都不存在该对象,那么就调用New函数初始化一个对象返回
1
2
3
4
|
if x == nil && p.New != nil {
x = p.New()
}
return x
|
2、放回对象
2.1、空值判断
1
2
3
|
if x == nil {
return
}
|
2.2、获取本地pool
2.3、尝试存放数据
1
2
3
4
5
6
7
8
|
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
// 放到双向链表
l.shared.pushHead(x)
}
|
3、victim
看到这里,可能就有点疑惑了,Pool就这两个方法,也没有用到victim。
这个奥秘就在于它注册了init函数,在每次GC的时候调用poolCleanup函数,也就是说每一轮GC都会对所有的Pool做清理工作。
1
2
3
|
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
|
而poolCleanup函数就是做pool的迁移
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func poolCleanup() {
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
|
四、总结
1、sync.Pool是并发安全的,读取数据时会hold住当前的goroutine不被打断
2、sync.Pool不允许复制后使用,因为nocopy
3、sync.Pool不适用于socket长连接或连接池等,因为无法知道连接池的个数,连接池的元素随时可能会被释放。
4、从sync.Pool取出的对象使用完需要放回池中。