Go 语言基础系列(三)协程 Goroutine

前言

本文为笔者所著 Go 语言基础系列之一,本文为作者原创作品,转载请注明出处;

特性

协程是 Golang 在用户层所创建的虚拟线程,拥有自己的堆和栈;数千个协程可以共享同一个内核线程,协程之间的调度由 Go Runtime 来控制,内核线程无感知;协程之间通过管道 Channel 来通讯,避免对内存资源相互竞争(race condition);

Main Goroutine

程序执行的时候,Go 会为 Main 函数单独创建一个 Goroutine,称作 Main Goroutine 主协程,也称作 Controller;

Goroutine

  1. 通过关键字 go 调用一个方法即刻启动一个协程;
  2. 当 Main Goroutine 执行完毕以后,其他的 Coroutines 也立刻结束,无论这些 Coroutines 正在执行与否;
  3. 协程的返回值将会被 Controller 所忽略,也就是它的返回值在主协程中是无效的;其实这一点很显然,因为协程相对于主协程而言是异步执行的;

来看下面这个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func running() {
var times int
// 构建一个无限循环
for {
times++
fmt.Println("tick", times)
// 延时1秒
time.Sleep(time.Second)
}
}
func main() {
// 并发执行程序
go running()
// 接受命令行输入, 不做任何事情
var input string
fmt.Scanln(&input)
}

在 main 方法启动的时候,Golang 会为 main 方法单独创建一个 Main Goroutine,然后,我们通过关键字go启动了一个 Goroutine;为了避免主协程直接退出,代码 22 行需要等待用户的任意输入,主协程才会结束;

Channel

协程之间是通过管道 Channel 进行通讯的;通常情况下,为了避免多个线程对同一内存资源竞争(race condition),通常,我们是通过对该公共资源上读写锁来实现的,原理虽然简单,但是在代码实现过程中往往变得晦涩难懂;因此,有人提出使用消息传递机制来避免使用大量的读写锁来避免竞争,这种机制的原理其实很简单,就是得到消息的线程才有权访问(修改)此资源,当访问完毕,再将状态通过消息发送给另外一个线程,这样,该线程又获得了对该资源的访问权限,而它在代码中的逻辑也非常简单,清晰易懂;消息传递机制英文称作 Communicating Sequential Processes,由 C. A. R. Hoare 很早以期就提出了;

创建

  1. 声明
    通过关键字 chan T 声明一个管道,T 表示类型;
  2. 创建
    两种方式可以创建一个管道,

    • 一种方式,先声明后创建,

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      package main

      import "fmt"

      func main() {
      var a chan int
      if a == nil {
      fmt.Println("channel a is nil, going to define it")
      a = make(chan int)
      fmt.Printf("Type of a is %T", a)
      }
      }

      代码第 6 行首先声明了一个int类型的管道a,这个时候 a 的值为nil,表示未初始化;

    • 另一种方式就是直接创建,

      1
      a := make(chan int)

      声明和初始化一并完成;

收发消息

语法

1
2
data := <- a // read from channel a  
a <- data // write to channel a

a 表示一个已经初始化好的管道,

  • data := <- a表示从管道a读取,将读取到的值存放到变量data中;
  • a <- data表示将变量data中的值写入管道a中去,也就是通过管道a将消息发送出去;

阻塞

设某协程中有一管道 $C$,

  • 当该协程使用 $C$ 发送数据后,该协程将会一直被阻塞,直到接收方读取到数据;
  • 当该协程使用 $C$ 开始接收消息后,该协程将会一直被阻塞直到发送方写入数据;

来看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
)

func hello() {
fmt.Println("Hello world goroutine")
}

func main() {
go hello()
fmt.Println("main function")
}

代码第 13 行开启了一个协程,但是,这段代码并没有像预期那样,打印出 Hello world goroutine ,而是直接输出了 main function;其实这个结果是显而易见的,因为主协程(Main Goroutine)不会等待 hello 协程,它会直接退出,因此导致 hello 协程没有执行完,整个程序便结束了;这里,我们可以利用管道阻塞的特性,达到我们预期的目的,让主协程等待 hello 协程执行完后才退出,代码如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
)

func hello(done chan bool) {
fmt.Println("Hello world goroutine")
done <- true
}

func main() {
done := make(chan bool)
go hello(done)
<-done
fmt.Println("main function")
}

代码第 15 行,主协程开始接收管道 done 的消息,它会一直阻塞直到发送方发送数据,也就是等待 hello 协程往管道 done 写入数据,而这个步骤正好就是 hello 协程的最后一步,因此,再次执行,我们将会得到我们所预期的结果,

1
2
Hello world goroutine
main function

死锁

例子

看官文的描述,

A deadlock happens when a group of goroutines are waiting for each other and none of them is able to proceed.

当所有的协程都在相互等待的时候,且没有一个协程可以继续执行的时候,死锁便产生了;看一个例子,

1
2
3
4
5
6
7
package main


func main() {
ch := make(chan int)
ch <- 5
}

Go runtime 提示出错,

1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/tmp/sandbox249677995/main.go:6 +0x80

fatal error: all goroutines are asleep - deadlock!;代码很简单,也很明显,这是一个死锁,因为没有任何一个 Goroutine 能够消费;

Go runtime 是如何检测到死锁的

要注意的是,这是一个 runtime error,也就是说,在程序执行期间,由 Go runtime 检测出来的;笔者好奇的是,Go 是如何检测到死锁的?看官文的描述,

Currently Go only detects when the program as a whole freezes, not when a subset of goroutines get stuck.

也很简单,Go 只是去检查整个程序(所有的 Goroutines)是否都已经停止了,如果是,那么就是发生了灾难,Deadlock;很显然,上面这个例子只有一个 Main Goroutine,而它将会永远被阻塞,因此,Go runtime 检测到,整个程序已经被阻塞停止了,因此抛出 Panic Deadlock 异常!

谁优谁劣

With channels it’s often easy to figure out what caused a deadlock. Programs that make heavy use of mutexes can, on the other hand, be notoriously difficult to debug.

上面这段官文的意思就是说,由管道所引起的死锁,通常都非常容易排查;但是如果是因为锁引起的,那么,排查问题的原因将会是非常艰难的!所以,即便管道也会产生死锁,但相比之下,它明显比用锁来避免 race condition 更为优秀;在后面笔者会介绍如何用 Channel 来实现互斥!

Reference

https://yourbasic.org/golang/detect-deadlock/

单向管道

故名思议,单向管道就是指该管道要么只能,要么只能,消息;但是,很明显,谁会往一个不能读取的管道中去写入数据呢?写入的数据谁来接收呢?这不是矛盾的且不合理的吗?其实 Golang 所设计的单向管道的目的在于,在某些特殊的方法中,我们为了限制管道的用法,将一个双向管道强制转换为一个特定的单向管道,让它在该方法中要么只能读,要么只能写,仅此而已;也就是说通常,单向管道是由双向管道强制转换而来的,当然,你也可以不从双向管道转换而来,而是直接创建一个单向管道,如下,

  • 语法 chan<- T 创建一个只能发送的管道,

    1
    sendch := make(chan<- int)

    这里,我们直接创建了一个只能发送的管道;

  • 语法 <-chan T 创建一个只能接收/读取的管道

来看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func sendData(sendch chan<- int) {
sendch <- 10
}

func main() {
sendch := make(chan<- int)
go sendData(sendch)
fmt.Println(<-sendch)
}

代码第 10 行,创建了一个只允许写入的管道 sendch;运行,报错 main.go:12: invalid operation: <-sendch (receive from send-only type chan<- int),代码第 12 行报错,很显然,这里不能对一个只能写入的管道进行读取,因此报错;其实,正确的用法如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func sendData(sendch chan<- int) {
sendch <- 10
}

func main() {
chnl := make(chan int)
go sendData(chnl)
fmt.Println(<-chnl)
}

代码第 10 行,这次,一开始,我们创建的是一个双向管道 chnl,只是,在主协程调用方法 sendData 中将入参 chnl 强制转换成了一个只写管道 sendch;执行,输入结果 10;这才是 Golang 定义单向管道的真正用意所在;

关闭

  • 使用内置方法 close(chan) 来关闭一个管道;
  • 我们依然可以读取一个被关闭的管道,只是这个时候,我们读取到的永远是0
  • 使用语句

    1
    v, ok := <- ch

    通过返回的状态值 ok 来判断一个管道是否已经关闭,若 ok==true,则表示该管道 ch 已经关闭;

来看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
)

func producer(chnl chan int) {
for i := 0; i < 10; i++ {
chnl <- i
}
close(chnl)
}

func main() {
ch := make(chan int)
go producer(ch)
for {
v, ok := <-ch
if ok == false {
break
}
fmt.Println("Received ", v, ok)
}
}

协程 producer 通过循环将数字写入 chnl 管道,注意,当写完 10 个数字以后,代码第 11 行,将管道关闭了;这个时候,代码第 18 行,主协程通过状态值 ok 来判断管道是否已经关闭,如果已经关闭,则跳出该无限 for 循环;

不过上述的例子中,我还有一个疑惑,那就是如果 producer 要比 consumer 快,consumer 消费要慢许多,那么势必导致,在 consumer 还没有完全消费完队列中的元素之前,producer 就已经调用了 close 方法,关闭了管道,那么,这个时候,consumer 是否还能够继续正常消费队列中所剩余的元素呢?答案已经在 close 方法源码的注解里了,如下,

// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
// x, ok := <-c
// will also set ok to false for a closed channel.

has the effect of shutting down the channel after the last sent value is received,close 方法会在最后一个已发送的元素被消费后,才会关闭队列,因此,这个顾虑是不存在的!

循环读取

上一个小节所介绍的循环读取某个管道中数据的例子,循环语句稍显复杂;Golang 提供了一组关键字 for range 来简化该操作;例子如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
)

func producer(chnl chan int) {
for i := 0; i < 10; i++ {
chnl <- i
}
close(chnl)
}

func main() {
ch := make(chan int)
go producer(ch)
for v := range ch {
fmt.Println("Received ",v)
}
}

我们将上一小节无限 for 循环的判断语句改写成了一行代码 for v := range ch,它会自动检测管道 ch 是否已经关闭,若已经关闭,将会自动退出该 for 循环;

Buffered Channel

创建

1
ch := make(chan type, capacity)
  • 参数 capacity 表示需要创建多大容量的缓存管道;

带有缓存的管道可以理解为一个 FIFO 的队列;并且,当有多个协程同时消费一个带缓存的管道的时候,它是线程安全的,

阻塞

我们知道,普通的没有缓存的 Channel,每发送一条数据后,相关的协程立刻便会被阻塞;而有缓存的管道(Buffered Channel)的性质是,假设我们有一个带缓存的管道 $BC$

  • 某个协程往 $BC$ 中不断的写入数据的时候,当 $BC$ 被写满后,该协程将会被阻塞,直到 $BC$ 中有新的空位被腾出;
  • 某个协程从 $BC$ 中不断的读取数据的时候,当 $BC$ 被取空后,该协程将会被阻塞,直到 $BC$ 中有新的数据被写入;

来看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func produce(ch chan string) {
ch <- "naveen"
fmt.Println("produced", "naveen")
ch <- "paul"
fmt.Println("produced", "paul")
ch <- "kane"
fmt.Println("produced", "kane")
close(ch)
}

func main() {
ch := make(chan string, 2) // capacity starts from 0,
go produce(ch)
// consumer
time.Sleep(2000000 * time.Second) // the consumer works very very slow!
for v := range ch {
fmt.Println("consumed", v)
}
fmt.Println("Done!")
}

上面这个例子,produce 协程将会在写入人名 kane 第地方阻塞,也就是阻塞在代码的第 13 行,运行代码,输出结果,

1
2
produced naveen
produced paul

很明显,因为 consume 协程迟迟不消费,导致 produce 协程已经把管道 ch 写满了,导致 produce 协程阻塞了;

死锁

由上一个小节死锁中,我们知道,Go runtime 在执行过程中,若发现所有的 Goroutines 都相互阻塞了,那么就会抛出运行时刻的 fatal errors,Dead lock;下面我们来看一个由 Buffered Channel 所导致的死锁的例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
)

func main() {
ch := make(chan string, 2)
ch <- "naveen"
fmt.Println("produced", "naveen")
ch <- "paul"
fmt.Println("produced", "paul")
ch <- "steve"
fmt.Println("produced", "steve")
fmt.Println(<-ch)
fmt.Println(<-ch)
}

运行,代码在执行到第 13 的时候,抛出 Dead lock 的运行时异常;Buffered Channel 的容量为 2,当写入 paul 以后,该 channel 便被阻塞了,且当前只有一个 main goroutine,所以,Go runtime 检测到整个系统已经停止了,便立即抛出了 Dead lock 异常;

Length vs Capacity

  • Length
    表示 channel 中当前可用的元素的长度;
  • Capacity
    表示 channel 总共的容量;

WaitGroup

这个关键字 WaitGroup 的名字取得非常的形象,直译过来表示“等待一组东西”;WaitGroup 的目的在于让当前的 goroutine $A$ 等待,直到它所监控的其它 goroutines 都执行完成以后,才唤醒 $A$ 让它继续执行;可以形象的想象为,有个水闸,只有等待所有的水都满了以后,才会开闸放水;笔者联想到了 Java 线程中的 Latch,门栓,只有等待所有被监控的 Java 线程执行完以后,这个门栓才会被打开;来看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"sync"
"time"
)

func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}

func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
  1. 代码第 23 行,对主协程上锁,让它等待,这步很重要,避免主协程直接退出;
  2. 代码第 20 行,通过 wg.Add(1)WaitGroup 计数 $+1$,表示该线程目前已被 WaitGroup 所监控;
  3. 代码第第 13 行,通过 wg.Done()WaitGroup 计数 $-1$,告诉它,某个被监控的协程已经执行完毕;
  4. 直到所有被 WaitGroup 监控的协程都执行完毕后,wg.Wait() 开始放行,允许主协程继续执行;其实也就是当 WaitGroup 实例 wg 的计数等于 0 以后,放行;

WaitGroup 的实现原理非常简单,它维护一个计数器,当某个或多个协程需要被监控的时候,通过 WaitGroup.Add(n) 方法将计数器 $+n$,表示当前有 $n$ 个协程已经在它的监控中了;当某个被监控的协程执行完毕后,通过 WaitGroup.Done() 使得计数器 $-1$,并且直到 WaitGroup 中的计数器为 0 的时候,则立刻放行被 WaitGroup 所阻塞的 Goroutine;

Worker Pools

Worker Pools、Goroutine Pools、Thread Pools 在 Golang 中都表示同一个意思,实际上就是协程池;

Go 协程池

通常,完整的线程池通常由两部分组成,消息队列线程池;Golang 中,自然,线程池协程池构建,消息队列自然就是上面所提到的管道 Channel消息队列用来提供需要被处理的源数据,协程池来处理这些数据,要注意的是,协程池中的多个协程将会同时并发访问该消息队列也就是管道 Channel,因此,Golang 在设计上,管道必须是线程安全的;

下面,来看一个非常简单的 Worker Pools 的例子,这个例子中,worker pools 中的 workers 将会并发消费消息队列 jobs 中的任务;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}

func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}

// 创建 workers
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}

// 生成存放 jobs 队列的管道,
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}

func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}

func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

  • 代码第 19-20 行,初始化了管道 jobs 和用来存放计算结果的管道 results;这里的 jobs 管道充当消息队列由 Worker Pool 中的 Workers 并发访问;
  • 代码第 73 行,逐步生成有 100 个 jobs 消息的管道既生成了消息队列;
  • 代码第 77 行,创建 Workers,构成 Worker Pool;Workers 由协程构成,因此这里最准确的表达是协程池;要注意的是,协程池中的多个协程将会同时并发访问同一个 jobs 管道;

上面的这个例子只是非常简单的模拟了一个协程池的大致模样;真正的协程池,输入数据应该来自于网络,典型的高并发场景就是,数据来自于 socket,然后采用 I/O 多路复用的方式,将数据缓存在内存中,然后通过回调的方式直接将数据直接提供给协程池进行并发处理;另外,一般而言协程池中的协程是不应该在某个任务执行完以后就直接被销毁的,通常而言,这些协程是一直存在的,协程中所处理的任务通过接口解耦合从而实现可执行任务的多样性;

Select

Select 同时监听多个管道,然后,根据不同的策略,只会选取接收其中一个管道的结果;

优先响应

同时监听两个管道,哪个管道先返回,就获取谁的数据,另外一个管道的数据就丢弃;
假设,我们有两个同步数据源,分布在不同的地域,同时读取,自然,为了及时响应,谁先返回就取谁的数据;下面,用 server1server2 模拟了这种情况,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"
)

func server1(ch chan string) {
time.Sleep(6 * time.Second)
ch <- "from server1"
}

func server2(ch chan string) {
time.Sleep(3 * time.Second)
ch <- "from server2"

}

func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
select {
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}

程序输出 from server2,因为 server2 优先响应;

Default case

当读取的时候,如果其他的 case 都还没有数据响应,便会立刻执行 default case,看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func process(ch chan string) {
time.Sleep(10500 * time.Millisecond)
ch <- "process successful"
}

func main() {
ch := make(chan string)
go process(ch)
for {
time.Sleep(1000 * time.Millisecond)
select {
case v := <-ch:
fmt.Println("received value: ", v)
return
default:
fmt.Println("no value received")
}
}

}

执行结果如下,

1
2
3
4
5
6
7
8
9
10
11
no value received  
no value received
no value received
no value received
no value received
no value received
no value received
no value received
no value received
no value received
received value: process successful

无限 for 循环将会每个 10 秒读取一次管道 ch,看看是否有返回值,而生产者 process 要 10.5 秒之后才能产生数,因此,头 10 次读取,process case 并没有准备好,打印 default case 的结果,no value received,知道 10 次以后,才输出 process case 的返回结果;

可用来处理业务数据响应超时的情况

Default case 还经常用在管道数据响应超时的情况,想想,如果通过管道返回一个数据库查询的结果,但是管道 10 分钟了都迟迟没有返回结果?那么可以使用 Default case 来处理这种响应超时的业务异常;

随机选择

当几个管道同时返回结果的时候,Select 将会随机选择一个返回值;例如,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func server1(ch chan string) {
ch <- "from server1"
}

func server2(ch chan string) {
ch <- "from server2"

}

func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
time.Sleep(1 * time.Second)
select {
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}

管道 output1output2 同时返回数据,这个时候,Select 将会随机的执行 case s1 或者 case s2;多执行上述代码几次,将会分别随机的打印出 from server1 和 from server2;

Empty select

1
2
3
4
5
package main

func main() {
select {}
}

一个空的 select 表示没有任何的 case,这个时候,主协程将会一直被阻塞,知道 runtime error, deadlock!

1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:
main.main()
/tmp/sandbox299546399/main.go:4 +0x20

Mutex

Mutex 在多线程的竞态条件(Race Condition)下,给共享的公共资源加锁,保证数据的一致性;

发生竞态条件

下面来看一个竞态条件下,数据一致性被破坏的例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main  
import (
"fmt"
"sync"
)

var x = 0

func increment(wg *sync.WaitGroup) {
x = x + 1
wg.Done()
}

func main() {
var w sync.WaitGroup
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w)
}
w.Wait()
fmt.Println("final value of x", x)
}

我们期望最终结果输出为 1000,但是,每次执行,得到的结果不同且都小于 1000;因为公共资源 $x$ 被多个协程同时并发访问,导致 $x$ 的数据一致性被破坏 - 这就是我们常说的某个公共资源存在竞态条件既 race codition;

通过 Mutex 解决竞态条件

还是以上面的这个例子为例,我们如何解决对于公共资源 $x$ 的竞态条件呢?核心步骤很简单,既是在对公共资源 $x$ 进行写操作的时候,通过 Mutex 上锁即可,

1
2
3
mutex.Lock()  
x = x + 1
mutex.Unlock()

mutex.Lock()表示当前协程对接下来的程序片段的入口上锁,当其它协程同时执行到此处,便会被阻塞,强制等待,直到当前协程释放锁,才能允许访问,注意,这个时候,同样只有一个协程能够获得锁的权限,其它协程继续等待;这样,就能保证互斥的修改存在竞态条件的公共资源 $x$ 了;下面来看完整的例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main  
import (
"fmt"
"sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}

func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m)
}
w.Wait()
fmt.Println("final value of x", x)
}

在写公共资源 $x$ 的时候通过 Mutex 上锁,这样便保证了 $x$ 的数据一致性;

通过 Channel 解决竞态条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main  
import (
"fmt"
"sync"
)

var x = 0

func increment(wg *sync.WaitGroup, ch chan bool) {
ch <- true
x = x + 1
<- ch
wg.Done()
}

func main() {
var w sync.WaitGroup
ch := make(chan bool, 1)
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, ch)
}
w.Wait()
fmt.Println("final value of x", x)
}

上面代码的核心思想是,通过创建一个容量为 1 的带缓存管道,为具有竞态条件的资源 $x$ 上锁;代码第 10 行,当某一个协程 $\alpha$ 为管道ch赋值以后,ch 便无法再被赋值,因此其它协程在此处被阻塞,直到代码第 12 行协程 $\alpha$ 将 ch 的值取出为止,这个时候,ch 允许写入,众多等待的协程将会有一个幸运儿获得对 $x$ 的锁;这样,我们便通过管道实现了对 $x$ 的互斥访问;

谁优谁劣

通常而言,如果只有一类协程需要并发访问互斥资源 $x$,那么用 Mutex 简单高效,就像上面这个例子那样,只有 increment 协程需要并发访问互斥资源 $x$,因此使用 Mutex 更为合适;但是如果多个不同的协程之间,不但需要互斥访问资源 $x$,而且还需要相互通讯的时候,这种情况下使用管道来实现互斥,代码逻辑更为简单,来看下面这个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main
import (
"fmt"
"sync"
)

var x = 0

func increment(mutexch chan bool, data chan int) {
// lock
mutexch <- true
x = x + 1
data <- x
}

func decrement(wg *sync.WaitGroup, mutexch chan bool, data chan int) {
tmp := <- data
x = tmp - 2
// unlock
<- mutexch
wg.Done()
}

func main() {
var w sync.WaitGroup
ch := make(chan bool, 1)
data := make(chan int, 1)
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(ch, data)
go decrement(&w, ch, data)
}
w.Wait()
fmt.Println("final value of x", x)
}

上面这段代码先通过 increment 协程对 $x$ 加 1,然后通过管道 data 将中间结果传递给 decrement 协程,然后 decrement 对 $x$ 减 2,因为 $x$ 是存在竞态条件的,因此要求整个对互斥资源 $x$ 的访问过程都是互斥的,因此,需要同时对 increment 和 decrement 协程中访问 $x$ 的地方上锁;更准确的说,是在 increment 协程中对 $x$ 进行上锁,在 decrement 协程中对 $x$ 释放锁;上面这段代码执行后,我们得到了期望值 -1000,可见,在上述由两个协程所构成的多线程的环境下,对互斥资源 $x$ 的访问过程中是线程安全的,不过,其实这里的两个管道 mutexch 和 data 都起到了对 $x$ 的互斥保护的作用,mutexch 锁的粒度更大,它的锁粒度是横跨了两个协程,主要限定了对 $x$ 资源访问的开始和结束的边界,而 data 管道主要保证 decrement 协程对 $x$ 的互斥访问,它会阻塞 decrement 协程直到 increment 将中间结果传递过来;

综上,Mutex 适合简单的情况,也就是只有一种协程的情况;而管道非常适合复杂情况下的互斥访问,尤其某个互斥资源需要在多个协程之间进行先后协作访问的时候,想想,如果上面的这个例子通过 Mutex 的方式来实现,得需要多少个锁,多少种情况需要考虑?即便是实现出来了,代码也必然也是晦涩难懂不易于维护的!如果可以,笔者个人认为,尽量使用管道实现互斥更佳!