golang有一个很重要的特性就是channel,经常配合goroutine一起使用。
一、基本用法
| 1
2
 | x := <- ch
x,ok := <- ch
 | 
 
当然,其中也涉及到有缓冲和无缓冲的情况,为什么会造成这种情况,我们会在下面解释。
二、数据结构
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
 | type hchan struct {
	qcount   uint           // 队列中数据的个数
	dataqsiz uint           // 队列容量
	buf      unsafe.Pointer // 存放在环形数组的数据
	elemsize uint16         // channel中数据类型大小
	closed   uint32         // channel是否关闭
	elemtype *_type         // 元素类型
	sendx    uint           // send的数组索引
	recvx    uint           // receive的数组索引
	recvq    waitq          // <-ch 阻塞在chan上的队列 list of recv waiters
	sendq    waitq          // ch<- 阻塞在chan上的队列 list of send waiters
	lock mutex
}
 | 
 
channel的数据结构不太复杂,就是一个环形队列,里面保存了长度qcount,容量dataqsiz,数据buf,以及前后索引sendx,recvx。
closed用来标识channel的状态,0表示未关闭,非0表示已关闭,如果关闭,那么就不能发送数据。
三、初始化
在内部有两个make函数,一个是makechan64,一个是makechan,其实makechan64本质上还是调用的makechan。
1、长度判断
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 | elem := t.elem
if elem.size >= 1<<16 {
	throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
	throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
	panic(plainError("makechan: size out of range"))
}
 | 
 
初始化的时候可以传入长度size,然后根据你初始化数据的类型大小elem.size计算是否有可用空间。
2、分配内存
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 | var c *hchan
switch {
case mem == 0:
	c = (*hchan)(mallocgc(hchanSize, nil, true))
	c.buf = c.raceaddr()
case elem.ptrdata == 0:
	c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
	c.buf = add(unsafe.Pointer(c), hchanSize)
default:
	c = new(hchan)
	c.buf = mallocgc(mem, elem, true)
}
 | 
 
- 如果size为0,只分配hchanSize的大小,如果是64位就是80,如果是32位就是40
- 如果数据类型不是指针,分配一块连续内存hchanSize+mem
- 如果数据类型是指针,hchan和buf单独分配
此时,将结构体剩余字段赋值。
| 1
2
3
 | c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
 | 
 
四、send
就是ch <- x
调用的函数签名是chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
1、空chan判断
首先判断channel是否初始化
| 1
2
3
4
5
6
7
 | if c == nil {
	if !block {
		return false
	}
	gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
	throw("unreachable")
}
 | 
 
其次判断channel是否关闭
| 1
2
3
4
 | if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
	(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
	return false
}
 | 
 
这段判断逻辑还是比较复杂的。
这个是fast path,向没有阻塞的管道判断发送失败,这样就可以不用获取锁进行判断了。
如果是非阻塞,管道没有关闭的情况下,没有缓冲区或缓冲区已经满了,返回false。
由于这里是并发执行的,可能会在判断完c.closed==0之后,关闭channel,那么这里会出现这两种情况:
1、channel没有关闭,没有缓冲区或缓冲区已经满了,返回false
2、channel已经关闭,close会加锁将recvq和sendq全部出队列,返回false
所以这里的判断是十分严谨的。
2、加锁
| 1
2
3
4
5
6
 | lock(&c.lock)
if c.closed != 0 {
	unlock(&c.lock)
	panic(plainError("send on closed channel"))
}
 | 
 
3、取出接受者
| 1
2
3
4
 | if sg := c.recvq.dequeue(); sg != nil {
	send(c, sg, ep, func() { unlock(&c.lock) }, 3)
	return true
}
 | 
 
找到一个等待的接受者,直接发送
4、是否有缓冲
如果没有找到有等待的接受者,那么就看channel是否是有缓冲的。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 | if c.qcount < c.dataqsiz {
	// Space is available in the channel buffer. Enqueue the element to send.
	qp := chanbuf(c, c.sendx)
	typedmemmove(c.elemtype, qp, ep)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	unlock(&c.lock)
	return true
}
 | 
 
可以看到环形队列的判断
如果到这里,前面的步骤都没有发送成功,表示没有接受者等待,也没有缓冲区,那么就需要挂起goroutine等待接受者了。
| 1
2
3
4
 | if !block {
	unlock(&c.lock)
	return false
}
 | 
 
5、阻塞goroutine
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
 | gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
	mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
 | 
 
添加发送者到发送队列,调用gopark阻塞
6、发送完成
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
 | if mysg != gp.waiting {
	throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
	if c.closed == 0 {
		throw("chansend: spurious wakeup")
	}
	panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
	blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
 | 
 
goroutine被唤醒后,表示发送完成,清理现场
五、recv
| 1
2
3
 | func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
 | 
 
对应的是x <- ch
| 1
2
3
4
 | func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
 | 
 
对应的是x,ok :=<- ch 
1、空channel判断
| 1
2
3
4
5
6
7
 | if c == nil {
	if !block {
		return
	}
	gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
	throw("unreachable")
}
 | 
 
同样这里也有个fast path
| 1
2
3
4
5
 | if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
	c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
	atomic.Load(&c.closed) == 0 {
	return
}
 | 
 
这里把是否关闭放在了最后进行判断,与发送不一样,这是因为接受的时候会走default分支
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 | c := make(chan int, 1)
c <- 1
go func() {
    select {
    case <- c:
        println("receive from c")
    default:
        println("c is not ready")
    }
}
close(c)
 | 
 
这段代码应该不执行default分支。
可能会出现如下情况:
- select发生在close之前,从c中取出1
- select发送在close之后,但在<-c 之前,取出1
- select发送在<-c之后,取出0,received=false,但不会执行default
如果这里我们把c.closed的判断放在前面的话,会出现以下情况:
- 通道未关闭,不存在可接受数据,没有发送者等待,返回(false, false)
- 通道已关闭,不存在可接受数据,没有发送者等待,应该要返回(ture, false),这里返回了(false, false)
这里,selected应该为true,所以把c.closed放在最后判断可以避免这种情况。
2、通道关闭
| 1
2
3
4
5
6
7
8
9
 | lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
	unlock(&c.lock)
	if ep != nil {
		typedmemclr(c.elemtype, ep)
	}
	return true, false
}
 | 
 
如果通道已经关闭并且没有数据可以读取,返回(true,false)
3、取出发送者
| 1
2
3
4
 | if sg := c.sendq.dequeue(); sg != nil {
	recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
	return true, true
}
 | 
 
找到一个发送者,接受数据
4、是否有缓冲
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
 | if c.qcount > 0 {
	qp := chanbuf(c, c.recvx)
	if ep != nil {
		typedmemmove(c.elemtype, ep, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	unlock(&c.lock)
	return true, true
}
 | 
 
5、阻塞goroutine
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
 | gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
	mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
 | 
 
6、接受完成
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 | if mysg != gp.waiting {
	throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
	blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
 | 
 
六、关闭channel
这部分比较简单,就是加锁,设置标识位c.closed,然后唤醒所有的接受者和发送者,接受和发送数据,最后释放锁。
七、总结
1、channel底层就是一个环形队列
2、在有接受者或发送者的情况下,在select中不会走default分支
3、初始化的缓冲就是对应的是否阻塞block