5.2.4 缓存channel
前面一直介绍的是无缓存通道,相对应地肯定也可以使用缓存channel。缓存通道有一个队列用于存储通道的数据,队列的最大长度是通过make关键字创建通道时指定的:
c := make(chan int 3)
上面语句创建了一个可以容纳三个int型数值的通道,可以通过图5-1来理解。
缓存通道的发送操作是在队列的尾部插入元素,而接收操作是从队列的头部移除一个元素。如果队列满了,发送数据的goroutine则会进入阻塞状态等待另一个goroutine来读取数据,进而腾出空间;而如果队列是空的,接收数据的goroutine则进入阻塞状态等待另一个goroutine在通道上发送数据。
下面通过图形来看一下缓存队列的变化。假如先向上述队列写入三个数值:
c<-1 c<-2 c<-3
这时队列里面放满了整型数字,如果没有其他goroutine读取,则会进入阻塞状态,如图5-2所示。
如果此时将队列内的一个整型数字打印出来:
fmt.Println(<-c)
这时会打印1,然后队列就会变成图5-3所示的样子。
此时的状态,既允许通道读又允许通道写,通过缓存区让通道的接收和发送两个操作可以同时进行。
如果要知道缓存区的容量,可以像对slice的操作一样使用cap函数;如果要知道缓存区元素的个数可以使用len函数。不过通道缓存区的容量是在定义的时候就知道的,但在正常情况下,当前元素的个数是不停变化的,所以一般不会在缓存区使用cap和len函数。
下面来写一个例子,模拟几个工人同时从一个任务处领取任务,所有任务完成后,工人下班。可以把任务定义为一个缓存通道,把工人定义为goroutine,下面来看代码:
book/ch05/5.2/buffer/main.go
1. package main
2.
3. import (
4. "fmt"
5. "sync"
6. )
7.
8. const(
9. noGoroutine = 5
10. noTask = 10
11. )
12.
13. var wg sync.WaitGroup
14.
15. func main() {
16. //创建缓存容量为noTask的缓存通道
17. tasks := make(chan int,noTask)
18.
19. //启动数量为noGoroutine的goroutine
20. for no := 1;no<=noGoroutine;no++{
21. wg.Add(1)
22. go taskProcess(tasks,no)
23. }
24.
25. //向tasks缓存通道内放入任务号
26. for taskNO:=1;taskNO<=noTask;taskNO++{
27. tasks<-taskNO
28. }
29. close(tasks)
30. wg.Wait()
31.
32. }
33.
34. func taskProcess(tasks chan int,workerNo int) {
35. defer wg.Done()
36.
37. for t := range tasks{
38. fmt.Printf("Worker %d is processing Task no:%d \n",workerNo,t)
39. }
40. fmt.Printf("Worker %d got off work \n",workerNo)
41. }
42.
43. //以下是程序某一次的执行结果(每次结果会不同)
44. Worker 1 is processing Task no:3
45. Worker 1 is processing Task no:6
46. Worker 1 is processing Task no:7
47. Worker 1 is processing Task no:8
48. Worker 1 is processing Task no:9
49. Worker 1 is processing Task no:10
50. Worker 1 got off work
51. Worker 3 is processing Task no:4
52. Worker 3 got off work
53. Worker 4 is processing Task no:5
54. Worker 4 got off work
55. Worker 5 is processing Task no:1
56. Worker 5 got off work
57. Worker 2 is processing Task no:2
58. Worker 2 got off work
第8行至第11行,定义了两个常量,一个是goroutine数量,等同于worker数量;一个是任务数。
第13行,定义变量wg,用于后续控制多个goroutine,待全部执行完成后再结束主goroutine。
第34行至第41行定义了函数taskProcess。该函数有两个参数,一个是int型通道,一个是int类型的工人编号。实现的功能就是打印工人编号和任务编号,在读取完tasks通道后,打印工人下班信息,然后调用Done方法让wg减1。注意for循环结束的条件是tasks关闭,关闭操作在main方法中。
第16行和第17行,定义一个缓存队列长度为任务数的int型通道。
第19行至第23行,按照工人数量启动goroutine,每个goroutine里面都运行taskProcess函数,传入的工人编号是1至5。注意此时的任务通道tasks还是空的,所以goroutine在此刻还是处于阻塞等待状态。
第26行至第28行,往tasks通道中放入任务编号,int型。此时的5个goroutine会开始读取tasks的任务编号。
第29行,关闭通道。注意,不管goroutine有没有执行完成,先关闭通道,因为即便是通道关闭,如果缓存区还有数据的话,goroutine还是可以读取的。
第30行,等所有的goroutine执行完成后,继续执行主goroutine结束程序。
请思考,如果第29行和第30行换一下顺序,会发生什么?答案是会出现死锁,goroutine阻塞等待缓存区继续写入数据,而主goroutine又等待所有goroutine执行完再关闭通道,因此进入了死锁状态。
第44行至第58行是一次执行的结果,每次执行的结果应该都会不同,可以看到每个worker的任务量是不同的,因为每次都是并发执行,结果具有不确定性。
注意
通道关闭后仍然可以接收通道的数据,直到通道为空,继续接收则会让对应的操作结束。而向关闭后的通道发送数据则会导致异常。