根据极客课堂 晁岳攀 老师的《Go 并发编程实战课》总结而来
锁是什么 :
在 *** 作系统层面, 多个线程在 *** 作一块共享内存的时候, 需要先获取原来的数据, 然后进行修改。
这一个过程存在两个步骤, 为了保证这两个 *** 作的原子性, 需要对这个 *** 作“上锁” 。
linux中的锁其实也是一个在共享内存中的变量, 所有线程在要执行 “修改共享内存“ 的代码(临界区)的时候,都需要对这段代码进行加锁。
相当于先获取一个全局变量,看他的值是0还是1, 如果是1表示这个 锁 已经被其他线程获取到了,自己需要等他变成0之后才能修改这个锁的值为1, 然后再去执行临界区的代码, 执行完之后再将这个全局变量改成 0 来释放锁。
但是我们本身去读取这个 “锁” 的时候也需要一个原子 *** 作, 这个需要cpu在硬件层面做支持(锁总线)。
CAS *** 作 : compare-and-swap 即 比较交换(如果当前值等于输入的值,则替换当前值)
go 语言中锁的实现 :
go中的锁大概分四个阶段
第一版锁 :
type Mutex struct {
key int32 // 状态标志, 标志锁是否被线程持有,大于等于 1,说明这个排外锁已经被持有
sema uint32 // 信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒
}
// CAS *** 作,当时还没有抽象出atomic包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// 互斥锁的结构,包含两个字段
type Mutex struct {
key int32 // 锁是否被持有的标识
sema int32 // 信号量专用,用以阻塞/唤醒goroutine
}
// 保证成功在val上增加delta的值
// 这里使用死循环的方式使用 cas 不断尝试修改一个值, 直到成功为止
func xadd(val *int32, delta int32) (new int32) {
for {
v := *val
if cas(val, v, v+delta) {
return v + delta
}
}
panic("unreached")
}
// 请求锁
func (m *Mutex) Lock() {
// 在死循环中获取锁。
// 注意这个上锁的过程不管你当前 key 的值是多少, 只关心我有没有给你 + 1成功!!!
// 所以, 这里上锁之后的最后值可能是1, 也可能不是1.
// 如果不是1, 则阻塞等待
if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
return
}
semacquire(&m.sema) // 否则阻塞等待
}
func (m *Mutex) Unlock() {
// 在死循环中释放锁
// 这里也是同样的原理, 如果释放锁后的结果不为0, 则唤醒其他阻塞的协程
if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
return
}
semrelease(&m.sema) // 唤醒其它阻塞的goroutine
}
在这套逻辑下, 如果有多个协程同时来上锁, 则只会有一个协程G1上锁后的 key 值为1, 然后G1开始执行自己的任务。
其他协程G2、G3拿到的值可能都大于1, 然后G2、G3都进入阻塞等待状态。
等G1执行完之后释放锁的时候, 如果此时还有其他的G想要请求锁并且被阻塞了, 那key就不等于1.
G1将key-1之后就唤醒一个被阻塞的G(可能是G2、G3等), 这个被唤醒的G2开始直接执行需要加锁的代码(默认G2已经获取到锁了)。
等G2执行完之后, 又将key-1. 如果G2发现key为0, 则表示没有其他的G被阻塞, 直接返回。 否则继续唤醒, 继续循环。
key 不仅仅标识了锁是否被 goroutine 所持有,还记录了当前持有和等待获取锁的 goroutine 的数量。
他这里主要的性能问题可能在协程的阻塞和唤醒上面。
还有一个就是不能确定释放锁的协程释放是持有锁的协程。
所以这里要注意避免误删或者漏掉锁而导致死锁的情况(可以考虑将要修改变量的代码放到一个方法中, 在修改前加锁,在defer中释放锁 )。
初版的 Mutex 实现有一个问题:请求锁的 goroutine 会排队等待获取互斥锁。
虽然这貌似很公平,但是从性能上来看,却不是最优的。因为如果我们能够把锁交给正在占用 CPU 时间片的 goroutine 的
第二版锁 :
type Mutex struct {
state int32
sema uint32
}
state 这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。
原来使用key, 如果key为0则表示锁未被持有, 否则表示锁被持有,切具体的值表示当前关注这把锁的协程数量。
现在将他改成了三个东西, 一个表示这个锁是否被持有, 这点其实从原来的0和非0可以看得出来,另外一个表示是否有唤醒的协程,最后一个表示当前等待此锁的G的数量。
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
// 这里假设 state 为0,锁没被持有,修改他为被持有, 成功则直接退出
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 到了这里, 这把锁正在被其他人持有, 在旧版本中这里就直接进入阻塞状态了
// 唤醒标志
awoke := false
for {
// 获取旧状态
old := m.state
// 旧状态和 mutexLocked 进行按位或?
new := old | mutexLocked // 新状态加锁
// 这里用按位与运算, 如果第一位非0, 表示锁被持有
if old&mutexLocked != 0 {
// 使用位运算,旧状态 + 4(等待者数量+1)
new = old + 1<
这个加锁的核心思想是如果当前一开始就没有被其他G锁住, 而且自己的修改生效了, 则直接退出返回, 去执行临界区代码。
否则进入死循环:
1、重新获取状态值
2、对获取到的状态值加锁,生成新的状态值
3、如果获取到的状态值已经被加锁, 则状态值中的等待G的数量+1
4、如果当前是被唤醒的,清除新状态值中的唤醒标志
5、尝试修改状态值为新的状态值, 如果修改失败, 则重新循环
6、如果修改成功, 判断旧的状态值中是否被加锁
7、如果没被锁,则说明我这个线程抢锁成功, 而且我已经修改了锁里面的状态,直接退出循环, 去执行临界区的代码。
8、如果被锁, 说明抢锁失败, 则进入休眠状态,等待被唤醒。
9、被唤醒之后设置被唤醒标志位 true 。
10、然后重新进入循环,抢锁(获取旧的状态值,重复以上 *** 作)。
再看下释放锁:
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
panic("sync: unlock of unlocked mutex")
}
old := new
for {
// 如果没有被阻塞的G或者有唤醒的G或者锁原来已经加锁
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
// 直接返回(没有设置 锁的状态做解锁 *** 作)
return
}
// 新状态, 等待的G-1,并且设置唤醒锁
new = (old - 1<
释放锁的逻辑 :
1、获取新状态(将旧状态的锁去掉)
2、如果原来锁就未被持有, 则直接抛出异常
3、进入循环
4、如果没有被阻塞的G或者有唤醒的G或者锁原来已经加锁, 直接返回
5、获取并设置新状态, 如果设置成功, 则唤醒所有等待中的G, 然后返回
6、如果没有设置成功, 则重新获取锁的状态, 进入循环
其中 seam 是一个信号量, 当他被修改之后, 唤醒队首休眠的G
第三版锁:
func (m *Mutex) Lock() {
// Fast path: 幸运之路,正好获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
iter := 0
for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
old := m.state // 先保存当前锁的状态
new := old | mutexLocked // 新状态设置加锁标志
if old&mutexLocked != 0 { // 锁还没被释放
if runtime_canSpin(iter) { // 还可以自旋
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 进入自旋
runtime_doSpin()
iter++
continue // 自旋,再次尝试请求锁
}
new = old + 1<
第三版跟第二版相比, 在循环的时候, 没获取到锁则进行一次自旋, 总共可以进行一定次数的自旋,每次自旋完直接再次抢锁(避免进入休眠), 只有在自旋n次都没有抢到锁的情况下才会进入休眠。
这样新进来的G因为不需要被唤醒就可以多抢几次锁, 所以概率大些。 而且他们本来就是在运行中,他们抢到锁就可以直接执行, 而休眠中的G抢到锁要进行上下文切换才能执行。
自旋的过程一直在查询锁的状态,锁被释放了本次自旋才结束。
这样一来新加入的G必须尝试抢锁几次后才会进入休眠。
这样存在的问题是对于那些在休眠中的G, 拿到锁的概率低了很多, 大并发的时候可能很难拿到锁。
第四个阶段: 解决饥饿
核心原理就是给锁添加一个饥饿标志, 其他G在看到这个饥饿标志之后放弃抢锁, 加入到阻塞队尾。
每个G抢锁之前都记录下当前时间, 然后下次被唤醒的时候看下这个时间, 如果时间间隔大于1毫秒,
则进入饥饿状态, 将锁的饥饿标志设置为饥饿, 然后自己加入到队首去。
此时如果有其他G正在持有锁, 他在唤醒的时候看到饥饿标志会直接唤醒队首的锁。
如果新的G恰巧抢到了锁, 因为看到了饥饿标志也会让出锁挂起到队尾去。
当多个G *** 作同一个变量时, 如果全部只是读取数据, 这个 *** 作是并发安全的。
只有想要修改数据的时候, 才要考虑加写锁。 读写锁是为了避免被读 *** 作阻塞而影响性能对互斥锁的一种优化(更加细节)。
主要在原来的基础上加了一把 读锁 。
“读锁” : 临界区被加 读锁 之后, 读数据的G可以正常的读取数据, 但是想要修改数据则会被阻塞(其他G可以读,但是不能写)。
“写锁(互斥锁)”:写的时候还是跟原来一样, 其他G不能写, 也不能读。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)