Go Channel 关闭的那些事儿

前言

什么情况下关闭 channel 会造成 panic?有没有必要关闭 channel?如何判断 channel 是否关闭?如何优雅地关闭 channel?这些你都知道吗?(不要告诉我你只会回答最后一个问题!)看到这一溜烟的问题,不知道你会不会不禁感叹,究竟哪个天杀的总说 go channel “哲学”“优雅”的?也许 Rob Pike (go语言之父)会说:嗯,当然“哲学”“优雅”,只是需要你注意的问题有点多……

什么情况下关闭 channel 会造成 panic ?

先看示例:

 // 1.未初始化时关闭
 func TestCloseNilChan(t *testing.T) {
    var errCh chan error
    close(errCh)
    
    // Output:
    // panic: close of nil channel
 }
 
 // 2.重复关闭
 func TestRepeatClosingChan(t *testing.T) {
    errCh := make(chan error)
    var wg sync.WaitGroup
    wg.Add(1)
 
    go func() {
       defer wg.Done()
       close(errCh)
       close(errCh)
    }()
 
    wg.Wait()
    
    // Output:
    // panic: close of closed channel
 }
 
 // 3.关闭后发送
 func TestSendOnClosingChan(t *testing.T) {
    errCh := make(chan error)
    var wg sync.WaitGroup
    wg.Add(1)
 
    go func() {
       defer wg.Done()
       close(errCh)
       errCh <- errors.New("chan error")
    }()
 
    wg.Wait()
    
    // Output:
    // panic: send on closed channel
 }
 
 // 4.发送时关闭
 func TestCloseOnSendingToChan(t *testing.T) {
    errCh := make(chan error)
    var wg sync.WaitGroup
    wg.Add(1)
 
    go func() {
       defer wg.Done()
       defer close(errCh)
 
       go func() {
          errCh <- errors.New("chan error") // 由于 chan 没有缓冲队列,代码会一直在此处阻塞
       }()
 
       time.Sleep(time.Second) // 等待向 errCh 发送数据
    }()
 
    wg.Wait()

    // Output:
    // panic: send on closed channel
 }

综上,我们可以总结出如下知识点:

【知识点】在下述 4 种情况关闭 channel 会引发 panic:未初始化时关闭、重复关闭、关闭后发送、发送时关闭。

另外,从 golang 的报错中我们可以知道,golang 认为第3种和第4种情况属于一种情况。

通过观察上述代码,为避免在使用 channel 时遇到重复关闭、关闭后发送的问题,我想我们可以总结出以下两点规律:

  • 应该只在发送端关闭 channel。(防止关闭后继续发送)
  • 存在多个发送者时不要关闭发送者 channel,而是使用专门的 stop channel。(因为多个发送者都在发送,且不可能同时关闭多个发送者,否则会造成重复关闭。发送者和接收者多对一时,接收者关闭 stop channel;多对多时,由任意一方关闭 stop channel,双方监听 stop channel 终止后及时停止发送和接收)

这两点规律被称为“channel 关闭守则”。

既然关闭 channel 这么麻烦,那么我们有没有必要关闭 channel 呢?不关闭又如何?

有没有必要关闭 channel?不关闭又如何?

我们考虑以下两种情况:

情况一:channel 的发送和接收次数确定且相同时

 func TestIsCloseChannelNecessary_on_equal(t *testing.T) {
     fmt.Println("NumGoroutine:", runtime.NumGoroutine())
     ich := make(chan int)
 
     // sender
     go func() {
        for i := 0; i < 3; i++ {
           ich <- i
        }
     }()
 
     // receiver
     go func() {
        for i := 0; i < 3; i++ {
           fmt.Println(<-ich)
        }
     }()
 
     time.Sleep(time.Second)
     fmt.Println("NumGoroutine:", runtime.NumGoroutine())
    
     // Output:
     // NumGoroutine: 2
     // 0
     // 1
     // 2
     // NumGoroutine: 2
 }

channel 的发送和接收次数确定且相同时,发送者 go routine 和接收者 go routine 分别都会在发送或接收结束时结束各自的 go routine。而上述代码中的 ich 会由于没有代码使用被垃圾收集器回收。因此这种情况下,不关闭 channel,没有任何副作用。

情况二:channel 的发送次数不确定时

 func TestIsCloseChannelNecessary_on_less_sender(t *testing.T) {
    fmt.Println("NumGoroutine:", runtime.NumGoroutine())
    ich := make(chan int)
 
    // sender
    go func() {
       for i := 0; i < 2; i++ {
          ich <- i
       }
    }()
 
    // receiver
    go func() {
       for i := 0; i < 3; i++ {
          fmt.Println(<-ich)
       }
    }()
 
    time.Sleep(time.Second)
    fmt.Println("NumGoroutine:", runtime.NumGoroutine())
    
    // Output:
    // NumGoroutine: 2
    // 0
    // 1
    // NumGoroutine: 3
 }

以上述代码为例,channel 的发送次数小于接收次数时,接收者 go routine 由于等待发送者发送一直阻塞。因此接收者 go routine 一直未退出,ich 也由于一直被接收者使用无法被垃圾回收。未退出的 go routine 和未被回收的 channel 都造成了内存泄漏的问题。

因此,在发送者与接收者一对一的情况下,只要我们确保发送者或接收者不会阻塞,不关闭 channel 是可行的。在我们无法准确判断 channel 的发送次数和接收次数时,我们应该在合适的时机关闭 channel。那么如何判断 channel 是否关闭呢?

如何判断 channel 是否关闭?

【知识点】go channel 关闭后,读取该 channel 永远不会阻塞,且只会输出对应类型的零值。

如下代码所示:

 func TestReadFromClosedChan(t *testing.T) {
    var errCh = make(chan error)
 
    go func() {
       defer close(errCh)
       errCh <- errors.New("chan error")
    }()
 
    go func() {
       for i := 0; i < 3; i++ {
          fmt.Println(i, <-errCh)
       }
    }()
 
    time.Sleep(time.Second)
    
    // Output:
    // 0 chan error
    // 1 <nil>
    // 2 <nil>
 }

以上述代码为例,nil 可能也是需要 channel传输的值之一,通常我们无法通过判断是否为类型的零值确定 channel 是否关闭。所以为了避免输出无意义的值,我们需要一种合理的方式判断 channel 是否关闭。golang 官方为我们提供了两种方式。

解决方案一:使用 channel 的多重返回值(如 err, ok := <-errCh

 func TestReadFromClosedChan2(t *testing.T) {
     var errCh = make(chan error)
     go func() {
         defer close(errCh)
         errCh <- errors.New("chan error")
     }()
 
     go func() {
         for i := 0; i < 3; i++ {
             err, ok := <-errCh
             if ok {
                 fmt.Println(i, err)
             } else {
                 fmt.Println(i, err)
             }
 
         }
     }()
 
     time.Sleep(time.Second)
 
     // Output:
     // 0 chan error
     // 1 <nil>
     // 2 <nil>
 }

err, ok := <-errCh 的第二个返回值 ok 表示 errCh 是否已经关闭。如果已关闭,则返回 false。

解决方案二:使用 for range 简化语法

 func TestReadFromClosedChan(t *testing.T) {
    var errCh = make(chan error)
    go func() {
       defer close(errCh)
       errCh <- errors.New("chan error")
    }()
 
    go func() {
       i := 0
       for err := range errCh {
          fmt.Println(i, err)
          i++
       }
    }()
 
    time.Sleep(time.Second)
    
    // Output:
    // 0 chan error
 }

for range 语法会自动判断 channel 是否结束,如果结束则自动退出 for 循环。

如何优雅地关闭 channel ?

我们从前文也了解到,如果发生重复关闭、关闭后发送等问题,会造成 channel panic。那么如何优雅地关闭 channel,是我们关心的一个问题。

golang 官方为我们提供了一种方式,可以用来尽量避免这个问题。golang 允许我们在把 channel 作为参数时,使用 <- 控制 channel 发送方向,防止我们在错误的时候关闭 channel

 func TestOneSenderOneReceiver(t *testing.T) {
    ich := make(chan int)
    go sender(ich)
    go receiver(ich)
 }
 
 func sender(ich chan<- int) { // 注意参数中的箭头
    for i := 0; i < 100; i++ {
       ich <- i
    }
 }
 
 func receiver(ich <-chan int) {  // 注意参数中的箭头
    fmt.Println(<-ich)
    close(ich) // 此处代码会在编译期报错
 }

使用这种方法时,由于 close() 函数只能接受 chan<- T 类型的 channel,如果我们尝试在接收方关闭 channel,编译器会报错,所以我们可以在编译期提前发现错误。

除此之外,我们也可以使用如下的结构体:

 type Channel struct {
    C      chan interface{}
    closed bool
    mut    sync.Mutex
 }
 
 func NewChannel() *Channel {
    return NewChannelSize(0)
 }
 
 func NewChannelSize(size int) *Channel {
    return &Channel{
       C:      make(chan interface{}, size),
       closed: false,
       mut:    sync.Mutex{},
    }
 }
 
 func (c *Channel) Close() {
    c.mut.Lock()
    defer c.mut.Unlock()
    if !c.closed {
       close(c.C)
       c.closed = true
    }
 }
 
 func (c *Channel) IsClosed() bool {
    c.mut.Lock()
    defer c.mut.Unlock()
    return c.closed
 }
 
 func TestChannel(t *testing.T) {
    ch := NewChannel()
    println(ch.IsClosed())
    ch.Close()
    ch.Close()
    println(ch.IsClosed())
 }

该方案可以解决重复关闭锁的问题以及锁是否关闭的问题。通过 Channel.IsClosed() 判断是否关闭 channel ,又可以安全地发送和接收。当然我们也可以把 sync.Mutex 换成 sync.Once,来只让 channel 关闭一次。

有时候我们的代码已经使用了原生的 chan,或者我们不想使用单独的数据结构,也可以使用下述的几种方案。通常情况下,我们只会遇到四种需要关闭 channel 的情况

  • 一个发送者,一个接收者:发送者关闭 channel,接收者使用 selectfor range 判断 channel 是否关闭。
  • 一个发送者,多个接收者:发送者关闭 channel,同上。
  • 多个发送者,一个接收者:接收者接收完毕后,使用专用的 stop channel 关闭;发送者使用 select 监听 stop channel 是否关闭。
  • 多个发送者,多个接收者:任意一方使用专用的 stop channel 关闭;发送者、接收者都使用 select 监听 stop channel 是否关闭。

因此我们只需要熟记面对这四种情况时如何关闭 channel 即可。

小结

代码不会撒谎。事实证明,使用 go channel 要注意的问题确实不少。如果在关闭 channel 时处理不当,可能会导致 panic,甚至还会造成内存泄漏。

关于我
loading