Go语言sync包的应用详解

前言

在并发编程中同步原语也就是我们通常说的锁的主要作用是保证多个线程或者 goroutine在访问同一片内存时不会出现混乱的问题。Go语言的sync包提供了常见的并发编程同步原语,今天文章里让我们回到应用层,聚焦sync包里这些同步原语的应用场景,同时也会介绍sync包中的Pool和Map的应用场景和使用方法。

sync.Mutex

sync.Mutex可能是sync包中使用最广泛的原语。它允许在共享资源上互斥访问(不能同时访问):

mutex := &sync.Mutex{}

mutex.Lock()
// Update共享变量 (比如切片,结构体指针等)
mutex.Unlock()

必须指出的是,在第一次被使用后,不能再对sync.Mutex进行复制。(sync包的所有原语都一样)。如果结构体具有同步原语字段,则必须通过指针传递它。

sync.RWMutex

  1. 互斥锁,使同一时刻只能有一个协程执行某段程序,其他协程等待该协程执行完再依次执行。

  2. 互斥锁只有两个方法 Lock (加锁)和 Unlock(解锁),当一个协程对资源上锁后,只有等该协程解锁,其他协程才能再次上锁。

  3. LockUnlock 是成对出现,为了防止上锁后忘记释放锁,我们可以使用 defer 语句来释放锁。

sync.RWMutex是一个读写互斥锁,它提供了我们上面的刚刚看到的sync.MutexLockUnLock方法(因为这两个结构都实现了sync.Locker接口)。但是,它还允许使用RLock和RUnlock方法进行并发读取:

mutex := &sync.RWMutex{}

mutex.Lock()
// Update 共享变量
mutex.Unlock()

mutex.RLock()
// Read 共享变量
mutex.RUnlock()

sync.RWMutex允许至少一个读锁或一个写锁存在,而sync.Mutex允许一个读锁或一个写锁存在。

通过基准测试来比较这几个方法的性能:

BenchmarkMutexLock-4       83497579         17.7 ns/op
BenchmarkRWMutexLock-4     35286374         44.3 ns/op
BenchmarkRWMutexRLock-4    89403342         15.3 ns/op

可以看到锁定/解锁sync.RWMutex读锁的速度比锁定/解锁sync.Mutex更快,另一方面,在sync.RWMutex上调用Lock()/ Unlock()是最慢的操作。

因此,只有在频繁读取和不频繁写入的场景里,才应该使用sync.RWMutex

sync.WaitGroup

sync.WaitGroup也是一个经常会用到的同步原语,它的使用场景是在一个goroutine等待一组goroutine执行完成。

sync.WaitGroup拥有一个内部计数器。当计数器等于0时,则Wait()方法会立即返回。否则它将阻塞执行Wait()方法的goroutine直到计数器等于0时为止。

要增加计数器,我们必须使用Add(int)方法。要减少它,我们可以使用Done()(将计数器减1),也可以传递负数给Add方法把计数器减少指定大小,Done()方法底层就是通过Add(-1)实现的。

在以下示例中,我们将启动八个goroutine,并等待他们完成:

wg := &sync.WaitGroup{}

for i := 0; i < 8; i++ {
  wg.Add(1)
  go func() {
    // Do something
    wg.Done()
  }()
}

wg.Wait()
// 继续往下执行...

每次创建goroutine时,我们都会使用wg.Add(1)来增加wg的内部计数器。我们也可以在for循环之前调用wg.Add(8)

与此同时,每个goroutine完成时,都会使用wg.Done()减少wg的内部计数器。

main goroutine会在八个goroutine都执行wg.Done()将计数器变为0后才能继续执行。

sync.Map

sync.Map是一个并发版本的Go语言的map,我们可以:

  • 使用Store(interface {},interface {})添加元素。

  • 使用Load(interface {}) interface {}检索元素。

  • 使用Delete(interface {})删除元素。

  • 使用LoadOrStore(interface {},interface {}) (interface {},bool)检索或添加之前不存在的元素。如果键之前在map中存在,则返回的布尔值为true。

  • 使用Range遍历元素。

m := &sync.Map{}

// 添加元素
m.Store(1, "one")
m.Store(2, "two")

// 获取元素1
value, contains := m.Load(1)
if contains {
  fmt.Printf("%s\n", value.(string))
}

// 返回已存value,否则把指定的键值存储到map中
value, loaded := m.LoadOrStore(3, "three")
if !loaded {
  fmt.Printf("%s\n", value.(string))
}

m.Delete(3)

// 迭代所有元素
m.Range(func(key, value interface{}) bool {
  fmt.Printf("%d: %s\n", key.(int), value.(string))
  return true
})

上面的程序会输出:

one
three
1: one
2: two

如你所见,Range方法接收一个类型为func(key,value interface {})bool的函数参数。如果函数返回了false,则停止迭代。有趣的事实是,即使我们在恒定时间后返回false,最坏情况下的时间复杂度仍为O(n)

我们应该在什么时候使用sync.Map而不是在普通的map上使用sync.Mutex

  • 当我们对map有频繁的读取和不频繁的写入时。

  • 当多个goroutine读取,写入和覆盖不相交的键时。 具体是什么意思呢? 例如,如果我们有一个分片实现,其中包含一组4个goroutine,每个goroutine负责25%的键(每个负责的键不冲突)。在这种情况下,sync.Map是首选。

sync.Pool

sync.Pool是一个并发池,负责安全地保存一组对象。它有两个导出方法:

  • Get() interface{} 用来从并发池中取出元素。
  • Put(interface{}) 将一个对象加入并发池。
pool := &sync.Pool{}

pool.Put(NewConnection(1))
pool.Put(NewConnection(2))
pool.Put(NewConnection(3))

connection := pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)
connection = pool.Get().(*Connection)
fmt.Printf("%d\n", connection.id)

输出:

1
3
2

需要注意的是Get()方法会从并发池中随机取出对象,无法保证以固定的顺序获取并发池中存储的对象。

还可以为sync.Pool指定一个创建者方法:

pool := &sync.Pool{
  New: func() interface{} {
    return NewConnection()
  },
}

connection := pool.Get().(*Connection)

这样每次调用Get()时,将返回由在pool.New中指定的函数创建的对象(在本例中为指针)。

那么什么时候使用sync.Pool?有两个用例:

  • 第一个是当我们必须重用共享的和长期存在的对象(例如,数据库连接)时。
  • 第二个是用于优化内存分配。

让我们考虑一个写入缓冲区并将结果持久保存到文件中的函数示例。使用sync.Pool,我们可以通过在不同的函数调用之间重用同一对象来重用为缓冲区分配的空间。

第一步是检索先前分配的缓冲区(如果是第一个调用,则创建一个缓冲区,但这是抽象的)。然后,defer操作是将缓冲区放回sync.Pool中。

func writeFile(pool *sync.Pool, filename string) error {
    buf := pool.Get().(*bytes.Buffer)

  defer pool.Put(buf)

    // Reset 缓存区,不然会连接上次调用时保存在缓存区里的字符串foo
    // 编程foofoo 以此类推
    buf.Reset()

    buf.WriteString("foo")

    return ioutil.WriteFile(filename, buf.Bytes(), 0644)
}

sync.Once

sync.Once是一个简单而强大的原语,可确保一个函数仅执行一次。在下面的示例中,只有一个goroutine会显示输出消息:

once := &sync.Once{}
for i := 0; i < 4; i++ {
    i := i
    go func() {
        once.Do(func() {
            fmt.Printf("first %d\n", i)
        })
    }()
}

我们使用了Do(func ())方法来指定只能被调用一次的部分。

注意:sync.Once 是说让内部的函数执行完成的时候,才会释放。

  • 多个线程需要进入执行对应逻辑的时候只能等待。 与执行结果无关。
  • 如果用于初始化(singleton)模式,如果失败务必panic。

sync.Cond

使用场景:

  • 我们有一项任务,只有满足了条件情况下才能执行,否则就等着。如何获取这个条件呢?可以使用 channel 的方式,但是 channel 适用于一对一,一对多就需要用到 sync.Cond

  • sync.Cond 是基于互斥锁的基础上,增加了一个通知队列,协程刚开始是等待的,通知的协程会从通知队列中唤醒一个或多个被通知的协程。

sync.Cond 主要有以下几个方法:

  • sync.NewCond(&mutex) //sync.Cond 通过 sync.NewCond 初始化,需要传入一个 mutex,因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于 sync.Mutex 来实现的。

  • sync.Wait() //等待通知

阻塞当前协程,直到被其他协程调用 Broadcast 或者 Signal 方法唤醒,使用的时候需要加锁,使用 sync.Cond 中的锁即可

  • sync.Signal() //单发通知,随机唤醒一个协程

  • sync.Broadcat() //广播通知,唤醒所有等待的协程。

sync.Cond可能是sync包提供的同步原语中最不常用的一个,它用于发出信号(一对一)或广播信号(一对多)到goroutine。让我们考虑一个场景,我们必须向一个goroutine指示共享切片的第一个元素已更新。创建sync.Cond需要sync.Locker对象(sync.Mutexsync.RWMutex):

cond := sync.NewCond(&sync.Mutex{})

然后,让我们编写负责显示切片的第一个元素的函数:

func printFirstElement(s []int, cond *sync.Cond) {
    cond.L.Lock()
    cond.Wait()
    fmt.Printf("%d\n", s[0])
    cond.L.Unlock()
}

我们可以使用cond.L访问内部的互斥锁。一旦获得了锁,我们将调用cond.Wait(),这会让当前goroutine在收到信号前一直处于阻塞状态。

让我们回到main goroutine。我们将通过传递共享切片和先前创建的sync.Cond来创建printFirstElement池。然后我们调用get()函数,将结果存储在s[0]中并发出信号:

s := make([]int, 1)
for i := 0; i < runtime.NumCPU(); i++ {
    go printFirstElement(s, cond)
}

i := get()
cond.L.Lock()
s[0] = i
cond.Signal()
cond.L.Unlock()

这个信号会解除一个goroutine的阻塞状态,解除阻塞的goroutine将会显示s[0]中存储的值。

但是,有的人可能会争辩说我们的代码破坏了Go的最基本原则之一:

不要通过共享内存进行通信;而是通过通信共享内存。

确实,在这个示例中,最好使用channel来传递get()返回的值。但是我们也提到了sync.Cond也可以用于广播信号。我们修改一下上面的示例,把Signal()调用改为调用Broadcast()

i := get()
cond.L.Lock()
s[0] = i
cond.Broadcast()
cond.L.Unlock()

在这种情况下,所有goroutine都将被触发。

众所周知,channel里的元素只会由一个goroutine接收到。通过channel模拟广播的唯一方法是关闭channel

当一个channel被关闭后,channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。

但是这种方式只能广播一次。因此,尽管存在很大争议,但这无疑是sync.Cond的一个有趣的功能。

举个例子:

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  //3个人赛跑,1个裁判员发号施令
  cond := sync.NewCond(&sync.Mutex{})
  var wg sync.WaitGroup
  wg.Add(4) //3选手+1裁判
  for i := 1; i <= 3; i++ {
    go func(num int) {
      defer wg.Done()
      fmt.Println(num, "号选手已经就位")
      cond.L.Lock()
      cond.Wait() //等待发令枪响
      fmt.Println(num, "号选手开始跑……")
      cond.L.Unlock()
    }(i)
  }
  //等待所有goroutine都进入wait状态
  time.Sleep(2 * time.Second)
  go func() {
    defer wg.Done()
    fmt.Println("裁判:“各就各位~~预备~~”")
    fmt.Println("啪!!!")
    cond.Broadcast() //发令枪响
  }()
  //防止函数提前返回退出
  wg.Wait()
}

运行结果:

3 号选手已经就位
1 号选手已经就位
2 号选手已经就位
裁判:“各就各位~~预备~~”
啪!!!
2 号选手开始跑……
3 号选手开始跑……
1 号选手开始跑……

Go并发编程--errGroup

上文了解了 sync.WaitGroup 的用法都知道

一个 goroutine 需要等待多个 goroutine 完成和多个 goroutine 等待一个 goroutine 干活时都可以解决问题

WaitGroup 的确是一个很强大的工具,但是使用它相对来说还是有一点小麻烦,

  • 一方面我们需要自己手动调用 Add()Done() 方法,一旦这两个方法有一个多调用或者少调用,最终都有可能导致程序崩溃,所以我们在使用这两个方法的时候要格外小心,确保最终计数器能够达到 0 的状态;

  • 另一方面就是它不能抛出错误给调用者,只要一个 goroutine 出错我们就不再等其他 goroutine 了,减少资源浪费,所以我们只能通过声明多个外部变量的方式(或者声明一个变量然后通过加锁来更新它的值)来分别接收每个协程的 error 才行,就像下面的代码:

func main() {
 var (
 	wg sync.WaitGroup
 	err1, err2 error  // 通过在外部定义变量用来记录错误
 )
 
 wg.Add(2)
 go func() {
 	defer wg.Done()
 	fmt.Print("task 1")
 	err1 = nil
 }()
 
 go func() {
 	defer wg.Done()
 	fmt.Print("task 2")
 	err2 = fmt.Errorf("task 2 error")
 }()
 wg.Wait()
 
 if err1 != nil || err2 != nil {
 	// TODO
 }
 
 fmt.Print("finish")
}
 

使用WaitGroup 都不能很好的解决。 所以此时可以使用 ErrGroup 就可以解决问题了。

Errgroup

Errgroup 是 Golang 官方提供的一个同步扩展库, 它和 WaitGroup 的作用类似,但是它提供了更加丰富的功能以及更低的使用成本:

  • 和context集成;
  • 能够对外传播error,可以把子任务的错误传递给Wait 的调用者.

Errgroup 的代码非常简短,加上注释一共才 66 行,包含一个结构体以及三个对外暴露的方法,接下来就让我们走进源码,来具体看一下它是如何工作的

Group

type Group struct {
    // context 的 cancel 方法
	cancel func()
 
    // 复用 WaitGroup
	wg sync.WaitGroup
 
	// 用来保证只会接受一次错误
	errOnce sync.Once
    // 保存第一个返回的错误
	err     error
}

2.2 WithContext

func WithContext(ctx context.Context) (*Group, context.Context) {
	// 使用 contex.WithCancel创建一个可以取消的 context 将 cancel 赋值给 Group 保存起来
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

WithContext 就是使用 WithCancel创建一个可以取消的 contextcancel 赋值给 Group 保存起来,然后再将 context 返回回去

注意这里有一个坑,在后面的代码中不要把这个 ctx 当做父 context 又传给下游,因为 errgroup 取消了,这个 context 就没用了,会导致下游复用的时候出错

2.3 Go

Go 方法传入一个 func() error 内部会启动一个 goroutine 去处理

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
    // wg.Add(1) 计数器加 1
	g.wg.Add(1)
 
	go func() {
		defer g.wg.Done()
 
		if err := f(); err != nil {
		  // 这里使用sync.Once作用,保证传入的 无入参函数执行一次
		  // 如果有 error,则记录发生的第一个 error
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

Go 方法其实就类似于 go 关键字,会启动一个协程,然后利用 waitgroup 来控制是否结束,如果有一个非 nilerror 出现就会保存起来并且如果有 cancel 就会调用 cancel 取消掉,使 ctx 返

2.4 Wait

/ Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
    // wg.Wait() 等待所有任务执行完毕
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

Wait 方法其实就是调用 WaitGroup 等待,如果有 cancel 就调用一下

errgroup案例

3.1 记录错误

使用 Errgroup,上述代码可以改为下面的样子:

package main
 
import (
	"fmt"
 
	"golang.org/x/sync/errgroup"
)
 
func main() {
	var eg errgroup.Group
 
	//匿名函数将会通过GO关键字启动一个协程
	eg.Go(func() error {
		fmt.Print("task 1\n")
		return nil
	})
 
	eg.Go(func() error {
		fmt.Print("task 2\n")
		return fmt.Errorf("task 2 error")
	})
 
	// 使用Wait 等待所有的协程执行完毕后,再进行后面的逻辑,同时可以记录两个协程的错误
	if err := eg.Wait(); err != nil {
		fmt.Printf("some error occur: %s\n", err.Error())
	}
 
	fmt.Print("over")
}

代码简洁了很多

3.2 一个协程出错,其他协程终止

基于 errgroup 实现一个 http server 的启动和关闭 ,以及 linux signal 信号的注册和处理,要保证能够 一个退出,全部注销退出。

package main
 
import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
 
	"golang.org/x/sync/errgroup"
)
 
func main() {
	g, ctx := errgroup.WithContext(context.Background())
 
	mux := http.NewServeMux()
	mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	})
 
	// 模拟单个服务错误退出
	serverOut := make(chan struct{})
	mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
		serverOut <- struct{}{}
	})
 
	server := http.Server{
		Handler: mux,
		Addr:    ":8080",
	}
 
	// g1
	// g1 退出了所有的协程都能退出么?
	// g1 退出后, context 将不再阻塞,g2, g3 都会随之退出
	// 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
	g.Go(func() error {
		return server.ListenAndServe()
	})
 
	// g2
	// g2 退出了所有的协程都能退出么?
	// g2 退出时,调用了 shutdown,g1 会退出
	// g2 退出后, context 将不再阻塞,g3 会随之退出
	// 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
	g.Go(func() error {
		select {
		case <-ctx.Done():
			log.Println("errgroup exit...")
		case <-serverOut:
			log.Println("server will out...")
		}
 
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		// 这里不是必须的,但是如果使用 _ 的话静态扫描工具会报错,加上也无伤大雅
		defer cancel()
 
		log.Println("shutting down server...")
		return server.Shutdown(timeoutCtx)
	})
 
	// g3
	// g3 捕获到 os 退出信号将会退出
	// g3 退出了所有的协程都能退出么?
	// g3 退出后, context 将不再阻塞,g2 会随之退出
	// g2 退出时,调用了 shutdown,g1 会退出
	// 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
	g.Go(func() error {
		quit := make(chan os.Signal, 0)
		signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 
		select {
		case <-ctx.Done():
			return ctx.Err()
		case sig := <-quit:
			return fmt.Errorf("get os signal: %v", sig)
		}
	})
 
	fmt.Printf("errgroup exiting: %+v\n", g.Wait())
}

运行后,使用 ctrl + C 终止程序,终端输出如下

2021/11/03 10:20:34 errgroup exit...
2021/11/03 10:20:34 shutting down server...
errgroup exiting: get os signal: interrupt

这里主要用到了 errgroup 一个出错,其余取消的能力

关于我
loading