当主函数在没有从输出channel中接收完所有值便退出时,它必须告诉上游停止数据发送。可以通过向done channel发送停止信号实现。此处有两个可能阻塞的goroutine,所以需发两个值。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutine that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
// Tell the remaining senders we're leaving
done <- struct{}{}
done <- struct{}{}
merge中的发送goroutine用select语句取代了原来的发送操作,它将负责将数据发出和接收done channel的消息。Done将接收的值是空结构体,因为该值没有任何意义:它仅仅是用来表明应该停止向输出channel发送数据了。该goroutine将会不停循环地从输入channel中接收数据,以确保上游不被阻塞。(待会我们将会讨论怎么提早从循环退出)。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
// ... the rest is unchanged ...
这意味着main函数中可仅仅通过关闭done channel来使发送方解除阻塞。该关闭操作会产生一个有效的广播信号并传递给发送方。我们可以扩展pipeline中的函数,使其可以多接受一个done参数,然后通过defer语句对执行关闭以便于在main退出时发送给各阶段完成信号来实现退出。
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
一旦done channel关闭,各个阶段就可以成功返回退出。当done被关闭,merge就会知道上游会停止发送数据,merge函数就会停止从输入channel接收数据并返回。输出channel通过defer语句确保所有的wg.Done在函数时能被调用。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
// ... the rest is unchanged ...
相似地,只要done channel一关闭,sq函数也会立刻返回。通过defer语句,sql函数确保它们输出channel一定能被顺利关闭。
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return out
- 各个阶段在所有的发送操作完成便会关闭输出channels;
- 各个阶段会不停的接收数据,直到这些channel都被关闭或者发送方不再阻塞;
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
var paths []string
for path := range m {
paths = append(paths, path)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
if !info.Mode().IsRegular() {
return nil
data, err := ioutil.ReadFile(path)
if err != nil {
return err
m[path] = md5.Sum(data)
return nil
if err != nil {
return nil, err
return m, nil
type result struct {
path string
sum [md5.Size]byte
err error
sumFiles函数返回了两个channels:一个用于传输结果,另一个用于返回filepath.Walk的错误。walk函数为每个文件启动了一个新的goroutine来处理它们,同时也检查是否done 。如果done被关闭,walk函数将立刻返回。
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
if !info.Mode().IsRegular() {
return nil
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
return nil
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
// No select needed here, since errc is buffered.
errc <- err
return c, errc
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
m[r.path] = r.sum
if err := <-errc; err != nil {
return nil, err
return m, nil
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
if !info.Mode().IsRegular() {
return nil
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
return nil
return paths, errc
第二阶段,我们为digester函数启动了固定数量的goroutine,它将从paths中接收文件名处理并发送摘要结果给channel c:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
go func() {
最后阶段,我们从channel c中接收所有的结果并检查errc是否返回了错误。该检查无法过早执行,因为过早检查,可能会导致walkFile阻塞。
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
m[r.path] = r.sum
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
return m, nil
这篇文章给我们展示了如何在GO中构建流式数据pipeline。pipeline中处理失败是需要一定的技巧的,因为每个尝试给下游发送数据的阶段都可能被阻塞,因为下游可能不在接收上游的输入数据。我们展示了如何通过关闭channel来给所有的goroutine发送 "done" 信号和定义了正确构建pipeline的指导原则。
作者:Sameer Ajmani