Go channel

basics

  • channel 是用于不同 goroutine 之间通信的数据结构

  • 同步不同的goroutine

定义不同类型的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var ch chan int  // ch is nil

var sendCh chan<- int // 定义发送通道类型
var recvCh <-chan int // 定义通道为接受通道类型

ch := make(chan int) // unbuffered channel

ch := make(chan int, 1) // buffered channel

ch := make(chan int64, 1)

ch := make(chan float64, 1)

type mstruct struct {}
ch := make(chan *mstruct, 10)

从channel中接受/发送值

1
2
3
4
ch := make(chan int, 1)
ch <- 100 // send

v, ok <- ch // receive, 如果ch被关闭,那么ok为false

goroutine 之间同步

使用无容量的channel在goroutine之间同步
对无容量的channel来说,发送者会阻塞直到接收者从这个channel接受数据
同时接收者会一直阻塞直到发送者往这个channel发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int64) // ch unbuffered channel
go sum([]int64{1, 3, 4, 5, 6, 10}, ch)

fmt.Printf("sum = %d\n", <-ch)
}

func sum(numbers []int64, sum chan int64) {
total := int64(0)
for _, n := range numbers {
total += n
time.Sleep(time.Millisecond)
}

sum <- total
}

对于有容量的channel:发送者会阻塞到一个容量是满的channel上,同时接受者会阻塞到一个空的channel上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func bufferChanSum() {
ll := [][]int32{
{1,2,3,4},
{5,6,7,8},
{9,10,11},
{100,1000,999,8888},
}

results := make(chan int32, len(ll))
for _, l := range ll {
go func(intList []int32) {
var sum int32
for _, v := range intList {
sum+= v
}
results <- sum
}(l)
}

for i := 1; i <= len(ll); i++ {
fmt.Println(<-results)
}
close(results)
}

close

  • 发送者可以关闭channel

  • 关闭已关闭的channel会panic, 关闭 nil channel会panic

  • 关闭buffered channel后,如果channel里还有值未被接受,那仍然可以从这个channel里接受值,如果没有值可接受,接受操作结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
for v := range recvBufferedChan() {
fmt.Printf("%d, ", v)
}

// 10, 20, 30, 40, 50,
}

func recvBufferedChan() chan int {
vs := make(chan int, 5)
for i := 1; i <= 5; i++ {
vs <- i*10
}
close(vs)
return vs
}
  • 往已关闭的channel发送值,会panic
1
2
3
4
5
func sendToClosedChan() {
c := make(chan int)
close(c)
c <- 100 // // panic: send on closed channel
}
  • 从已关闭的channel接受值,会收到channel类型的零值。
1
2
3
4
5
6
func receiveFromClosedChan() {
c := make(chan int)
close(c)
v ,ok := <-c
fmt.Println(v, ok) // 0, false, false 表示channel已关闭
}

select

select 用来处理多个goroutine的发送和接受操作。
当有多个goroutine准备好发送或者接受,那么select会随机选择一个进行操作;如果都没有准备好,那么会选择default中的代码块执行,如果没有default分支。select就会阻塞。
select 不会处理nil channel, 当一个接受操作的channel被关闭之后把它设置为nil, 那么下一次这个case就不会被选择。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// https://www.godesignpatterns.com/2014/05/nil-channels-always-block.html
func merge2Chan(a, b <-chan int) <-chan int {
c := make(chan int)

go func() {
Loop:
for a != nil || b != nil { // nil channel 不会被select 选择
select {
case v, ok := <-a:
if ok {
c <- v
} else {
log.Println("a is done")
a = nil
continue Loop
}

case v, ok := <-b:
if ok {
c <- v
} else {
log.Println("b is done")
b = nil
continue Loop
}
}
}

log.Println("closing c")
close(c)
}()

return c
}

上面的程序是合并两个channel, 当其中一个被close之后,ok 为false, 然后将这个channel的值设为nil。当两个channel都为nil的时候goroutine结束,完成合并。

channel 超时检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func simulateWaitGroup() {
rand.Seed(time.Now().Unix())
runtime.GOMAXPROCS(8)
rs := make(chan int, 9)

for i := 1; i <= 9; i++ {
go func(i int) {
ch := make(chan int)
var d time.Duration
if i%2 == 0 {
d = 3 *time.Second
} else {
d = time.Second
}
go timeConsumingFunc(ch, d)
select {
case v := <-ch:
rs <- v
case <-time.After(2 * time.Second):
fmt.Println(i, "timeout")
rs <- -1
}
}(i)
}

for j := 1; j <= 9; j++ {
fmt.Println(<-rs)
}

close(rs)
}

func timeConsumingFunc(ch chan int, d time.Duration) {
time.Sleep(d)
ch <- rand.Int()
}
/*
1829528195022233954
2293972887250620127
1139169814077204315
8896168675121178474
9143794745844314468
8 timeout
6 timeout
-1
4 timeout
-1
-1
2 timeout
-1
*/

在调用timeConsumingFunc之前,设置i为偶数的时候timeConsumingFunc处理时间设置为3秒,也就是偶数的index会处理为超时,当timeConsumingFunc耗时超过2秒,那个下面select中time.After会被选择,处理超时的逻辑。

参考: