Go中内存泄漏常见Case

背景

Go虽然是自动GC类型的语言,但在编码过程中如果不注意,很容易造成内存泄漏的问题。比较常见的是发生在 slice、time.Ticker、goroutine 等的使用过程中,这里结合我们日常中经常遇到的,以及网上搜集到一些Case进行系统性的总结一下,希望对你的日常工作有所帮助。

slice 类型引起内存泄漏

1. 传入的参数被切片返回,导致局部变量不能被释放

Golang是自带GC的,如果资源一直被占用,是不会被自动释放的,比如下面的代码,如果传入的slice b是很大的,然后引用很小部分给全局量a,那么b未被引用的部分就不会被释放,造成了所谓的内存泄漏。

var a []int 
func test(b []int) {
    a = b[:3]
    return
}

想要理解这个内存泄漏,主要就是理解上面的a = b[:3]是一个引用,其实新、旧slice指向的都是同一片内存地址,那么只要全局量a在,b就不会被回收。

如果想避免这个问题,可以使用append方法的实现,如果append的目标slice空间不够,会重新申请一个array来放需要append的内容,所以&b[0]&a[0]的值是不一样的,而&a[0]&c[0]地址是一致的:

time.Sleep(time.Second * 5)
fmt.Println("main func")
var b []int
var c []int
// 现在,如果再没有其它值引用着承载着a元素的内存块,
// 则此内存块可以被回收了。
func test(a []int) {
   c = a[:1]
   b = append(a[:0:0], a[:1]...) // 秀操作而已,也可以使用nil
   fmt.Println(&a[0], &c[0], &b[0]) //0xc0000aa030 0xc0000aa030 0xc0000b2038
}

也可以使用 copy()函数来实现引用类型的深拷贝。copy(dst[], src[])

2. 切片容量导致内存泄漏

假如我们从网络中接受了很大的数据,该协议使用前5个字节标识消息类型。

func consumeMessages() {
      msg := receiveMessage() // a
      storeMessageType(getMessageType(msg)) //b
      // 其他的逻辑处理
}
// 然后msg作为一个参数
func getMessageType(msg []byte) []byte { //c
    return msg[:5]
}

我们只想存储每个消息的前5字节代表的消息类型,但同时我们将每条消息的整个容量的数据也存储在了内存中。
解决方式可以使用copy方法,来替代对msg进行切分:

func getMessageType(msg []byte) []byte {
    msgType := make([]byte, 5)
    copy(msgType, msg)
    return msgType
}

3. 数组值传递

由于数组是Golang的基本数据类型,每个数组占用不同的内存空间,生命周期互不干扰,很难出现内存泄漏的情况,但是数组作为形参传输时,遵循的是值拷贝,如果函数被多个goroutine调用且数组过大时,则会导致内存使用激增。

因此对于大数组放在形参场景下通常使用切片或者指针进行传递,避免短时间的内存使用激增。

goroutine导致内存泄漏

Go内存泄露,大部分都是goroutine泄露导致的。 虽然每个goroutine仅占用少量(栈)内存,但当大量goroutine被创建却不会释放时(即发生了goroutine泄露),也会消耗大量内存,造成内存泄露。

另外,如果goroutine里还有在堆上申请空间的操作,则这部分堆内存也不能被垃圾回收器回收。 在Go中大概单个goroutine占用2.6k左右的内存空间。

Go 10次内存泄漏,8次goroutine泄漏,1次是真正内存泄漏,还有1次是cgo导致的内存泄漏 (“才高八斗”的既视感..)

1. Goroutine 内存泄漏的原因

Go 语言的内存泄漏通常因为错误地使用 goroutine 和 channel。例如以下几种情况:

  1. 在 goroutine 里打开一个连接(如 gRPC)但是忘记 close。

  2. 在 goroutine 里的全局变量对象没有释放。

  3. 在 goroutine 里读 channel, 但是没有写入端,而被阻塞。

  4. 在 goroutine 里写入无缓冲的 channel,但是由于 channel 的读端被其他协程关闭而阻塞。

  5. 在 goroutine 里写入有缓冲的 channel,但是 channel 缓冲已满。

  6. select操作在所有case上都阻塞,造成内存泄漏

其实本质上还是channel问题, 因为 select..case只能处理 channel类型, 即每个 case 必须是一个通信操作, 要么是发送要么是接收,select 将随机执行一个可运行 case, 如果没有 case 可运行,它将阻塞,直到有 case 可运行。 有个独立 goroutine去做某些操作的场景下,为了能在外部结束它,通常有两种方法:

  • 同时传入一个用于控制goroutine退出的 quit channel,配合 select,当需要退出时close 这个 quit channel,该 goroutine 就可以退出

  • 使用context 包的WithCancel,可参考context.WithCancel()的使用

  1. I/O问题,I/O连接未设置超时时间,导致goroutine一直在等待,代码会一直阻塞。

  2. 互斥锁未释放,goroutine无法获取到锁资源,导致goroutine阻塞

//协程拿到锁未释放,其他协程获取锁会阻塞
func mutexTest() {
    mutex := sync.Mutex{}
    for i := 0; i < 10; i++ {
        go func() {
            mutex.Lock()
            fmt.Printf("%d goroutine get mutex", i)
            //模拟实际开发中的操作耗时
            time.Sleep(100 * time.Millisecond)
        }()
    }
    time.Sleep(10 * time.Second)
}
  1. 死锁,当程序死锁时其他goroutine也会阻塞
func mutexTest() {
    m1, m2 := sync.Mutex{}, sync.RWMutex{}
    //g1得到锁1去获取锁2
    go func() {
        m1.Lock()
        fmt.Println("g1 get m1")
        time.Sleep(1 * time.Second)
        m2.Lock()
        fmt.Println("g1 get m2")
    }()
    //g2得到锁2去获取锁1
    go func() {
        m2.Lock()
        fmt.Println("g2 get m2")
        time.Sleep(1 * time.Second)
        m1.Lock()
        fmt.Println("g2 get m1")
    }()
    //其余协程获取锁都会失败
    go func() {
        m1.Lock()
        fmt.Println("g3 get m1")
    }()
    time.Sleep(10 * time.Second)
}
  1. waitgroup使用不当。

waitgroupAddDonewait数量不匹配会导致wait一直在等待。
上面列的情况,在日常开发过程中不容易发现,因此会经常带来一些线上的问题。

2. select-case 误用导致的内存泄露

func TestLeakOfMemory(t *testing.T) {
   fmt.Println("NumGoroutine:", runtime.NumGoroutine())
   chanLeakOfMemory()
   time.Sleep(time.Second * 3) // 等待 goroutine 执行,防止过早输出结果
   fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory() {
   errCh := make(chan error) // 1
   go func() { // (5)
      time.Sleep(2 * time.Second)
      errCh <- errors.New("chan error") // 2
      fmt.Println("finish sending")
   }()

   var err error
   select {
   case <-time.After(time.Second): // 3 大家也经常在这里使用 <-ctx.Done()
      fmt.Println("超时")
   case err = <-errCh: // 4
      if err != nil {
         fmt.Println(err)
      } else {
         fmt.Println(nil)
      }
   }
}

输出结果如下:

NumGoroutine: 2
超时
NumGoroutine: 3

这是 go channel 导致内存泄漏的经典场景。 根据输出结果(开始有两个 goroutine,结束时有三个 goroutine),我们可以知道,直到测试函数结束前,仍有一个 goroutine 没有退出。

原因是由于 1 处创建的 errCh 是不含缓存队列的 channel,如果 channel 只有发送方发送,那么发送方会阻塞;如果 channel 只有接收方,那么接收方会阻塞。
可以看到由于没有发送方往 errCh 发送数据,所以 4 处代码一直阻塞。
直到 3 处超时后,打印“超时”,函数退出,4 处代码都未接收成功。
而 2 处的所在的 goroutine 在“超时”被打印后,才开始发送。
由于外部的 goroutine 已经退出了,errCh 没有接收者,导致 2 处一直阻塞。
因此 2 处代码所在的协程一直未退出,造成了内存泄漏。

如果代码中有许多类似的代码,或在 for 循环中使用了上述形式的代码,随着时间的增长会造成多个未退出的 gorouting,最终导致程序 OOM。

这种情况其实还比较简单。我们只需要为 channel 增加一个缓存队列。即把 (1) 处代码改为 errCh := make(chan error, 1) 即可。修改后输出如下所示,可知我们创建的 goroutine 已经退出了。

NumGoroutine: 2
超时
NumGoroutine: 2

如果使用 defer close(errCh) 关闭 channel。比如把 1 处代码改为如下形式(错误):

errCh := make(chan error)
defer close(errCh)

由于 2 处代码没有接收者,所以一直阻塞。直到 close(errCh) 运行,2 处仍在阻塞。这导致关闭 channel 时,仍有 goroutine 在向 errCh 发送。然而在 golang 中,在向 channel 发送时不能关闭 channel,否则会 panic。因此这种方式是错误的。

又或在 5 处 goroutine 的第一句加上 defer close(errCh)。由于 2 处阻塞, defer close(errCh) 会一直得不到执行。因此也是错误的。 即便对调 2 处和 4 处的发送者和接收者,也会因为 channel 关闭,导致输出无意义的零值。

3. for range 导致的协程泄漏

func leakOfMemory_1(nums ...int) {
   out := make(chan int)
   // sender
   go func() {
      defer close(out)
      for _, n := range nums {   // c.
         out <- n
         fmt.Printf("sender success: %v\n", n)
         time.Sleep(time.Second)
      }
   }()

   // receiver
   go func() {
      ctx, cancel := context.WithTimeout(context.Background(), time.Second)
      defer cancel()
      for n := range out {      //b.
         if ctx.Err() != nil {  //a.
            fmt.Println("ctx timeout ")
            return
         }
         fmt.Println(n)
      }
   }()
}

// 单测文件中执行
func TestLeakOfMemory(t *testing.T) {
   fmt.Println("NumGoroutine:", runtime.NumGoroutine())
   leakOfMemory_1(1, 2, 3, 4, 5, 6, 7)
   time.Sleep(3 * time.Second)
   fmt.Println("main exit...")
   fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

执行结果如下:

=== RUN   TestLeakOfMemory
NumGoroutine: 2
1
sender success: 1
sender success: 2
2
ctx timeout 
sender success: 3
main exit...
NumGoroutine: 3
--- PASS: TestLeakOfMemory (3.00s)
PASS

理论上,是不是最开始只有2个goruntine ,实际上执行完出现了3个gorountine。
说明 leakOfMemory_1 里面起码有一个协程没有退出。 因为时间到了,在 a 处,程序就准备退出了,也就是说 b 这个就退出了,没有接收者继续接受 chan 中的数据了。c处往chan 写数据就阻塞了,因此协程一直没有退出,就造成了泄漏。

如何解决上面说的协程泄漏问题? 可以加个管道通知来防止内存泄漏。

4. goruntine 中 map 并发

map 是引用类型,函数值传值是调用,参数副本依然指向m,因为值传递的是引用,对于共享变量,资源并发读写会产生竞争。 下面的场景在工作中经常遇到(测的时候不容易发现)

func TestConcurrencyMap(t *testing.T) {
   m := make(map[int]int)
   go func() {
      for {
         m[3] = 3
      }

   }()
   go func() {
      for {
         m[2] = 2
      }
   }()
   //select {}
   time.Sleep(10 * time.Second)
}

5. time.Ticker 误用造成内存泄漏

注意:TickerTimer 是不同的。Timer 只会定时一次,而 Ticker 如果不 Stop,就会一直发送定时。

func TestTickerNormal(t *testing.T) {
   ticker := time.NewTicker(time.Second)
   defer ticker.Stop() // stop一定不能漏了
   go func() {
      for {
         fmt.Println(<-ticker.C)
      }
   }()

   time.Sleep(time.Second * 3)
   fmt.Println("finish")
}

6. time.After()使用注意事项

看下面的例子:

func TestTimeAfter(t *testing.T) {
   defer func() {
      fmt.Println(runtime.NumGoroutine())
   }()
   go func() {
      ticker := time.NewTicker(time.Second * 1)
      for {
         select {
         case <-ticker.C:
            fmt.Println("hello world")
         case <-time.After(time.Second * 3):
            fmt.Println("exit")
            return
         }
      }
   }()

   time.Sleep(time.Second * 5)
   fmt.Println("main func")
}

// 输出结果如下
=== RUN   TestTimeAfter
hello world
hello world
hello world
hello world
hello world
main func
3
--- PASS: TestTimeAfter (5.00s)
PASS

从输出结果看,程序根本没有打印exit, 也证明了goroutine不是由time.After() 退出,而是函数执行结果退出。

看下关于time.After() 实现原理:After底层是用NewTimer实现, NewTimer(d).C 每次都是 return 了一个新的对象。

func After(d Duration) <-chan Time {
   return NewTimer(d).C
}

func NewTimer(d Duration) *Timer {
   c := make(chan Time, 1)
   t := &Timer{
      C: c,
      r: runtimeTimer{
         when: when(d),
         f:    sendTime,
         arg:  c,
      },
   }
   startTimer(&t.r)
   return t
}

可以进行如下的修改

func TestTimeAfter(t *testing.T) {
   defer func() {
      fmt.Println(runtime.NumGoroutine())
   }()
   idleDuration := time.After(time.Second * 3)
   ticker := time.NewTicker(time.Second * 1)
   defer  ticker.Stop()
   for {
      select {
      case <-ticker.C:
         fmt.Println("hello world")
      case <-idleDuration:
         fmt.Println("exit")
         return
      }
   }

   time.Sleep(time.Second * 5)
   fmt.Println("main func")
}

下面的这个例子,是经常遇到的一定要注意: 定时器定义位置

func main() {
    chi := make(chan int)
    go func() {
        for {
            // 定时器都是新创建的,那么就会造成永久性的泄露。
            timer := time.After(10 * time.Second)
            select {
            case <-ch:
                fmt.Println("get it")
            case <-timer:
                fmt.Println("end")
            }
        }
    }()

    for i:= 1; i< 1000000; i++ {
        chi <- i
        time.sleep(time.Millisecond)
    }
}
关于我
loading