Golang 使用心得-正确关闭channel

相信很多同学都是因为Golang有相当简单的并发模型才转过来的。Golang的并发模型主要由三部分构成:Goroutine、Channel和Select。

其中Channel是Goroutine之间通信的桥梁,简单优雅的解决了并行开发中的同步问题。

但是初学者(比如我)通常会遇到 send on closed channel的错误,这是因为我们向一个已经关闭channle推送数据造成的。通常这个错误是发生在生成消费模型中。channel用来传送需要执行的任务,channel一端是生产者,另一端是消费者。

如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
    "fmt"
    "sync"
)

func main() {
    jobs := make(chan int)
    var wg sync.WaitGroup
    wg.Add(10)
    go func() {
        for i := 0; i < 10; i++ {
            jobs <- i
            fmt.Println("produce:", i)
        }  
    }()
    go func() {
        for i := range jobs {
            fmt.Println("consume:", i)
            wg.Done()
        }  
    }()
    wg.Wait()
}

注意上面的代码我们没有主动关闭channel,因为我一开始就知道执行的任务数,所以直接等待任务完成即可,没有必要主动关闭channel。

假设我们的场景中,生产者没有生成数量的限制,只有一个时间限制。那么代码演变成下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    jobs := make(chan int)
    var wg sync.WaitGroup
    go func() {
        time.Sleep(time.Second * 3)
        close(jobs)
    }()
    go func() {
        for i := 0; ; i++ {
            jobs <- i
            fmt.Println("produce:", i)
        }  
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := range jobs {
            fmt.Println("consume:", i)
        }  
    }()
    wg.Wait()
}

这时程序就会因为send on closed channel而崩溃。那么首先想到的解决方案可能是在放入jobs的时候检查jobs是否已关闭。幸好Golang有办法来检测channel是否被关闭,但是非常不好用。Golang中可以用comma ok的方式检测channel是否关闭,如下:

i, ok := <- jobs 如果channel关闭那么ok返回的是false,但是这样的话,如果jobs没有关闭,那么就会漏掉一个job。当然你可以想办好hack掉漏掉的这个job。但是这样的代码不是很优雅,也是那么直观合理。 在实践中我是用的如下代码处理这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    jobs := make(chan int)
    var wg sync.WaitGroup
    timeout := make(chan bool)
    go func() {
        time.Sleep(time.Second * 3)
        timeout <- true
    }()
    go func() {
        for i := 0; ; i++ {
            select {
            case <-timeout:
                close(jobs)
                return
            default:
                jobs <- i
                fmt.Println("produce:", i)

            }  
        }  
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := range jobs {
            fmt.Println("consume:", i)
        }  
    }()
    wg.Wait()
}

实际情况可能还要复杂很多,比如会有多个生产者,这个时候代码又该如何处理呢?大家可以想一想。。。