如何优雅地关闭数据通道

在本文发表数日前,我曾写了一篇文章来解释数据通道的规则。 那篇文章在redditHN上获得了很多点赞,但也有很多人对Go数据通道的细节设计提出了一些批评意见。

这些批评主要针对于数据通道设计中的下列细节:
  1. 没有一个简单和通用的方法用来在不改变一个数据通道的状态的情况下检查这个通道是否已经关闭。
  2. 关闭一个已经关闭的数据通道将产生一个恐慌,所以在不知道一个数据通道是否已经关闭的时候关闭此数据通道是很危险的。
  3. 向一个已关闭的数据通道发送数据将产生一个恐慌,所以在不知道一个数据通道是否已经关闭的时候向此数据通道发送数据是很危险的。

这些批评看上去有几分道理(实际上属于对数据通道的不正确使用导致的偏见)。 是的,Go语言中并没有提供一个内置函数来检查一个数据通道是否已经关闭。

在Go中,如果,我们能够保证从不会向一个数据通道发送数据,那么有一个简单的方法来判断此数据通道是否已经关闭。 此方法已经在上一篇文章数据通道用例大全中展示过了。 这里为了本文的连贯性,在下面的例子中重新列出了此方法。
package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
	select {
	case <-ch:
		return true
	default:
	}

	return false
}

func main() {
	c := make(chan T)
	fmt.Println(IsClosed(c)) // false
	close(c)
	fmt.Println(IsClosed(c)) // true
}

如前所述,此方法并不是一个通用的检查数据通道是否已经关闭的方法。

事实上,即使有一个内置closed函数用来检查一个数据通道是否已经关闭,它的有用性也是十分有限的。 原因是当此函数的一个调用的结果返回时,被查询的数据通道的状态可能已经又改变了,导致此调用结果并不能反映出被查询的数据通道的最新状态。 虽然我们可以根据一个调用closed(ch)的返回结果为true而得出我们不应该再向通道ch发送数据的结论, 但是我们不能根据一个调用closed(ch)的返回结果为false而得出我们可以继续向通道ch发送数据的结论。

数据通道关闭原则

一个常用的使用Go数据通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭数据通道。 换句话说,我们只应该让一个数据通道唯一的发送者关闭一个此数据通道。

下面我们将称此原则为数据通道关闭原则

当然,这并不是一个通用的关闭数据通道的原则。通用的原则是不要关闭已关闭的数据通道。 如果我们能够保证从某个时刻之后,再没有协程将向一个未关闭的非nil数据通道发送数据,则一个协程可以安全地关闭此通道。 然而,做出这样的保证常常需要很大的努力,从而导致代码过度复杂。 另一方面,遵循数据通道关闭原则是一件相对简单的事儿。

粗鲁地关闭数据通道的方法

如果由于某种原因,你一定非要从数据接收方或者让众多发送者中的一个关闭一个数据通道,你可以使用恢复机制来防止可能产生的恐慌而导致程序崩溃。 下面就是这样的一个实现(假设数据通道的元素类型为T)。
func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			// 一个函数的返回结果可以在defer调用中修改。
			justClosed = false
		}
	}()

	// 假设ch != nil。
	close(ch)   // 如果ch已关闭,则产生一个恐慌。
	return true // <=> justClosed = true; return
}

此方法违反了数据通道关闭原则

同样的方法可以用来粗鲁地向一个关闭状态未知的数据通道发送数据。
func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value  // 如果ch已关闭,则产生一个恐慌。
	return false // <=> closed = false; return
}

这样的粗鲁方法不仅违反了数据通道关闭原则,而且Go白皮书和标准编译器不保证它的实现中不存在数据竞争

礼貌地关闭数据通道的方法

很多Go程序员喜欢使用sync.Once来关闭数据通道。
type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}

当然,我们也可以使用sync.Mutex来防止多次关闭一个数据通道。
type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}

我们应该理解为什么Go没有提供这样的内置的SafeClose函数的原因是从接收方一端或者让众多发送者中的一个关闭一个数据通道常常属于不良的并发程序设计。 Go甚至从语法上禁止关闭单向接收通道。

优雅地关闭数据通道的方法

上一节中介绍的SafeSend函数有一个弊端,它的调用不能做为case操作而被使用在select代码块中。 另外,很多Go程序员(包括我)认为上面两节展示的关闭数据通道的方法不是很优雅。 本节下面将介绍一些在各种情形下使用纯数据通道操作来关闭数据通道的方法。

(为了演示程序的完整性,下面这些例子中使用到了sync.WaitGroup。在实践中,sync.WaitGroup并不是必需的。)

情形一:M个接收者和一个发送者。发送者通过关闭数据通道来传递发送结束信号

这是最简单的一种情形。当发送者欲结束发送,让它关闭通讯用的数据通道即可。
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)

	// 发送者
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// 此唯一的发送者可以安全地关闭此数据通道。
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// 接收数据直到通道dataCh已关闭
			// 并且dataCh的缓冲队列已空。
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}

情形二:一个接收者和N个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要在发送数据了

此情形比上一种情形复杂一些。我们不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了数据通道关闭原则。 但是我们可以让接收者关闭一个额外的信号数据通道来通知发送者不要在发送数据了。
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh是一个额外的信号通道。它的
		// 发送者为dataCh数据通道的接收者。
		// 它的接收者为dataCh数据通道的发送者。

	// 发送者
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// 这里的第一个尝试接收用来让此发送者
				// 协程尽早地退出。对于这个特定的例子,
				// 此select代码块并非必需。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已经关闭,此第二个select
				// 代码块中的第一个分支仍很有可能在若干个
				// 循环步内依然不会被选中。如果这是不可接受
				// 的,则上面的第一个select代码块是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// 接收者
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// 此唯一的接收者同时也是stopCh数据通道的
				// 唯一发送者。尽管它不能安全地关闭dataCh数
				// 据通道,但它可以安全地关闭stopCh数据通道。
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}

如此例中的注释所述,对于此额外的信号通道stopCh,它只有一个发送者,即dataCh数据通道的唯一接收者。 dataCh数据通道的接收者关闭了信号通道stopCh,这是不违反数据通道关闭原则的。

在此例中,数据通道dataCh并没有被关闭。是的,我们不必关闭它。 当一个数据通道不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭。 所以这里的优雅性体现在通过不关闭一个数据通道来停止使用此数据通道。

情形三:M个接收者和N个发送者。它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号

这是最复杂的一种情形。我们不能让接收者和发送者中的任何一个关闭数据通道,我们也不能让多个接收者之一关闭一个额外的信号通道。 这两种做法都违反了数据通道关闭原则。 然而,我们可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有的接收者和发送者结束工作。 具体实现见下例。注意其中使用了一个尝试发送操作来向中间调解者发送信号。
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh是一个额外的信号通道。它的发送
		// 者为中间调解者。它的接收者为dataCh
		// 数据通道的所有的发送者和接收者。
	toStop := make(chan string, 1)
		// toStop是一个用来通知中间调解者让其
		// 关闭信号通道stopCh的第二个信号通道。
		// 此第二个信号通道的发送者为dataCh数据
		// 通道的所有的发送者和接收者,它的接收者
		// 为中间调解者。它必须为一个缓冲通道。

	var stoppedBy string

	// 中间调解者
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// 发送者
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// 为了防止阻塞,这里使用了一个尝试
					// 发送操作来向中间调解者发送信号。
					select {
					case toStop <- "发送者#" + id:
					default:
					}
					return
				}

				// 此处的尝试接收操作是为了让此发送协程尽早
				// 退出。标准编译器对尝试接收和尝试发送做了
				// 特殊的优化,因而它们的速度很快。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已关闭,如果这个select代码块
				// 中第二个分支的发送操作是非阻塞的,则第一个
				// 分支仍很有可能在若干个循环步内依然不会被选
				// 中。如果这是不可接受的,则上面的第一个尝试
				// 接收操作代码块是必需的。
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收者
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// 和发送者协程一样,此处的尝试接收操作是为了
				// 让此接收协程尽早退出。
				select {
				case <- stopCh:
					return
				default:
				}

				// 即使stopCh已关闭,如果这个select代码块
				// 中第二个分支的接收操作是非阻塞的,则第一个
				// 分支仍很有可能在若干个循环步内依然不会被选
				// 中。如果这是不可接受的,则上面尝试接收操作
				// 代码块是必需的。
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// 为了防止阻塞,这里使用了一个尝试
						// 发送操作来向中间调解者发送信号。
						select {
						case toStop <- "接收者#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("被" + stoppedBy + "终止了")
}

在此例中,数据通道关闭原则依旧得到了遵守。

请注意,信号通道toStop的容量必须至少为1。 如果它的容量为0,则在中间调解者还未准备好的情况下就已经有某个协程向toStop发送信号时,此信号将被抛弃。

我们也可以不使用尝试发送操作向中间调解者发送信号,但信号通道toStop的容量必须至少为数据发送者和数据接收者的数量之和,以防止向其发送数据时(有一个极其微小的可能)导致某些发送者和接收者协程永久阻塞。
...
toStop := make(chan string, NumReceivers + NumSenders)
...
			value := rand.Intn(Max)
			if value == 0 {
				toStop <- "sender#" + id
				return
			}
...
				if value == Max-1 {
					toStop <- "receiver#" + id
					return
				}
...

更多情形?

实践中存在以上三种情形的更多变种。 比如,上述第三种情形的一个变种可能要求接收者将dataCh中留存的所有数据读尽。 本文就不对类似更多变种多加讲解了。

尽管上述三种情形没有覆盖所有的使用情形,但它们是最基本的几种情形。其它很多情形都可以归为这三种情形。

结论

并没有什么情况非得逼得我们违反数据通道关闭原则。 如果你遇到了此情形,请考虑修改你的代码流程和结构设计。

使用数据通道编程宛如在艺术创作一般!

Go语言101项目目前同时托管在GithubGitlab上。 欢迎各位在这两个项目中通过提交bug和PR的方式来改进完善Go语言101中的各篇文章。

本书微信公众号名称为"Go 101"。每个工作日此公众号将尽量发表一篇和Go语言相关的原创短文。各位如果感兴趣,可以搜索关注一下。

赞赏