一些常见并发编程错误
Go语言是一门天然支持并发的编程语言。 通过使用go
关键字,我们可以很轻松地创建协程;通过使用和Go中提供的其它,并发编程变得简单、轻松和有趣。
另一方面,Go并不阻止程序员在并发编程中因为粗心或者经验不足而犯错。 本文的余下部分将展示一些常见的并发错误,来帮助Go程序员在实践中避免这些错误。
我们已经知道,源文件中的代码行在运行时刻。
下面这个示例程序犯了两个错误:
- 首先,主协程中对变量
b
的读取和匿名协程中的对变量b
的写入可能会产生数据竞争; - 其次,在主协程中,条件
b == true
成立并不能确保条件a != nil
也成立。 编译器和CPU可能会对调整此程序中匿名协程中的某些指令的顺序已获取更快的执行速度。 所以,站在主协程的视角看,对变量b
的赋值可能会发生在对变量a
的赋值之前,这将造成在修改a
的元素时a
依然为一个nil切片。
上面这个程序可能在很多计算机上运行良好,但是可能会在某些计算机上因为恐慌而崩溃退出;或者使用某些编译器编译的时候运行良好,但使用另外的某个编译器编译的时候将造成程序运行时崩溃退出。
我们应该使用通道或者sync
标准库包中的同步技术来确保内存顺序。比如:
package main
func main() {
var a []int = nil
c := make(chan struct{})
go func () {
a = make([]int, 3)
c <- struct{}{}
}()
<-c
a[0], a[1], a[2] = 0, 1, 2 // 绝不会造成恐慌
}
使用time.Sleep
调用来做同步
让我们看一个简单的例子:
package main
import (
"fmt"
"time"
)
func main() {
var x = 123
go func() {
x = 789 // 写入x
}()
time.Sleep(time.Second)
fmt.Println(x) // 读取x
}
我们期望着此程序打印出789
。 事实上,则其运行结果常常正如我们所期待的。 但是,此程序中的同步处理实现的正确吗?否!原因很简单,Go运行时并不能保证对x
的写入一定发生在对x
的读取之前。 在某些特定的情形下,比如CPU资源被很一些其它计算密集的程序所占用,则对x
的写入有可能发生在对x
的读取之后。 因此,我们不应该在正式的项目中使用time.Sleep
调用来做同步。
让我们看另一个简单的例子:
package main
import (
"fmt"
"time"
)
var x = 0
func main() {
var num = 123
var p = &num
c := make(chan int)
go func() {
c <- *p + x
}()
time.Sleep(time.Second)
num = 789
fmt.Println(<-c)
}
你觉得此程序会输出什么?123
还是789
? 事实上,它的输出是和具体使用的编译器相关的。 对于标准编译器1.17版本来说,它很可能输出123
。 但是从理论上说,它输出789
或者另外一个预想不到的值也是有可能的。
让我们将此例中的c <- *p + x
一行换成c <- *p
,然后重新运行它,你将会发现它的输出变成了789
(如果它使用标准编译器1.17版本编译的话)。 重申一次,此结果是和具体使用的编译器和编译器的版本相关的。
是的,此程序中存在数据竞争。表达式*p
的估值可能发生在赋值num = 789
之前、之后、或者同时。 time.Sleep
调用并不能保证*p
的估值发生在此赋值之后。
对于这个特定的例子,我们应该将欲发送的值在开启新协程之前存储在一个临时变量中来避免数据竞争。
...
tmp := *p
c <- tmp
}()
...
使一些协程永久处于阻塞状态
有很多原因导致某个协程永久阻塞,比如:
- 从一个永远不会有其它协程向其发送数据的通道接收数据;
- 向一个永远不会有其它协程从中读取数据的通道发送数据;
- 被自己死锁了;
- 和其它协程相互死锁了;
- 等等。
在通道用例中,如果被当作future/promise来用的通道的容量不足够大,则较慢回应的协程在准备发送回应结果时将永久阻塞。 比如,下面的例子中,每个请求将导致4个协程永久阻塞。
为了防止有4个协程永久阻塞,被当作future/promise使用的通道的容量必须至少为4
.
在第二种“采用最快回应”实现方法中,如果被当作future/promise使用的通道是一个非缓冲通道(如下面的代码所示),则有可能导致其通道的接收者可能会错过所有的回应而导致处于永久阻塞状态。
func request() int {
c := make(chan int)
for i := 0; i < 5; i++ {
i := i
go func() {
case c <- i:
default:
}
}()
}
return <-c // 有可能永久阻塞在此
}
接收者协程可能会永久阻塞的原因是如果5个尝试发送操作都发生在接收操作<-c
准备好之前,亦即5个个尝试发送操作都失败了,则接收者协程将永远无值可接收(从而将处于永久阻塞状态)。
将通道c
改为一个缓冲通道,则至少会有一个尝试发送将成功,从而接收者协程肯定不会永久阻塞。
在实践中,sync
标准库包中的类型(除了Locker
接口类型)的值不应该被复制。 我们只应该复制它们的指针值。
下面是一个有问题的并发编程的例子。 在此例子中,当Counter.Value
方法被调用时,一个Counter
属主值将被复制,此属主值的字段Mutex
也将被一同复制。 此复制并没有被同步保护,因此复制结果可能是不完整的,并非被复制的属主值的一个快照。 即使此Mutex
字段得以侥幸完整复制,它的副本所保护的是对字段n
的一个副本的访问,因此一般是没有意义的。
import "sync"
type Counter struct {
sync.Mutex
n int64
}
// 此方法实现是没问题的。
func (c *Counter) Increase(d int64) (r int64) {
c.Lock()
c.n += d
r = c.n
c.Unlock()
return
}
// 此方法的实现是有问题的。当它被调用时,
// 一个Counter属主值将被复制。
func (c Counter) Value() (r int64) {
c.Lock()
r = c.n
c.Unlock()
return
}
我们应该将Value
方法的属主参数类型更改为指针类型*Counter
来避免复制sync.Mutex
值。
Go官方工具链中提供的go vet
命令将提示此例中的Value
方法的声明可能是一个潜在的逻辑错误。
在错误的地方调用sync.WaitGroup.Add
方法
每个sync.WaitGroup
值内部维护着一个计数。此计数的初始值为0。 如果一个sync.WaitGroup
值的Wait
方法在此计数为0的时候被调用,则此调用不会阻塞,否则此调用将一直阻塞到此计数变为0为止。
为了让一个WaitGroup
值的使用有意义,在此值的计数为0的情况下,对它的下一次Add
方法的调用必须出现在对它的下一次Wait
方法的调用之前。
比如,在下面的例子中,Add
方法的调用位置是不合适的。 此例子程序的打印结果并不总是100
,而可能是0
到100
间的任何一个值。 原因是没有任何一个Add
方法调用可以确保发生在唯一的Wait
方法调用之前,结果导致没有任何一个Done
方法调用可以确保发生在唯一的Wait
方法调用返回之前。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var wg sync.WaitGroup
var x int32 = 0
for i := 0; i < 100; i++ {
go func() {
wg.Add(1)
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
fmt.Println("等待片刻...")
wg.Wait()
fmt.Println(atomic.LoadInt32(&x))
}
我们应该将对Add
方法的调用移出匿名协程之外,像下面这样,使得任何一个Done
方法调用都确保发生在唯一的Wait
方法调用返回之前。
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
...
不当地使用用做Future/Promise的通道
从通道用例大全一文中,我们了解到一些函数可以返回用做future/promise的通道结果。 假设fa
和fb
是这样的两个函数,则下面的调用方式并没有体现出这两个函数的真正价值。
我们应该像下面这样调用这两个函数来并发生成两个回应结果:
doSomethingWithFutureArguments(<-ca, <-cb)
Go程序员常犯的一个错误是关闭一个后续可能还会有协程向其发送数据的通道。 当向一个已关闭的通道发送数据的时候,一个恐慌将产生。
这样的错误曾经发生在一些很有名的项目中,比如Kubernetes项目中的这个bug和。
请阅读此篇文章来了解如何安全和优雅地关闭通道。
对地址不保证为8字节对齐的值执行64位原子操作
截至目前(Go 1.17),64位原子操作中涉及到的实参地址必须为8字节对齐的。不满足此条件的64位原子操作将造成一个恐慌。 对于标准编译器,这样的情形只可能发生在32位的架构中。 请阅读来获知如何确保让64位的整数值的地址在32位的架构中8字节对齐。
没留意过多的time.After
函数调用消耗了大量资源
time
标准库包中的After
函数返回。 此函数给并发编程带来了很多便利,但是它的每个调用都需要创建一个time.Timer
值,此新创建的Timer
值在传递给After
函数调用的时长(实参)内肯定不会被垃圾回收。 如果此函数在某个时段内被多次频繁调用,则可能导致积累很多尚未过期的Timer
值从而造成大量的内存和计算消耗。
比如在下面这个例子中,如果longRunning
函数被调用并且在一分钟内有一百万条消息到达, 那么在某个特定的很小时间段(大概若干秒)内将存在一百万个活跃的Timer
值,即使其中只有一个是真正有用的。
import (
"fmt"
"time"
)
// 如果某两个连续的消息的间隔大于一分钟,此函数将返回。
func longRunning(messages <-chan string) {
for {
select {
case <-time.After(time.Minute):
return
case msg := <-messages:
fmt.Println(msg)
}
}
}
为了避免太多的Timer
值被创建,我们应该只使用(并复用)一个Timer
值,像下面这样:
func longRunning(messages <-chan string) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
for {
select {
case <-timer.C: // 过期了
return
case msg := <-messages:
fmt.Println(msg)
// 此if代码块很重要。
if !timer.Stop() {
<-timer.C
}
}
// 必须重置以复用。
timer.Reset(time.Minute)
}
}
注意,此示例中的if
代码块用来舍弃一个可能在执行第二个分支代码块的时候发送过来的超时通知。
一个典型的time.Timer
的使用已经在上一节中展示了。一些解释:
- 如果一个
Timer
值已经过期或者已经被终止(stopped),则相应的Stop
方法调用返回false
。 在此Timer
值尚未终止的时候,Stop
方法调用返回false
只能意味着此Timer
值已经过期。 - 一个
Timer
值被终止之后,它的通道字段C
最多只能含有一个过期的通知。 - 在一个
Timer
终止(stopped)之后并且在重置和重用此Timer
值之前,我们应该确保此Timer
值中肯定不存在过期的通知。 这就是上一节中的例子中的if
代码块的意义所在。
一个*Timer
值的Reset
方法必须在对应Timer
值过期或者终止之后才能被调用; 否则,此Reset
方法调用和一个可能的向此Timer
值的C
通道字段的发送通知操作产生数据竞争。
如果上一节中的例子中的select
流程控制代码块中的第一个分支被选中,则这表示相应的Timer
值已经过期,所以我们不必终止它。 但是我们必须在第二个分支中通过终止此Timer
以检查此Timer
中是否存在一个过期的通知。 如果确实有一个过期的通知,我们必须在重用这个Timer
之前将此过期的通知取出;否则,此过期的通知将下一个循环步导致在第一个分支立即被选中。
比如,下面这个程序将在运行后大概一秒钟(而不是十秒钟)后退出。 而且此程序存在着潜在的数据竞争。
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
timer := time.NewTimer(time.Second/2)
select {
case <-timer.C:
default:
time.Sleep(time.Second) // 此分支被选中的可能性较大
}
timer.Reset(time.Second * 10) // 可能数据竞争
<-timer.C
fmt.Println(time.Since(start)) // 大约1s
当一个time.Timer
值不再被使用后,我们不必(但是推荐)终止之。
我们不应该依赖于的Reset
方法的返回值。此返回值只要是为了历史兼容性而存在的。