sync.Map

golang 中原生的 map 本身是线程不安全的,主要表现在以下方面:

  1. 对于 map 结构的并发访问,会报错 panic。
  2. 即使通过某种方式不同时写一个 key,在 map 扩容的时候也会有问题,因为 map 的扩容是线程不安全的

sync.Map 相对 map 的优势:

  1. 一写多读
  2. 空间换时间 = read 和 dirty
  3. double-checking
  4. 缓存思路
  5. 延时删除

关于 dirty 的提升

Map 中维持了一个 int 类型的 misses 每当 Map 未命中 read 时,会将该值自增 1, 当该值大于 dirty 的长度后,dirty 就会被提升为 read,提升之后,dirty 和 misses 会被重置,等下一次插入新值时,会将 read 中未删除的数据复制到 dirty 中。
除此之外,执行 Range 时,也会先进行一次提升。

关于延迟删除

当执行 Delete 时,如果 read 没有击中, 就会直接从 dirty 中删除,否则如果键值在 read 中,会先将其 Value 的指针(enter.p)标记为 nil, 等下一次执行复制时,这些被标记为 nil 的键值会被重新标记为 expunged,即 enter.p 有三种可能的值:

  1. nil: 表示 键值已经被删除,但这一版的 read 还没有被复制到 dirty 中,所以 dirty 此时为 nil, 遇到要重新插入这个key时,可以直接修改 read,之后进行复制时,这个最新的值会被同步回 dirty。
  2. expunged: 表示该键值已经被删除并且经历了复制, dirty 不为 nil, 这时需要同时修改 read 和 dirty, 避免 read 的数据比 dirty 中的数据新,导致下一次提升时丢失新数据。
  3. != nil: 表示存储的是具体的 value 的指针。

sync.Map 具有以下结构和方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
type Map struct { 
mu sync.Mutex // 互斥锁
read atomic.Value // 原子化只读操作
dirty map[interface{}]*entry // 新增的 kv 会存储到这里
misses int // 当 read 中查询失败时,misses+=1,当 miss 的数量超过 dirty 时,会触发将 dirty 转化到 read
}

// 从 Map 中取出一个 value
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
// 优先从 read 中读取
e, ok := read.m[key]
// 当读不到且 amended = true 时,再互斥的从 dirty 中查找
if !ok && read.amended {
m.mu.Lock()
// double-checking, 避免在加锁过程中有其他 goroutine 将 dirty 提升为 read 的情况
read, _ = m.read.Load().(readOnly) e, ok = read.m[key]
// 双重查找后还找不到,再去 dirty 中查找,同时 misses += 1
if !ok && read.amended {
e, ok = m.dirty[key] // 修改 misses,尝试提升 dirty
m.missLocked()
}
m.mu.Unlock()
}
if !ok { return nil, false }
// 需要注意这里取到的是指向真实 value 的指针,还需要读取出真正的 value
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 将 dirty 提升为 read
m.read.Store(readOnly{m: m.dirty})
// 重置相关字段
m.dirty = nil
m.misses = 0
}

// 向 Map 中 存入一个 KV 对
func (m *Map) Store(key, value interface{}) {
// 优先从 read 中读取
// 如果 read 中找到了,利用 entry 的 tryStore 方法更新 value
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

// 加锁, 二次检查
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// 当对已经存在于 `read` 中的键值对执行删除操作时,而是会把其暂时标记为 `nil`,
// 等 dirty 升级为 read 后再插入新值时会把 read 中标记为 `nil` 的值标记为 `expunged`,
// 而其他的值会被重新复制到 dirty 中,当这时插入刚被删除的键后,就会直接把之前标记为 `expunged` 的键的值赋为新值
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 dirty 中找到了,通过 storeLocked 修改 dirty 中的 entry
e.storeLocked(&value)
} else {
// dirty 中也没有,那么就直接插入到 dirty 中
if !read.amended {
// 此时如果 amended == false,对应两种情况
// 1. 第一次插入数据,read 和 dirty 都为空,此时需要对 read 进行初始化
// 2. 之前的 dirty 刚刚升级成为 read,此时 dirty 为空,需要将之前有可能存在的已标记删除的进行惰性删除
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// 如果 Map 中存在 key,覆盖并返回 (旧值, true), 否则返回 (新值, false)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// 从 Map 中删除一个 KV 对
func (m *Map) Delete(key interface{}) {
// 二次检查 read
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 不存在,就从 dirty 中删除
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
// 存在的话就调用 entry 的 delete() 从 read 中删除
if ok { e.delete() }
}

// 对 Map 中的所有 KV 执行 f, 直到 f 返回 false
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
// double-checking
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
// 提升 dirty
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
func (m *Map) dirtyLocked() {
// 对应情况 1
if m.dirty != nil { return }
// 情况 2
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// 没有被删除,复制到 dirty 中
if !e.tryExpungeLocked() { m.dirty[k] = e }
}
}

type readOnly struct {
// m 和 dirty 中的 value 是同一块内存
m map[interface{}]*entry
// 如果 dirty 和 read 中的数据不一致时,amended 为 true。若此时 read 中读不到,就要去 dirty 中查询
amended bool
}

// read 和 dirty 中的 entry 指向的是同一地址,修改会同时生效;减少空间浪费
type entry struct {
p unsafe.Pointer // *interface{}
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged { // 由于 Map 的延时删除策略,出现这两种情况均代表 kv 已经不存在了
return nil, false
}
return *(*interface{})(p), true
}
func (e *entry) delete() (hadValue bool) {
// 这里不是真的删除嗷,只是将对应 value 的 p 指向了 nil。直到下一次 dirty 升级时才会真正的删除,aka Map 的延时删除策略
for { p := atomic.LoadPointer(&e.p)
// 不存在或被删除
if p == nil || p == expunged { return false }
// CAS 将 enter.p 指向 nil
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p) // 被删除
if p == expunged { return false } // 比较 e.p 与 p, 相等赋新值,否则自旋比较
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true }
}
}
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// 用来判断 `entry` 是否被删除,当 `entry.p == nil` 时,说明这个 value 被标记为删除,这时会把它重新标记为 `expunged` 返回 true, 否则返回 false
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}

sync.Once

用于执行一次性操作,通常用于初始化只需执行一次的任务。它的作用是确保某个操作只会执行一次,无论是在单线程环境还是多线程环境下都可以保证。
比较值得学习的是使用到的快慢路径这个编程范式。

慢路径(Slow Path)

慢路径(Slow Path)指一种更加保守、安全但性能较低的解决方案。代码会使用互斥锁等同步原语来确保并发安全性。慢路径会导致性能开销增加,因为它需要在多个线程之间进行显式的同步和互斥操作,以确保数据的一致性和正确性。

快路径(Fast Path)

快路径(Fast Path)指一种更加高效但风险较高的解决方案。代码会使用原子操作等非阻塞的同步机制来尽量减少同步开销。快路径会更高效,因为它避免了显式的同步和互斥操作,但在某些情况下会导致竞态条件或数据不一致的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Once struct { 
done uint32
m    Mutex
}

func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

Slice

比较常见的问题是 slice 和普通 array 的区别:array 是值类型;slice 是引用类型,指向底层的数组地址
子切片:引用的底层数组是一致的,但是 slice 本身对应的内存地址不一致;当子切片长度超过原切片的时候,会产生扩容,替换掉原来的底层数组。
有一个特性是,如果使用 append 语句一次性加入多个元素,且新增元素数量超过了原始 slice 的容量,那么 golang 会自动根据总元素数量来申请新的容量进行扩容。其中还涉及到了申请时内存对齐的问题,可能申请到的容量会稍大于元素数量

1
2
3
4
5
var s []int // nil 未分配内存

s1 := []int{} // 声明 + 初始化 一个长度为 0 的 slice

s2 := []int{1,2,3} // 声明 + 初始化 一个长度为 3 的 slice

Channel

Channel是异步进行的, channel存在3种状态:

  • nil,未初始化的状态,只进行了声明,或者手动赋值为nil
  • active,正常的channel,可读或者可写
  • closed,已关闭,千万不要误认为关闭channel后,channel的值是nil
操作 一个零值nil通道 一个非零值但已关闭的通道 一个非零值且尚未关闭的通道
关闭 产生恐慌 产生恐慌 成功关闭
发送数据 永久阻塞 产生恐慌 阻塞或者成功发送
接收数据 永久阻塞 永不阻塞 阻塞或者成功接收
  1. Channel本质上是由三个FIFO(First In FirstOut,先进先出)队列组成的用于协程之间传输数据的协程安全的通道;FIFO的设计是为了保障公平,让事情变得简单,原则是让等待时间最长的协程最有资格先从channel发送或接收数据;
  2. 三个FIFO队列依次是buf循环队列,sendq待发送者队列,recvq待接收者队列。buf循环队列是大小固定的用来存放channel接收的数据的队列;sendq待发送者队列,用来存放等待发送数据到channel的goroutine的双向链表,recvq待接收者队列,用来存放等待从channel读取数据的goroutine的双向链表;sendq和recvq可以认为不限大小;
  3. 跟函数调用传参本质都是传值一样,channel传递数据的本质就是值拷贝,引用类型数据的传递也是地址拷贝;有从缓冲区buf地址拷贝数据到接收者receiver栈内存地址,也有从发送者sender栈内存地址拷贝数据到缓冲区buf;
  4. Channel里面参数的修改不是并发安全的,包括对三个队列及其他参数的访问,因此需要加锁,本质上,channel就是一个有锁队列;
  5. Channel 的性能跟 sync.Mutex 差不多,没有谁比谁强。Go官方之所以推荐使用Channel进行并发协程的数据交互,是因为channel的设计理念能让程序变得简单,在大型程序、高并发复杂的运行状况中也是如此。

关注下方代码 makechanswitch-case 相关语句,可以发现当存储在 buf 中的元素不包含指针时,hchan 中也不包含 GC 关心的指针。buf 指向一段相同元素类型的内存,elemtype 固定不变。受到垃圾回收器的限制,指针类型的缓冲 buf 需要单独分配内存。
channel 本身是引用类型,其创建全部调用的是 mallocgc(),在上开辟的内存空间,说明 channel 本身会被 GC 自动回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
type hchan struct { 
qcount uint // 队列中所有数据总数
dataqsiz uint // 循环队列大小
buf unsafe.Pointer // 指向循环队列的指针
elemsize uint16 // 循环队列中元素的大小
closed uint32 // chan是否关闭的标识
elemtype *_type // 循环队列中元素的类型
sendx uint // 已发送元素在循环队列中的位置
recvx uint // 已接收元素在循环队列中的位置
recvq waitq // 等待接收的goroutine的等待队列
sendq waitq // 等待发送的goroutine的等待队列
lock mutex // 控制chan并发访问的互斥锁
}

func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 检查数据项大小不能超过 64KB
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } // 检查内存对齐是否正确
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 缓冲区大小检查,判断是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
// 当队列或者元素大小为 0 时,调用 mallocgc() 在堆上为 channel 开辟一段大小为 hchanSize 的内存空间;
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 竞态检查,利用这个地址进行同步操作.
c.buf = c.raceaddr()
// 当元素类型不是指针类型时,调用 mallocgc() 在堆上为 channel 和底层 buf 缓冲区数组开辟一段大小为 hchanSize + mem 连续的内存空间;
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 表示hchan后面在内存里紧跟着就是buf环形队列
c.buf = add(unsafe.Pointer(c), hchanSize)
// 默认情况元素类型中有指针类型,调用 mallocgc() 在堆上分别为 channel 和 buf 缓冲区分配内存。
default:
c = new(hchan) c.buf = mallocgc(mem, elem, true)
}
// 设置元素个数、元素类型给创建的chan
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}

发送数据

  1. 首先 select 非阻塞的发送,需要判断两种情况;
  2. 一般的阻塞调用,先判断 recvq 等待接收队列是否为空,如果不为空,那么说明缓冲区为空 or 无缓冲 Channel;
  3. 如果 recvq 有接收者,则属于缓冲区空,从 recvq 中取出一个 goroutine,然后写入数据,接着唤醒 goroutine,结束发送过程;
  4. 如果缓冲区有空位,写入数据到缓冲区,完成发送;
  5. 如果缓冲区满,将发送数据的 goroutine 放到 sendq 中,进入睡眠,等待唤醒。

    接收数据

  6. 也是先判断select这种非阻塞接收的两种情况(block为false);然后是加锁进行阻塞调用的逻辑;
  7. 同步接收:如果发送者队列 sendq 不为空,且没有缓冲区,直接从 sendq 中取出一个 goroutine,读取消息,唤醒该 goroutine,结束读取;
  8. 同步接收:如果发送者队列 sendq 不为空,有缓冲区,说明缓冲区已经满了,移动 recvx 指针的位置,取出一个数据,同时从 sendq 中取出一个 goroutine,拷贝里面的数据到 buf 中,结束读取;
  9. 异步接收:如果发送者队列 sendq 为空,且缓冲区有数据(有缓冲 channel),直接从缓冲区取出数据,结束读取;
  10. 阻塞接收:如果发送者队列 sendq 为空,且缓冲区无数据(无缓冲 channel),就将当前的 goroutine 加入 recvq,进入睡眠等待唤醒。

关闭 Chan

  1. 判断 channel 当前状态,如果 channel == nil || channel 已经关闭,会直接 panic;
  2. 关闭的主要工作是释放所有的接收者和发送者:先回收接收者,因为从一个关闭的 channel 中读数据,不会发生 panic,顶多读到一个默认零值。再回收发送者。注意这里可能会产生 panic,因为往一个关闭的 channel 中发送数据,会产生 panic。

生产者-消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var wg sync.WaitGroup 
func producer(data chan<- int) {
for i := 0; i < 4; i++ {
data <- i
}
// 这里记得要关闭channel,不然会发生阻塞,因为消费者的数量没有限制,
// 当消费者从空的channel取值的时候会阻塞
close(data)
}

func consumer(data <-chan int) {
defer wg.Done()
for {
v, ok := <-data
if !ok {
break
}
fmt.Println("---:", v, " ===:", ok)
}
}

func main() {
data := make(chan int)
go producer(data)
wg.Add(1)
go consumer(data)
wg.Wait()
}
// Output // ---: 0 ===: true // ---: 1 ===: true // ---: 2 ===: true // ---: 3 ===: true

发布订阅模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type Broker struct { 
consumers []*Consumer
}
type Consumer struct {
ch chan string
}

func (b *Broker) produce(msg string) {
// 轮询给消费者发送消息
for _, v := range b.consumers {
v.ch <- msg
}
}
func (b *Broker) subscribe(consumer *Consumer) {
b.consumers = append(b.consumers, consumer)
}

func TestMq1(t *testing.T) {
// 初始化一个Broker节点
b := &Broker{
consumers: make([]*Consumer, 0, 4),
}
// 创建2个消费者
consumer1 := &Consumer{ ch: make(chan string, 1), }
consumer2 := &Consumer{ ch: make(chan string, 1), }
// 这2个消费者订阅Broker
b.subscribe(consumer1)
b.subscribe(consumer2)
// 生产者发送一个消息
b.produce("一条消息")
// 2个消费者拿到了刚才生产者发送的消息
fmt.Println(<-consumer1.ch)
fmt.Println(<-consumer2.ch)
// Output
// 一条消息
// 一条消息
}

循环打印abc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
var count = 5  

func main() {
wg := sync.WaitGroup{}
chanA := make(chan struct{}, 1)
chanB := make(chan struct{}, 2)
chanC := make(chan struct{}, 3)
chanA <- struct{}{}
wg.Add(3)
go printA(&wg, chanA, chanB)
go printB(&wg, chanB, chanC)
go printC(&wg, chanC, chanA)
wg.Wait()
}

func printA(wg *sync.WaitGroup, chanA chan struct{}, chanB chan struct{}) {
defer wg.Done()
for i := 0; i < count; i++ {
<-chanA
println("A")
chanB <- struct{}{}
}
}
func printB(wg *sync.WaitGroup, chanB chan struct{}, chanC chan struct{}) {
defer wg.Done()
for i := 0; i < count; i++ {
<-chanB
println("B")
chanC <- struct{}{}
}
}
func printC(wg *sync.WaitGroup, chanC chan struct{}, chanA chan struct{}) {
defer wg.Done()
for i := 0; i < count; i++ {
<-chanC
println("C")
chanA <- struct{}{}
}
}

另一种循环打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 当前的写法是可运行的,注意 可能会造成死锁,main 协程无法退出
func main() {
c1 := make(chan int)
c2 := make(chan int)
c3 := make(chan int)

stop := make(chan int)
// c1 <- 1 // 不可以放到这里,会造成死锁,目前我还没明白原理

go print1(c1, c2, stop)
go print2(c2, c3, stop)
go print3(c3, c1, stop)
c1 <- 1 //
time.Sleep(time.Millisecond)
// go func() { stop <- 1 }()
stop <- 1
return
}

func print1(c chan int, c2 chan int, stop chan int) {
for {
select {
case <-stop:
return
case <-c:
fmt.Println("1")
c2 <- 1
}
}
}

func print2(c chan int, c2 chan int, stop chan int) {
for {
select {
case <-stop:
return
case <-c:
fmt.Println("2")
c2 <- 1
}
}
}

func print3(c chan int, c2 chan int, stop chan int) {
for {
select {
case <-stop:
return
case <-c:
fmt.Println("3")
c2 <- 1
}
}
}

常见用法

定时任务

1
2
3
4
5
select { 
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}

控制并发数

1
2
3
4
5
6
7
8
9
10
11
12
var limit = make(chan int, 3) 
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}

遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
for i:= range ch
// 这里需要注意的是 会一直尝试从 ch 中取数据,直到 ch 被关闭位置,期间状态为阻塞
// 可以结合以下代码 理解
ch := make(chan int, 5)

wg := sync.WaitGroup{}
go func() {
for i := 0; i < 10; i++ {
go func(i int) {
wg.Add(1)
if i > 6 {
time.Sleep(time.Second)
}
ch <- i
defer wg.Done()
}(i)
}

time.Sleep(time.Second * 8)

wg.Wait()
close(ch) // 如果不关闭,下方会一直阻塞,造成死锁
}()

for i := range ch {
fmt.Printf("printing: %d at %d \n", i, time.Now().Unix())
}

Golang atomic

常规总结

1、  原子指针函数和类型+方法的两种方式都可以执行原子操作,我更建议用类型+方法的方式,因为更简单更清晰。
2、  原子操作比互斥锁更轻便,但使用也是有限制的,原子操作仅提供有限的数据类型,使用时要珍酌清楚。
3、  atomic.Value 原子值中存储引用类型时,使用一定要注意否则会有安全性问题哈,最好别存引用类型。
4、  atomic.Value 不要存 nil,后续添加的类型一定要是第一次添加的类型,否则会 Panic。
5、  不要对外暴露原子变量(原子变量控制在包内访问)、不要将原子值及其指针值通过参数、通道等传递。

原生 Map

实际使用中需要注意以下几点:

  1. 并发读写不安全
  2. 迭代中修改 map 会造成迭代器失效和不确定的结果
  3. 对 map 的遍历认为是无序的
  4. 对 nil 的 map 操作不当会造成 panic
  5. 内存泄漏:向 map 中添加键值对会增加底层哈希表的大小。对于不用的或者已删除的键值对,推荐使用 delete() 删除,释放内存。

Golang 中 map 是一个指针,占用 8 个字节。当使用 make 创建 map 时,底层调用的是 makemap() 函数,makemap() 函数返回的是一个指针,因为返回的是指针,所以 map 作为参数的时候,函数内部能修改map。
golang 中 map 底层使用的是哈希查找表,用链表来解决哈希冲突。每个 map 的底层结构是 hmap,是由若干个结构为 bmap 的 bucket 组成的数组,每个 bucket 底层都采用链表结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type hmap struct {
count int // map中元素的数量,调用len()直接返回此值
flags uint8 // 状态标识符,key和value是否包指针、是否正在扩容、是否已经被迭代
B uint8 // map中桶数组的数量,桶数组的长度的对数,len(buckets) == 2^B,可以最多容纳 6.5 * 2 ^ B 个元素,6.5为装载因子
noverflow uint16 // 溢出桶的大概数量,当B小于16时是准确值,大于等于16时是大概的值
hash0 uint32 // 哈希种子,用于计算哈希值,为哈希函数的结果引入一定的随机性
buckets unsafe.Pointer // 指向桶数组的指针,长度为 2^B ,如果元素个数为0,就为 nil
oldbuckets unsafe.Pointer // 指向一个旧桶数组,用于扩容,它的长度是当前桶数组的一半
nevacuate uintptr // 搬迁进度,小于此地址的桶数组迁移完成
extra *mapextra // 可选字段,用于gc,指向所有的溢出桶,避免gc时扫描整个map,仅扫描所有溢出桶就足够了
}

// 溢出桶结构
type mapextra struct {
overflow *[]*bmap // 指针数组,指向所有溢出桶
oldoverflow *[]*bmap // 指针数组,发生扩容时,指向所有旧的溢出桶
nextOverflow *bmap // 指向所有溢出桶中下一个可以使用的溢出桶
}

bmap的结构:

1
2
3
4
5
6
7
8
9
10
11
12
type bmap struct {
tophash [bucketCnt]uint8 // bucketCnt=8,// 存放key哈希值的高8位,用于决定kv键值对放在桶内的哪个位置
}

//实际上编辑期间会动态生成一个新的结构体
type bmap struct {
topbits [8]uint8 // 存放key哈希值的高8位,用于决定kv键值对放在桶内的哪个位置
keys [8]keytype // 存放key的数组
values [8]valuetype // 存放value的数组
pad uintptr // 用于对齐内存
overflow uintptr // 指向下一个桶,即溢出桶,拉链法
}

buckets是一个bmap数组,数组的长度就是 2^B。每个bucket固定包含8个key和value,实现上面是一个固定的大小连续内存块,分成四部分:tophash 值,8个key值,8个value值,指向下个bucket的指针。
tophash 值用于快速查找key是否在该bucket中,当插入和查询运行时都会使用哈希哈数对key做哈希运算,获取一个hashcode,取高8位存放在bmap tophash字段中。
桶里面会最多装 8 个 key,这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的。在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有8个位置)
桶结构的很多字段得在编译时才会动态生成,比如key和values等
桶结构中,之所以所有的key放一起,所有的value放一起,而不是key/value一对对的一起存放,目的便是在某些情况下可以省去pad字段,节省内存空间。由于内存对齐的原因,key0/value0/key1/value1… 这样的形式可能需要更多的补齐空间,比如 map[int64]int8 ,1字节的value后面需要补齐7个字节才能保证下一个key是 int64 对齐的。
golang中的map使用的内存是不会收缩的,只会越用越多。

扩容

  1. 触发 map 扩容的时机(插入、删除key)
    • 当装载因子超过6.5时,扩容一倍,属于增量扩容;
    • 当使用的溢出桶过多时,重新分配一样大的内存空间,属于等量扩容;(实际上没有扩容,主要是为了回收空闲的溢出桶,节省空间,提高 map 的查找和插入效率)
    1. 为什么会出现这种情况?
      这种情况可能是因为 map 删除的特性导致的。当我们不断向哈希表中插入数据,并且将他们又全部删除时,其内存占用并不会减少,因为删除只是将桶对应位置的 tophash 置 nil 而已。
      这种情况下,就会不断的积累溢出桶造成内存泄露,为了解决这种情况,采用了等量扩容的机制,一旦哈希表中出现了过多的溢出桶,会创建新桶保存数据,gc 会清理掉老的溢出桶,从而避免内存泄露。
    2. 如何定义溢出桶是否太多需要等量扩容呢?两种情况:
      • 当 B 小于 15时,溢出桶的数量超过 2^B,属于溢出桶数量太多,需要等量扩容;
      • 当 B 大于等于 15 时,溢出桶数量超过 2^15,属于溢出桶数量太多,需要等量扩容。
  2. 扩容策略(怎么扩容?)
      Go 会创建一个新的 buckets 数组,新的 buckets 数组的容量是旧buckets数组的两倍(或者和旧桶容量相同),将原始桶数组中的所有元素重新散列到新的桶数组中。这样做的目的是为了使每个桶中的元素数量尽可能平均分布,以提高查询效率。
      旧的 buckets 数组不会被直接删除,而是会把原来对旧数组的引用去掉,让 GC 来清除内存。
      在 map 进行扩容迁移的期间,不会触发第二次扩容。只有在前一个扩容迁移工作完成后,map 才能进行下一次扩容操作。
  3. 搬迁策略
      由于 map 扩容需要将原有的 kv 键值对搬迁到新的内存地址,如果一下子全部搬完,会非常的影响性能。go 中 map 的扩容采用渐进式的搬迁策略,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket,将搬迁的O(N)开销均摊到O(1)的赋值和删除操作上。
      hashGrow() 只是分配了新的 buckets,并将老 buckets 挂在到 oldbuckets 字段上。
      而且实际的搬迁并不在 hashGrow() 而是在 growWork() 函数中,而调用 growWork() 函数的动作是在 mapassign 和 mapdelete 函数中,所以真正的‘搬迁’操作被离散在了 map 插入修改、删除 key 的动作中。会尝试检查 oldbucket,有待执行的数据,则‘搬迁’。

Sync.Mutex

正常模式

当一个 goroutine 持有锁时,后续的 goroutine 会以先进先出的方式排队等待,当锁被释放时,队列中第一个 goroutine 会被唤醒。但是它需要和新到来的 goroutine 争夺锁,由于新来的 goroutine 已经在 CPU 中,刚刚唤醒的大概率会竞争失败,重新被放到队首。这种情况有可能导致 goroutine 迟迟不能被执行而被“饿死”

饥饿模式

为了解决“饿死”的问题,如果一个等待的 goroutine 超过 1 ms (starvationThresholdNs) 没有得到锁,这个锁就会被转换为饥饿模式。饥饿模式下,锁会直接交给队列中的第一个 goroutine,而新来的 goroutine 会放到队尾等待。正常状态下的性能是高于饥饿模式的,所以在大部分情况下,还是应该回到正常模式去的。
当队列中最后一个 goroutine 被执行或者它的等待时间低于 1 ms 时,会将该锁的状态切换回正常模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Mutex struct { 
// +---------------------------------+-----------+
// | WaitersCount | status |
// +---------------------------------+-----------+
// <-----------+ 29 +---------------> <--+ 3 +--->
state int32 // 锁状态,其中低三位用来表示锁状态,高 29 位用来记录等待当前互斥锁的 goroutine 个数
sema uint32 // 控制锁状态的信号量
}
const (
mutexLocked = 1 << iota // 0001 表示互斥锁处于锁定状态
mutexWoken // 0010 表示从正常模式被唤醒
mutexStarving // 0100 饥饿模式
mutexWaiterShift = iota // 3 表示除 WaitersCount 外,状态占用了三个 bite
starvationThresholdNs = 1e6 // 饥饿的阈值, 1ms
)

Lock

通过 CAS 判断 m.state == 0 时,意味着当前锁处于正常的解锁状态,只需要将锁设置为 mutexLocked 即可,否则就需要进入 lockSlow 通过自旋等方式等待锁释放。lockslow 大致分为以下几个部分:

  1. 判断是否可以自旋:自旋需要满足两个条件:
    • 处于正常模式,且锁已经被锁定
    • runtime_canSpin 返回 true:
      • 运行在多 CPU 的机器上;
      • 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
      • 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
  2. 一旦进入自旋,会通过 runtime_doSpin ,去执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间,一旦不满足上面的两个条件了,就会去计算当前锁的最新状态,导致其不满足的原因有很多,如:
    • 其他 goroutine 已经释放锁
    • 其他 goroutine 导致该锁进入饥饿模式
    • 自旋次数超过 4 次
  3. 计算和更新状态其实就是去更新 state 中的四个值;一旦计算完毕后,通过 CAS 尝试更新
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func (m *Mutex) Lock() { 
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
    race.Acquire(unsafe.Pointer(m))
    }
    return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
    }

Unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (m *Mutex) Unlock() { 
if race.Enabled {
_ = m.state race.Release(unsafe.Pointer(m))
}
// 如果 m.state - mutexLocked == 0 说明没人等待该锁,同时该锁处于正常状态
// 这时可以快速解锁,即锁状态会直接赋成 0
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 否则则需要慢速解锁
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 如果锁没锁定,直接抛出异常
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式下
if new&mutexStarving == 0 {
old := new
for {
// 如果没有其他等待者或者锁不处于空闲状态,直接返回,不需要唤醒其他等待着
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒新的等待者
// 等待者减一,设置唤醒标志 woken
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置 state, 唤醒一个阻塞着的 goroutine
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
// 设置失败,重新获取状态设置
old = m.state
}
} else {
// 饥饿模式下,直接唤醒队首的 goroutine,这时 mutexLocked 位依然是 0
// 但由于处在饥饿状态下,锁不会被其他新来的 goroutine 抢占
runtime_Semrelease(&m.sema, true, 1)
}
}

Context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  
// Context 提供跨越API的截止时间获取,取消信号,以及请求范围值的功能。
// 它的这些方案在多个 goroutine 中使用是安全的
type Context interface {     
// 如果设置了截止时间,这个方法ok会是true,并返回设置的截止时间  
Deadline() (deadline time.Time, ok bool)     
// 如果 Context 超时或者主动取消返回一个关闭的channel,如果返回的是nil,表示这个     
// context 永远不会关闭,比如:Background()  
Done() <-chan struct{}     
// 返回发生的错误  
Err() error     
// 它的作用就是传值  
Value(key interface{}) interface{}
}

// 常用方法
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Goroutine canceled")
return
default:
fmt.Println("Working...")
time.Sleep(1 * time.Second)
}
}
}(ctx)
time.Sleep(3 * time.Second)
cancel()

ctx := context.Background()
fmt.Printf("ctx: %p \n", &ctx) // ctx: 0xc0000103f0
ctx1 := ctx // 深拷贝
fmt.Printf("ctx1: %p \n", &ctx1) // ctx1: 0xc000010400

拷贝

1、浅拷贝

浅拷贝是指对地址的拷贝
浅拷贝的是数据地址,只复制指向的对象的指针,此时新对象和老对象指向的内存地址是一样的,新对象值修改时老对象也会变化,释放内存地址时,同时释放内存地址
引用类型的都是浅拷贝:slicemapfunction
浅拷贝的特点:

  • 拷贝的时候仅仅拷贝地址,地址指向的都是同一个值
  • a中修改,则b中也跟着变化
  • 内存销毁是一致的

2、深拷贝

深拷贝是指将地址指向的值进行拷贝
深拷贝的是数据本身,创造一个一样的新对象,新创建的对象与原对象不共享内存,新创建的对象在内存中开辟一个新的内存地址,新对象值修改时不会影响原对象值。既然内存地址不同,释放内存地址时,可分别释放
值类似的都是深拷贝:intfloatboolarraystruct
深拷贝的特点:

  • 复制的时候会新创建一个对象
  • 指向完全不同的内存地址
  • 修改是互不影响的
    通过指针求值,将值拷贝实现,修改拷贝的值不影响原来的值

    3、结构体的深拷贝

    默认情况下,结构体类型中的字段是值类型,拷贝时都是深拷贝
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    type Per struct { 
    Name string
    Age int
    HouseIds [2]int
    }

    func main() {
    p1 := Per{ Name: "ssgeek", Age: 24, HouseIds: [2]int{22, 33}, }
    p2 := p1
    fmt.Printf("%v %p \n", p1, &p1) // {ssgeek 24 [22 33]} 0xc000180030
    fmt.Printf("%v %p \n", p2, &p2) // {ssgeek 24 [22 33]} 0xc000180060
    p2.Age = 19
    p2.Name = "likui"
    p2.HouseIds[1] = 44
    fmt.Printf("%v %p \n", p1, &p1) // {ssgeek 24 [22 33]} 0xc000098180
    fmt.Printf("%v %p \n", p2, &p2) // {likui 19 [22 44]} 0xc0000981b0
    }

    4、结构体的浅拷贝

    使用指针进行浅拷贝,浅拷贝中,可以看到p1p2的内存地址是相同的,修改其中一个对象的属性时,另一个也会产生变化
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    package main 
    import "fmt"
    type Per struct {
    Name string
    Age int
    HouseIds [2]int
    }

    func main() {
    p1 := Per{ Name: "ssgeek", Age: 24, HouseIds: [2]int{22, 33}, }
    p2 := &p1
    fmt.Printf("%v %p \n", p1, &p1) // {ssgeek 24 [22 33]} 0xc000076180
    fmt.Printf("%v %p \n", p2, p2) // &{ssgeek 24 [22 33]} 0xc000076180
    p2.Age = 19
    p2.Name = "likui"
    p2.HouseIds[1] = 44
    fmt.Printf("%v %p \n", p1, &p1) // {likui 19 [22 44]} 0xc000076180
    fmt.Printf("%v %p \n", p2, p2) // &{likui 19 [22 44]} 0xc000076180
    }

5、结构体值类型的浅拷贝

使用new函数实现值类型的浅拷贝
值类型的默认是深拷贝,想要实现值类型的浅拷贝,一般是两种方法

  • 使用指针
  • 使用new函数(new函数返回的是指针)

6、结构体引用类型的浅拷贝

结构体默认是深拷贝,但如果结构体中包含map、slice等这些引用类型,默认也还是浅拷贝
map是引用类型,引用类型浅拷贝是默认的情况

7、结构体引用类型的深拷贝

结构体中含有引用类型的字段,那么这个字段就是浅拷贝,但是往往希望的是深拷贝,解决方案如下

  • 方法一:挨个把可导致浅拷贝的引用类型字段自行赋值。赋值后,修改值就相互不影响了
  • 方法二:使用json或反射。简单来说:json将引用类型的数据进行dumpdump后就和原来的引用类型没有关系了

GMP

Go 协程为什么快?

这就要从进程,线程,协程三者的关系讲起。
最早的进程时代,是单 CPU 单进程,一切皆是串行执行;到了多进程/线程时代,当一个进程阻塞时,切换到另外等候的进程,时间片轮转法保证了等待的进程都能够被运行,但是进程间的调度会占用CPU大部分时间;而到了高并发的场景下,为每个任务都去创建一个线程显然是不合理的,那么,golang 将原来的线程分为了两部分,一个是用户级别的线程(轻量级的线程,运行在用户态,即 goroutine),一个是内核级的线程(即一般意义上的线程,运行在内核态),这样,对于协程的切换和调度都是在用户态进行,不涉及内核级别的调度,因此这是 goroutine 能够很好地支持高并发的场景的基础。

  • 进程: 进程是具有一定独立功能的程序,进程是系统资源分配和调度的最小单位。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。由于进程比较重量,占据独立的内存,所以上下文进程间的切换开销(栈、寄存器、虚拟内存、文件句柄等)比较大,但相对比较稳定安全。
  • 线程: 线程是进程的一个实体,线程是内核态,而且是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相比进程不够稳定容易丢失数据。独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。创建一个 goroutine 的栈内存消耗为 2-4 KB,实际运行过程中,如果栈空间不够用,会自动进行扩容。创建一个 thread 则需要消耗 1 MB 栈内存,而且还需要一个被称为 “a guard page” 的区域用于和其他 thread 的栈空间进行隔离。
  • 协程: 协程是一种用户态的轻量级线程,协程的调度完全是由用户来控制的。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。

线程的时间成本主要来自于切换线程上下文时,用户态与内核态的切换、线程的调度、寄存器变量以及状态信息的存储。

Goroutine采用的是半抢占式的协作调度,只有在当前Goroutine发生阻塞时才会导致调度;同时发生在用户态,调度器会根据具体函数只保存必要的寄存器,切换的代价要比系统线程低得多。运行时有一个runtime.GOMAXPROCS变量,用于控制当前运行正常非阻塞Goroutine的系统线程数目。
goroutine 只是由官方实现的超级”线程池”。
goroutine 的栈空间是从堆内存中分配的。

每个实例2KB (在1.4新版本发布的运行时信息当中明确指出,从以前的1.2版本到1.3版本协程占用大小4kb到8kb,到现在的2kb左右,是一个性能上和的大跃进。)的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是go高并发的根本原因。

调度器的三个基本对象:

  • G(Goroutine),代表协程,go关键字创建的对象
  • M(Work Thread),工作线程,一个 M 关联一个内核级线程
  • P(Processor),代表一个 处理器,又称上下文

    G-M-P三者的关系与特点:

  • 每一个运行的 M 必须绑定一个 P,线程 M 创建后会检查并执行G(goroutine)对象。
  • 每一个 P 保存着一个协程 G 的队列。
  • 除了每个 P 自身保存的G的队列外,调度器还有一个全局的G队列
  • M 从队列中提取 G,并执行
  • P 的个数就是 GOMAXPROCS(最大256),启动时固定,一般不修改
  • M 的个数和 P 的个数不一定一样多(会有sleep的M或者P不绑定M)(最大10000)
  • P 是用一个全局数组(255)来保存的,并维护一个全局的 P 的空闲链表

局部G队列与全局G队列的关系

  • 全局G任务队列会和本地G任务队列按照一定策略互相交换。
  • G的执行顺序,先从本地队列找,本地没有则从全局队列找
  • 转移:局部与全局,全局G个数/P个数   局部与局部,一次性转移一半

Gorutine 从入队到执行

  1. 当我们创建一个G对象,就是 gorutine,它会加入到本地队列或者全局队列
  2. 如果还有空闲的P,则创建一个M 绑定该 P ,注意!这里,P 此前必须还没绑定过M 的,否则不满足空闲的条件
    1. 先找到一个空闲的P,如果没有则直接返回
    2. P 个数不会占用超过自己设定的cpu个数
    3. P 在被 M 绑定后,就会初始化自己的 G 队列,此时是一个空队列
  3. M 会启动一个底层线程,循环执行能找到的 G 任务。这里的寻找的 G 从下面几方面找:
    1. 当前 M 所绑的 P 队列中找
    2. 去别的 P 的队列中找
    3. 去全局 G 队列中找
  4. G任务的执行顺序是,先从本地队列找,本地没有则从全局队列找
  5. 程序启动的时候,首先跑的是主线程,然后这个主线程会绑定第一个 P
  6. 入口 main 函数,其实是作为一个 goroutine 来执行

Gorutine 从入队到执行详细流程

  1. runtime创建第一个线程M0:M0是启动进程后的编号为0的主线程,这个M对应的实例会在全局变量runtime.m0中,不需要在heap上分配,M0负责执行初始化操作和启动第一个G,在之后M0就和其他的M一样了。
  2. runtime创建第一个Go协程G0:G0是每次启动一个M都会第一个创建的 goroutine,G0仅用于负责调度G,G0不指向任何可执行函数,每个M都会有一个自己的G0。在调度或系统调用时会使用G0的栈空间,全局变量的G0是M0的G0。一般的G0放在本地队列中。
  3. 关联M0和G0。
  4. 调度初始化初始化M0、栈、垃圾回收,以及创建和初始化由 GOMAXPROCS 个P构成的P列表。
  5. 创建 main() 中的 goroutine,即 runtime.main 创建 goroutine。
  6. 启动M0,此时M0已经绑定了P,从P的本地队列中获取G,获取到 main goroutine。
  7. M绑定P。
  8. 循环判断M通过P是否能够获取到G。
  9. 获取不到则M进入休眠队列,等待被唤醒后再重新与P绑定。
  10. 能够获取到G,则M根据G中的栈信息和调度信息设置运行环境。
  11. M执行G。
  12. G退出,runtime.main 执行 Defer 和 Panic 处理,或调用 runtime.exit 退出程序。

中断挂起与恢复

goroutine协程的中断挂起与恢复 :协程的切换时间片是10ms,也就是说 goroutine 最多执行10ms就会被 M 切换到下一个 G。这个过程,又被称为 中断,挂起
go程序启动时会首先创建一个特殊的内核线程 sysmon,用来监控和管理,其内部是一个循环:

  1. 记录所有 P 的 G 任务的计数 schedtick,schedtick 会在每执行一个G任务后递增
  2. 如果检查到 schedtick 一直没有递增,说明这个 P 一直在执行同一个 G 任务,如果超过10ms,就在这个G任务的栈信息里面加一个 tag 标记
  3. 然后这个 G 任务在执行的时候,如果遇到非内联函数调用,就会检查一次这个标记,然后中断自己,把自己加到队列末尾,执行下一个G
  4. 如果没有遇到非内联函数 调用(有时候正常的小函数会被优化成内联函数)的话,那就会一直执行这个G任务,直到它自己结束;如果是个死循环,并且 GOMAXPROCS=1 的话。那么一直只会只有一个 P 与一个 M,且队列中的其他 G 不会被执行!

中断后的恢复

  1. 中断的时候将寄存器里的栈信息,保存到自己的 G 对象里面
  2. 当再次轮到自己执行时,将自己保存的栈信息复制到寄存器里面,这样就接着上次之后运行
    GOMAXPROCS–性能调优

看完上面的内容,相信你已经知道,GOMAXPROCS 就是 go 中 runtime 包的一个函数。它设置了 P 的最多的个数。这也就直接导致了 M 最多的个数是多少,而 M 的个数就决定了各个 G 队列能同时被多少个 M 线程来进行调取执行!

GC

最常见的垃圾回收算法有标记清除(Mark-Sweep) 和引用计数(Reference Count),Go 语言 采用的是标记清除算法。1.5 版本以后在此基础上使用了三色标记法和写屏障技术,提高了效率。

标记清除收集器是跟踪式垃圾收集器,其执行过程可以分成标记(Mark)和清除(Sweep)两个阶段:

  • 标记阶段 — 从根对象出发查找并标记堆中所有存活的对象;
  • 清除阶段 — 遍历堆中的全部对象,回收未被标记的垃圾对象并将回收的内存加入空闲链表。

标记清除算法的一大问题是在标记期间,需要暂停程序(Stop the world,STW,这也是 GC 算法优化的重点),标记结束之后,用户程序才可以继续执行。为了能够异步执行,减少 STW 的时间,Go 语言采用了三色标记法。

三色标记算法将程序中的对象分成白色、黑色和灰色三类。

  • 白色:不确定对象。
  • 灰色:存活对象,子对象待处理。
  • 黑色:存活对象。

标记开始时,所有内存加入白色集合(这一步需 STW )。首先将根对象标记为灰色,加入待扫描队列(灰色集合);使用并发的 goroutine 扫描队列,取出一个灰色对象,将其标记为黑色,并将其指向的对象标记为灰色,加入队列。重复这个过程,直到灰色集合为空为止,标记阶段结束。那么白色对象即可需要清理的对象,而黑色对象均为根可达的对象,不能被清理。

三色标记法因为多了一个白色的状态来存放不确定对象,所以后续的标记阶段可以并发地执行。当然并发执行的代价是可能会造成一些遗漏,因为那些早先被标记为黑色的对象可能目前已经是不可达的了。所以三色标记法是一个 false negative(假阴性)的算法。

三色标记法并发执行仍存在一个问题,即在 GC 过程中,对象指针发生了改变。比如下面的例子:

1
A (黑) -> B (灰) -> C (白) -> D (白)  

正常情况下,D 对象最终会被标记为黑色,不应被回收。但在标记和用户程序并发执行过程中,用户程序删除了 C 对 D 的引用,而 A 获得了 D 的引用。标记继续进行,D 就没有机会被标记为黑色了(A 已经处理过,这一轮不会再被处理)。

1
2
3
A (黑) -> B (灰) -> C (白)   

D (白)

为了解决这个问题,Go 使用了内存屏障技术,它是在用户程序读取对象、创建新对象以及更新对象指针时执行的一段代码,类似于一个钩子。垃圾收集器使用了写屏障(Write Barrier)技术,这个计数会拦截将白色指针插入黑色对象的操作,当对象新增或更新时,会将其着色为灰色。这样即使与用户程序并发执行,对象的引用发生改变时,垃圾收集器也能正确处理了。

写屏障破坏两个条件其一即可

  • 破坏条件1:Dijistra写屏障

满足强三色不变性:黑色节点不允许引用白色节点 当黑色节点新增了白色节点的引用时,将对应的白色节点改为灰色

  • 破坏条件2:Yuasa写屏障

满足弱三色不变性:黑色节点允许引用白色节点,但是该白色节点有其他灰色节点间接的引用(确保不会被遗漏) 当白色节点被删除了一个引用时,悲观地认为它一定会被一个黑色节点新增引用,所以将它置为灰色

一次完整的 GC 分为四个阶段:

  • 标记准备(Mark Setup,需 STW),打开写屏障(Write Barrier),开启辅助 GC (mutator assist),统计 root 对象的任务数量
  • 使用三色标记法标记(Marking, 并发)
  • 标记结束(Mark Termination,需 STW),关闭写屏障。
  • 清理(Sweeping, 并发)

GC 的触发时机

使用系统监控,该触发条件由 runtime.forcegcperiod 变量控制,默认为 2 分 钟。当超过两分钟没有产生任何 GC 时,强制触发 GC。 使用步调(Pacing)算法,其核心思想是控制内存增长的比例。如 Go 的 GC 是一种比例 GC, 下一次 GC 结束时的堆大小和上一次 GC 存活堆大小成比例。一般来说,当前申请的内存是上一次 GC 两倍时触发。

一些细碎知识点

Golang 传参

  1. 理论上来说,Golang 只有值传递,所有协程的入参都是值拷贝,协程内部的修改,无法影响原参数
  2. 对于指针类型的参数,是对其指向地址的拷贝,因此可以使用地址来修改原参数
  3. Slice 底层是引用数组,因此对于 Slice 的修改可以反映到原数组;注意 扩容相关内容

defer

  1. 在编译期间压入操作栈
  2. 多个 defer 的执行顺序为“后进先出”;
  3. 所有函数在执行 RET 返回指令之前,都会先检查是否存在 defer 语句,若存在则先逆序调用 defer 语句进行收尾工作再退出返回;
  4. 匿名返回值是在 return 执行时被声明,有名返回值则是在函数声明的同时被声明,因此在 defer 语句中只能访问有名返回值,而不能直接访问匿名返回值;
  5. return 其实应该包含前后两个步骤:第一步是给返回值赋值(若为有名返回值则直接赋值,若为匿名返回值则先声明再赋值);第二步是调用 RET 返回指令并传入返回值,而 RET 则会检查 defer 是否存在,若存在就先逆序插播 defer 语句,最后 RET 携带返回值退出函数;
  6. defer 声明时会先确定函数参数,推迟执行的仅仅是函数体

OOM

一般的解决办法是通过 pprof 进行内存分析,发现瓶颈

1
go tool pprof http://123.456.789.1:1234/debug/pprof/heap

interface 比较

Struct 能否比较

不同类型的 struct 无法进行比较,而同一个 struct 的两个实例需要判断其成员能否比较,如果是 slice、map、func 中的一种就无法比较。同时无法比较的类型无法作为 map 的键。

不能比较的类型如何比较是否相等

string,int,float,interface 等可以通过 reflect.DeepEqual 和 等于号进行比较
而 slice、struct、map 一般使用 reflect.DeepEqual 进行比较。

标准化输入

1
2
3
4
5
6
7
8
import (
"bufio"
"os"
)

reader := bufio.NewReader(os.Stdin)
input1, _ := reader.ReadString('\n') // 读到的是 string 类型的一行数据

使用 goroutine 有没有遇到什么问题?

  1. 内存泄漏:goroutine 没有适当的结束,被文件或链接句柄阻塞,有死锁或者 channel 阻塞,造成内存泄露
    1. 确保明确退出
    2. 使用 defer 结束句柄
    3. 使用 context 管理生命周期
  2. 并发访问数据,造成数据竞争。
    1. sync.mutex 或者 channel 来同步访问
  3. 死锁
    1. 多个 goroutine 互相等待对方的 channel 信道,造成死锁
    2. 主线程 channel 阻塞,造成死锁
  4. 过量创建 goroutine
    1. 使用 waitgroup 或者 channel 控制并发量
  5. panic 传播:goroutine 中的 panic 无法传播到主协程,可能导致程序停止工作
    1. 使用 defer recover 收集 panic