Go中并发详解
前言
前面章节,已经看到了 Go
语言设计简单(尤其是其中),接着就是了解一下 Go
的并行程序设计,Go 从语言层面就支持了并行。
Goroutine
goroutine
是 Go
并行设计的核心。
goroutine
就是协程,但是它比线程更小,十几个 goroutine
可能体现在底层就是五六个线程,Go
语言内部帮你实现了这些 goroutine
之间的内存共享。
执行 goroutine
只需极少的栈内存 (大概是 4~5 KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine
比 thread
更易用、更高效、更轻便。
goroutine
是通过 Go
的 runtime
管理的一个线程管理器。
goroutine
通过 go
关键字实现了,其实就是一个普通的函数。
go hello(a, b, c)
: 这样就很容易的启动了一个协程来执行一段代码。
func say(s string) {
for i := 0; i < 5; i++ {
runtime.Gosched()
fmt.Println(s)
}
}
func main() {
go say("world") // 开一个新的 Goroutines 执行
say("hello") // 当前 Goroutines 执行
}
我们可以看到 go
关键字很方便的就实现了并发编程。
上面的多个goroutine
运行在同一个进程里面,共享内存数据。 不过设计上我们要遵循:不要通过共享内存来通信,而要通过通信来共享。
Channels
goroutine
运行在相同的地址空间,因此访问共享内存必须做好同步。那么 goroutine
之间如何进行数据的通信呢,Go
提供了一个很好的通信机制 channel
。
channel
可以与 Unix shell
中的双向管道做类比:可以通过它发送或者接收值。
这些值只能是特定的类型: channel
类型。定义一个 channel
时,也需要定义发送到 channel
的值的类型。
注意,必须使用 make
创建 channel
: c := make(chan int)
channel
通过操作符 <-
来接收和发送数据
ch <- v // 发送 v 到 channel ch.
v := <-ch // 从 ch 中接收数据,并赋值给v
来一起看个例子,下面的例子定义的是一个 无缓冲区 channel。
func sum(a []int, c chan int) {
total := 0
for _, v := range a {
total += v
}
c <- total // send total to c
}
func main() {
a := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(a[:len(a)/2], c)
go sum(a[len(a)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}
没有缓冲情况下,channel
接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得 Goroutines
同步变的更加的简单,而不需要显式的 lock
。
所谓阻塞,也就是如果读取(value := <-ch)
它将会被阻塞,直到有数据接收。 任何发送(ch<-5)
将会被阻塞,直到数据被读出。
无缓冲 channel
是在多个 goroutine
之间同步很棒的工具。
Buffered Channels
上面介绍了无缓存区类型的 channel
,不过 Go
也允许指定 channel
的缓冲大小,很简单,就是 channel
可以存储多少元素。
ch:= make (chan bool, 4)
,创建了可以存储4个bool
类型 channel
。 在这个 channel
中,前4个元素可以无阻塞的写入。 当写入第5个元素时,代码将会阻塞,直到其他 goroutine
从 channel
中读取一些元素,腾出空间。
ch := make(chan type, value)
当 value = 0
时,channel
是无缓冲阻塞读写的,当 value > 0
时,channel
有缓冲、是非阻塞的,直到写满 value
个元素才阻塞写入。
func main() {
c := make(chan int, 2) // 修改 2 为 1 就报错,修改 2 为 3 可以正常运行
c <- 1
c <- 2
fmt.Println(<-c)
fmt.Println(<-c)
}
Range 和 Close
上面例子中,我们需要读取两次 c
,这样不是很方便,Go
考虑到了这一点,所以也可以通过 range
,像操作 slice
或者 map
一样操作缓存类型的 channel
,请看下面的例子
func fibonacci(n int, c chan int) {
x, y := 1, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
fmt.Println("协程启动完成!")
for i := range c { // 会一直读取,直到channel显式关闭
fmt.Println(i)
}
}
for i := range c
能够不断的读取 channel
里面的数据,直到该 channel
被显式的关闭。
上面代码我们看到可以显式的关闭 channel
,生产者通过内置函数 close
关闭 channel
。
关闭 channel
之后就无法再发送任何数据了,在消费方可以通过语法 v, ok := <-ch
测试 channel
是否被关闭。
如果 ok 返回 false,那么说明 channel 已经没有任何数据并且已经被关闭。
- 应该在生产者的地方关闭 channel,而不是消费的地方去关闭它,这样容易引起 panic
- channel 不像文件之类的,不需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束 range 循环之类
Select
我们上面介绍的都是只有一个 channel
的情况,那么如果存在多个 channel
的时候,我们该如何操作呢,Go
里面提供了一个关键字 select
,通过 select
可以监听 channel
上的数据流动。
select
默认是阻塞的,只有当监听的 channel
中有发送或接收可以进行时才会运行,当多个 channel
都准备好的时候,select
是随机的选择一个执行的。
func fibonacci(c, quit chan int) {
x, y := 1, 1
for {
select {
case c <- x:
x, y = y, x + y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
在 select
里面还有 default
语法,select
其实就是类似 switch
的功能,default
就是当监听的 channel
都没有准备好的时候,默认执行的(select
不再阻塞等待 channel
)。
例如:
select {
case i := <-c:
// use i
default:
// 当 c 阻塞的时候执行这里
}
超时
有时候会出现 goroutine
阻塞的情况,那么我们如何避免整个程序进入阻塞的情况呢?
我们可以利用 select
来设置超时,通过如下的方式实现:
func main() {
c := make(chan int)
o := make(chan bool)
go func() {
for {
select {
case v := <-c:
println(v)
case <-time.After(5 * time.Second):
println("timeout")
o <- true
// 此处的break只是跳出了select循环,并未终止for循环,要用return才能终止这个子进程
break
}
}
}()
<-o
}
runtime goroutine
runtime 包中有几个处理 goroutine 的函数:
- Goexit: 退出当前执行的 goroutine,但是 defer 函数还会继续调用
- Gosched: 让出当前 goroutine 的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行。
- NumCPU: 返回 CPU 核数量
- NumGoroutine: 返回正在执行和排队的任务总数
- GOMAXPROCS: 用来设置可以并行计算的 CPU 核数的最大值,并返回之前的值。