channel 101

1. 引子

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

2. 使用

channel 底层原理

  • buf是有缓冲的channel所特有的结构,用来存储缓存数据。是个循环链表 (为啥是循环链表?普通数组不行吗,普通数组地址和容量固定更适合指定的空间。需要pop 掉元素,普通数组需要全部都前移)
  • sendxrecvx用于记录buf这个循环链表中的发送或者接收的index
  • lock是个互斥锁。
  • recvqsendq分别是接收(<-channel)或者发送(channel <- xxx)的goroutine抽象出来的结构体(sudog)的队列。是个双向链表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type hchan struct {
qcount uint // 队列中的总元素个数
dataqsiz uint // 环形队列大小,即可存放元素的个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 //每个元素的大小
closed uint32 //标识关闭状态
elemtype *_type // 元素类型
sendx uint // 发送索引,元素写入时存放到队列中的位置
recvx uint // 接收索引,元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁,chan不允许并发读写
}

1. 读写流程

向 channel 写数据:

  1. 若等待接收队列 recvq 不为空,则缓冲区中无数据或无缓冲区,将直接从 recvq 取出 G ,并把数据写入,最后把该 G 唤醒,结束发送过程。

  2. 若缓冲区中有空余位置,则将数据写入缓冲区,结束发送过程。

  3. 若缓冲区中没有空余位置,则将发送数据写入 G,将当前 G 加入 sendq ,进入睡眠,等待被读 goroutine 唤醒。

从 channel 读数据

  1. 若等待发送队列 sendq 不为空,且没有缓冲区,直接从 sendq 中取出 G ,把 G 中数据读出,最后把 G 唤醒,结束读取过程。

  2. 如果等待发送队列 sendq 不为空,说明缓冲区已满,从缓冲区中首部读出数据,把 G 中数据写入缓冲区尾部,把 G 唤醒,结束读取过程。

  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程。

  4. 将当前 goroutine 加入 recvq ,进入睡眠,等待被写 goroutine 唤醒。

关闭 channel

1.关闭 channel 时会将 recvq 中的 G 全部唤醒,本该写入 G 的数据位置为 nil。将 sendq 中的 G 全部唤醒,但是这些 G 会 panic。

panic 出现的场景还有:

  • 关闭值为 nil 的 channel
  • 关闭已经关闭的 channel
  • 向已经关闭的 channel 中写数据

2. 创建channel

1
2
3
4
// 无缓冲的channel由于没有缓冲发送和接收需要同步
ch := make(chan int)
//有缓冲channel不要求发送和接收操作同步
ch := make(chan int, 2)

实例化了 chan 的结构体,返回ch指针

channel中发送send(ch <- xxx)和recv(<- ch)接收

使用 mutex 加锁操作,新进先出的队列

当channel缓存满后

发送满的时候

当队列已满的时候,G1正在运行,当再次send操作时,会主动调用GO的调度器,让G1等待,并让出M,同时G1也会被抽象成含有G1指针和send元素的sudog结构体保存到hchan的sendq中等待被唤醒。

当G2 recv操作的时候,G2从缓存队列中取出数据,channel会将等待队列中的G1推出,将G1当时send的数据推到缓存中,然后调用Go的scheduler,唤醒G1,并把G1放到可运行的Goroutine队列中。

接收满的时候

这个时候G2会主动调用Go的调度器,让G2等待,并从让出M,让其他G去使用。 G2还会被抽象成含有G2指针和recv空元素的sudog结构体保存到hchan的recvq中等待被唤醒。此时恰好有个goroutine G1开始向channel中推送数据 ch <- 1。 此时,非常有意思的事情发生了:G1并没有锁住channel,然后将数据放到缓存中,而是直接把数据从G1直接copy到了G2的栈中。 这种方式非常的赞!在唤醒过程中,G2无需再获得channel的锁,然后从缓存中取数据。减少了内存的copy,提高了效率。

4. channel 的状态和操作方式

如下:

操作 nil 的 channel 已关闭的 channel 正常 channel
close 关闭 panic panic 成功
ch <- 写 死锁 panic 阻塞或成功
<-ch 读 死锁 零值 阻塞或成功

channel一个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语言层面提供的goroutine间的通信方式。

众所周知,Go依赖于称为CSP(Communicating Sequential Processes)的并发模型,通过Channel实现这种同步模式。Go并发的核心哲学是不要通过共享内存进行通信; 相反,通过沟通分享记忆。

channel 常见用法

循环读取

1
2
3
for x := range ch {
fmt.Println(x)
}