sync标准库包中提供的并发同步技术

通道用例大全一文中介绍了很多通过使用通道来实现并发同步的用例。 事实上,通道并不是Go支持的唯一的一种并发同步技术。而且对于一些特定的情形,通道并不是最有效和可读性最高的的同步技术。 本文下面将介绍sync标准库包中提供的各种并发同步技术。相对于通道,这些技术对于某些情形更加适用。

sync标准库包提供了一些用于实现并发同步的类型。这些类型适用于各种不同的内存顺序需求。 对于这些特定的需求,这些类型使用起来比通道效率更高,代码实现更简洁。

(请注意:为了避免各种异常行为,最好不要复制sync标准库包中提供的类型的值。)

sync.WaitGroup(等待组)类型

每个sync.WaitGroup值在内部维护着一个计数,此计数的初始默认值为零。

*sync.WaitGroup类型有三个方法Add(delta int)Done()Wait()

对于一个可寻址的sync.WaitGroupwg

请注意wg.Add(delta)wg.Done()wg.Wait()分别是(&wg).Add(delta)(&wg).Done()(&wg).Wait()的简写形式。

一般,一个sync.WaitGroup值用来让某个协程等待其它若干协程都先完成它们各自的任务。 一个例子:
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wg sync.WaitGroup
	wg.Add(N)
	for i := 0; i < N; i++ {
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			fmt.Println("Done:", i)
			wg.Done() // <=> wg.Add(-1)
		}()
	}

	wg.Wait()
	// 所有的元素都保证被初始化了。
	fmt.Println("values:", values)
}
在此例中,主协程等待着直到其它5个协程已经将各自负责的元素初始化完毕此会打印出各个元素值。 这里是一个可能的程序执行输出结果:
Done: 4
Done: 1
Done: 3
Done: 0
Done: 2
values: [71 89 50 62 60]
我们可以将上例中的Add方法调用拆分成多次调用:
...
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1) // 将被执行5次
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			wg.Done()
		}()
	}
...

一个*sync.WaitGroup值的Wait方法可以在多个协程中调用。 当对应的sync.WaitGroup值维护的计数降为0,这些协程都将得到一个(广播)通知而结束阻塞状态。
func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wgA, wgB sync.WaitGroup
	wgA.Add(N)
	wgB.Add(1)

	for i := 0; i < N; i++ {
		i := i
		go func() {
			wgB.Wait() // 等待广播通知
			log.Printf("values[%v]=%v \n", i, values[i])
			wgA.Done()
		}()
	}

	// 下面这个循环保证将在上面的任何一个
	// wg.Wait调用结束之前执行。
	for i := 0; i < N; i++ {
		values[i] = 50 + rand.Int31n(50)
	}
	wgB.Done() // 发出一个广播通知
	wgA.Wait()
}

一个WaitGroup可以在它的一个Wait方法返回之后被重用。 但是请注意,当一个WaitGroup值维护的基数为零时,它的带有正整数实参的Add方法调用不能和它的Wait方法调用并发运行,否则将可能出现数据竞争。

sync.Once类型

每个*sync.Once值有一个Do(f func())方法。 此方法只有一个类型为func()的参数。

对一个可寻址的sync.Onceoo.Do()(即(&o).Do()的简写形式)方法调用可以在多个协程中被多次并发地执行, 这些方法调用的实参应该(但并不强制)为同一个函数值。 在这些方法调用中,有且只有一个调用的实参函数(值)将得到调用。 此被调用的实参函数保证在任何o.Do()方法调用返回之前退出。 换句话说,被调用的实参函数内的代码将在任何o.Do()方法返回调用之前被执行。

一般来说,一个sync.Once值被用来确保一段代码在一个并发程序中被执行且仅被执行一次。

一个例子:
package main

import (
	"log"
	"sync"
)

func main() {
	log.SetFlags(0)

	x := 0
	doSomething := func() {
		x++
		log.Println("Hello")
	}

	var wg sync.WaitGroup
	var once sync.Once
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(doSomething)
			log.Println("world!")
		}()
	}

	wg.Wait()
	log.Println("x =", x) // x = 1
}

在此例中,Hello将仅被输出一次,而world!将被输出5次,并且Hello肯定在所有的5个world!之前输出。

sync.Mutex(互斥锁)和sync.RWMutex(读写锁)类型

*sync.Mutex*sync.RWMutex类型都实现了sync.Locker接口类型。 所以这两个类型都有两个方法:Lock()Unlock(),用来保护一份数据不会被多个使用者同时读取和修改。

除了Lock()Unlock()这两个方法,*sync.RWMutex类型还有两个另外的方法:RLock()RUnlock(),用来支持多个读取者并发读取一份数据但防止此份数据被某个数据写入者和其它数据访问者(包括读取者和写入者)同时使用。

(注意:这里的数据读取者数据写入者不应该从字面上理解。有时候某些数据读取者可能修改数据,而有些数据写入者可能只读取数据。)

一个Mutex值常称为一个互斥锁。 一个Mutex零值为一个尚未加锁的互斥锁。 一个(可寻址的)Mutexm只有在未加锁状态时才能通过m.Lock()方法调用被成功加锁。 换句话说,一旦m值被加了锁(亦即某个m.Lock()方法调用成功返回), 一个新的加锁试图将导致当前协程进入阻塞状态,直到此Mutex值被解锁为止(通过m.Unlock()方法调用)。

注意:m.Lock()m.Unlock()分别是(&m).Lock()(&m).Unlock()的简写形式。

一个使用sync.Mutex的例子:
package main

import (
	"fmt"
	"runtime"
	"sync"
)

type Counter struct {
	m sync.Mutex
	n uint64
}

func (c *Counter) Value() uint64 {
	c.m.Lock()
	defer c.m.Unlock()
	return c.n
}

func (c *Counter) Increase(delta uint64) {
	c.m.Lock()
	c.n += delta
	c.m.Unlock()
}

func main() {
	var c Counter
	for i := 0; i < 100; i++ {
		go func() {
			for k := 0; k < 100; k++ {
				c.Increase(1)
			}
		}()
	}

	// 此循环仅为演示目的。
	for c.Value() < 10000 {
		runtime.Gosched()
	}
	fmt.Println(c.Value()) // 10000
}

在上面这个例子中,一个Counter值使用了一个Mutex字段来确保它的字段n永远不会被多个协程同时使用。

一个RWMutex值常称为一个读写互斥锁。 对于一个可寻址的RWMutexrwm,数据写入者可以通过方法调用rwm.Lock()获取rwm的写锁,或者通过m.RLock()方法调用获取rwm的读锁。 方法调用rwm.Unlock()rwm.RUnlock()用来释放rwm的写锁和读锁。

注意rwm.Lock()rwm.Unlock()rwm.RLock()rwm.RUnlock()分别是(&rwm).Lock()(&rwm).Unlock()(&rwm).RLock()(&rwm).RUnlock()的简写形式。

对于一个可寻址的RWMutexrwm,下列规则存在:

后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作。

请注意:一个锁并不会绑定到一个协程上;换句话说,一个锁的获取者和此锁的持有者(以及释放者)可能不是一个协程,尽管在实践中这种情况比较少见。

上一个例子中的Counter类型的m字段的类型可以更改为sync.RWMutex,从而使得执行效率更高,如下面的代码所示。
...
type Counter struct {
	//m sync.Mutex
	m sync.RWMutex
	n uint64
}

func (c *Counter) Value() uint64 {
	//c.m.Lock()
	//defer c.m.Unlock()
	c.m.RLock()
	defer c.m.RUnlock()
	return c.n
}
...

根据上面列出的后两条规则,下面这个程序最有可能输出abdc
package main

import (
	"fmt"
	"time"
	"sync"
)

func main() {
	var m sync.RWMutex
	go func() {
		m.RLock()
		fmt.Print("a")
		time.Sleep(time.Second)
		m.RUnlock()
	}()
	go func() {
		time.Sleep(time.Second * 1 / 4)
		m.Lock()
		fmt.Print("b")
		time.Sleep(time.Second)
		m.Unlock()
	}()
	go func() {
		time.Sleep(time.Second * 2 / 4)
		m.Lock()
		fmt.Print("c")
		m.Unlock()
	}()
	go func () {
		time.Sleep(time.Second * 3 / 4)
		m.RLock()
		fmt.Print("d")
		m.RUnlock()
	}()
	time.Sleep(time.Second * 3)
	fmt.Println()
}

请注意,上例这个程序仅仅是为了解释和验证上面列出的读写锁的后两条加锁规则。 此程序使用了time.Sleep调用来做协程间的同步。这种所谓的同步方法不应该被使用在生产代码中

sync.Mutexsync.RWMutex值也可以用来实现通知,尽管这不是Go中最优雅的方法来实现通知。 下面是一个使用了Mutex值来实现通知的例子。
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var m sync.Mutex
	m.Lock()
	go func() {
		time.Sleep(time.Second)
		fmt.Println("Hi")
		m.Unlock() // 发出一个通知
	}()
	m.Lock() // 等待通知
	fmt.Println("Bye")
}

在此例中,Hi将确保在Bye之前打印出来。 关于sync.Mutexsync.RWMutex值相关的内存顺序保证,请阅读Go中的内存顺序保证一文。

sync.Cond类型

sync.Cond类型提供了一种有效的方式来实现多个协程间的通知。

每个sync.Cond值拥有一个sync.Locker类型的名为L的字段。 此字段的具体值常常为一个*sync.Mutex值或者*sync.RWMutex值。

*sync.Cond类型有三个方法Wait()Signal()Broadcast()

每个Cond值维护着一个先进先出等待协程队列。 对于一个可寻址的Condc

请注意:c.Wait()c.Signal()c.Broadcast()分别为(&c).Wait()(&c).Signal()(&c).Broadcast()的简写形式。

c.Signal()c.Broadcast()调用常用来通知某个条件的状态发生了变化。 一般说来,c.Wait()应该在一个检查某个条件是否已经得到满足的循环中调用。

下面是一个典型的sync.Cond用例。
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 10
	var values [N]string

	cond := sync.NewCond(&sync.Mutex{})
	cond.L.Lock()

	for i := 0; i < N; i++ {
		d := time.Second * time.Duration(rand.Intn(10)) / 10
		go func(i int) {
			time.Sleep(d) // 模拟一个工作负载
			cond.L.Lock()
			// 下面的修改必须在cond.L被锁定的时候执行
			values[i] = string('a' + i)
			cond.Broadcast() // 可以在cond.L被解锁后发出通知
			cond.L.Unlock()
			// 上面的通知也可以在cond.L未锁定的时候发出。
			//cond.Broadcast() // 上面的调用也可以放在这里
		}(i)
	}

	// 此函数必须在cond.L被锁定的时候调用。
	checkCondition := func() bool {
		fmt.Println(values)
		for i := 0; i < N; i++ {
			if values[i] == "" {
				return false
			}
		}
		return true
	}
	for !checkCondition() {
		cond.Wait() // 必须在cond.L被锁定的时候调用
	}
	cond.L.Unlock()
}
一个可能的输出:
[         ]
[     f    ]
[  c   f    ]
[  c   f  h  ]
[ b c   f  h  ]
[a b c   f  h  j]
[a b c   f g h i j]
[a b c  e f g h i j]
[a b c d e f g h i j]

因为上例中只有一个协程(主协程)在等待通知,所以其中的cond.Broadcast()调用也可以换为cond.Signal()。 如上例中的注释所示,cond.Broadcast()cond.Signal()不必在cond.L的锁被成功获取的时候调用。

为了防止数据竞争,对自定义条件的修改必须在cond.L的锁被成功获取的时候才能执行。 另外,checkCondition函数和cond.Wait方法也必须在cond.L的锁被成功获取的时候才可被调用。

事实上,对于上面这个特定的例子,cond.L字段的也可以为一个*sync.RWMutex值。 对自定义条件的十个部分的修改可以在RWMutex值的读锁被成功获取的时候执行。这十个修改可以并发进行,因为它们是互不干扰的。 如下面的代码所示:

...
	cond := sync.NewCond(&sync.RWMutex{})
	cond.L.Lock()

	for i := 0; i < N; i++ {
		d := time.Second * time.Duration(rand.Intn(10)) / 10
		go func(i int) {
			time.Sleep(d)
			cond.L.(*sync.RWMutex).Lock()
			values[i] = string('a' + i)
			cond.L.(*sync.RWMutex).Unlock()
			cond.Signal()
		}(i)
	}
...

在上面的代码中,此sync.RWMutex值的用法有些不符常规。 它的读锁被一些修改数组元素的协程所获取并持有,而它的写锁被主协程获取持有用来读取并检查各个数组元素的值。

Cond值所表示的自定义条件可以是一个虚无。对于这种情况,此Cond值纯粹被用来实现通知。 比如,下面这个程序将打印出abc或者bac
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	cond := sync.NewCond(&sync.Mutex{})
	cond.L.Lock()
	go func() {
		cond.L.Lock()
		go func() {
			cond.L.Lock()
			cond.Broadcast()
			cond.L.Unlock()
		}()
		cond.Wait()
		fmt.Print("a")
		cond.L.Unlock()
		wg.Done()
	}()
	cond.Wait()
	fmt.Print("b")
	cond.L.Unlock()
	wg.Wait()
	fmt.Println("c")
}

如果需要,多个sync.Cond值可以共享一个sync.Locker值。但是这种情形在实践中并不多见。

Go语言101项目目前同时托管在GithubGitlab上。 欢迎各位在这两个项目中通过提交bug和PR的方式来改进完善Go语言101中的各篇文章。

本书微信公众号名称为"Go 101"。每个工作日此公众号将尽量发表一篇和Go语言相关的原创短文。各位如果感兴趣,可以搜索关注一下。

赞赏