背景

Go 编程中关于 channel 的使用非常重要,本文简要罗列一些要点,仅供参考。

有缓冲通道和无缓冲通道

本章节摘抄于《Go in Action》 一书(这本书的几张图当时解决了我入门时一些疑惑的地方)

无缓冲通道(unbuffered channel)

无缓冲通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。如下图所示:

  • 1:两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收;
  • 2:左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为,这时,这个 goroutine 会在通道中被锁住,直到交换完成;
  • 3:右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据,这个 goroutine 一样也会在通道中被锁住,直到交换完成;
  • 4 - 5:进行交换;
  • 6:释放 goroutine;

总结两句话:

  • 如果没有读者,写动作将被阻塞
  • 如果没有写者,读动作将被阻塞

有缓冲通道(buffered channel)

有缓冲通道(buffered channel)是一种在接收前能存储一个或者多个值的通道。这种类型的通道并不强求 goroutine 之间必须同时完成发送和接收。只有在通道中没有要接收的值,接收动作才会被阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会被阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大的差异:无缓冲通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲通道没有这种保证。如下图所示:

  • 1:右侧 goroutine 正从通道接收一个值;
  • 2:右侧 goroutine 独立完成接收值的动作,而左侧的 goroutine 正在发送一个新值到通道里;
  • 3:左侧的 goroutine 还在向通道发送新值,而右侧的 goroutine 正从通道接收另一个值,这个步骤是异步的;
  • 4 :所有的发送和接收都完成,而通道里还有几个值,也有一些剩余空间;

总结两句话:

  • 仅当 channel 为空,读动作才被阻塞
  • 仅当 channel 满了,写动作才被阻塞

单向通道和双向通道

Go 的类型系统提供了 单向通道类型仅仅导出发送或接收操作,如:

  • chan<- int:只发不收;

  • <-chan int:只收不发,close 会在编译时报错;

违反这个原则会在编译时被检查出来。

我们普通定义的 channel 一般都是双向通道,在任何赋值操作中,将双向通道转换为单向通道都是允许的,但是反过来是不行的。一旦有一个像 chan<- int 的单向通道,是无法通过它获取到引用同一数据结构的 chan int 类型的。

Channel 使用要点

  1. select 中,如果有多个操作都被阻塞,则将选择 default 分支,比如:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    
    package main
    
    import "fmt"
    
    func main() {
        ch := make(chan int)
        select {
        // 如果发送被阻塞,将走 default 分支.
        case ch <- 1:
            fmt.Println("send")
        default:
            fmt.Println("default")
        }
    
        // 没有用 for,select 选择一个可以执行的分支执行完之后就退出.
        fmt.Println("Hello")
    }
    
  2. 如果有多个可用分支,select 将选择其他可用分支(如果有多个,将随机挑选一个),否则就走 default

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    package main
    
    import "fmt"
    
    func main() {
        ch1 := make(chan int)
        ch2 := make(chan int, 10)
    
        select {
        case ch1 <- 1: // 发送被阻塞,如果没有其他可用分支,将走 default.
            fmt.Println("send ch1")
        case ch2 <- 2: // 不会被阻塞,将走这条分支.
            fmt.Println("send ch2")
        default:
            fmt.Println("default")
        }
    
        // 没有用 for,select 选择一个可以执行的分支执行完之后就退出.
        fmt.Println("Hello")
    }
    
  3. 如果 select 没有可执行的分支,将一直保持阻塞,如下所示:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    package main
    
    func main() {
        ch := make(chan int)
    
        go func() {
            time.Sleep(10 * time.Second)
            ch <- 1
        }()
    
        // 没有可以执行的分支,将一直阻塞,直到 10 秒后 goroutine 中投递了一个消息.
        select {
        case <-ch:
            fmt.Println("receive")
        }
    }
    
  4. 如果多个 goroutine 都监听同一个 channel,那么 channel 上的数据都可能随机被某一个 goroutine 取走进行消费,而且这个过程应该是原子性的;

  5. 如果多个 goroutine 监听同一个 channel,如果这个 channel 被关闭,则所有 goroutine 都能收到退出信号(这也是 context 包的基础):

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    
    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var wg sync.WaitGroup
    
    func worker(stop <-chan struct{}) {
        defer wg.Done()
        for {
            select {
            case <-stop:
                fmt.Println("received stop signal")
            return
                default:
            }
        }
    }
    
    func main() {
        stop := make(chan struct{})
        wg.Add(10)
        for i := 0; i < 10; i++ {
            // 每一个 goroutine 都监听同个 stop channel,将可同时收到 stop 信号.
            go worker(stop)
        }
    
        // 确保所有 goroutine 已经启动.
        time.Sleep(2 * time.Second)
        close(stop)
    
        wg.Wait()
    }
    
  6. 没有一种显式的方式来获知 channel 是否关闭,比如有一个 goroutine 需要一直从一个 channel 读取对应的值,直到 channel 关闭,可以采用:

    1
    2
    3
    4
    5
    6
    7
    8
    
    for {
        // ok 表示是否读正常,如果 ok 为 false,表示 channel 已被关闭.
        v, ok := <-ch
        if !ok {
            break
        }
        fmt.Println(v)
    }
    

    这部分 Go 也内置了更简洁的方式

    1
    2
    3
    4
    
    // 无需判断 ok,只要 channel 关闭,for 自动停止.
    for v := range ch {
        fmt.Println(v)
    }
    

    更完整的例子可参考:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    
    package main
    
    import (
        "fmt"
    )
    
    func main() {
        ch := make(chan int, 10)
    
        go func() {
            for i := 0; i < 20; i++ {
                ch <- i
            }
            close(ch)
        }()
    
        // 第一种方式:
        // for {
        //     v, ok := <-ch
        //	   if !ok {
        //         break
        //	   }
        //     fmt.Println(v)
        // }
    
        for v := range ch {
            fmt.Println(v)
        }
    
        // 此时 ok 为 false,v 为零值
        v, ok := <-ch
        fmt.Println(v, ok)
    }
    

参考资料