
Excerpts from The Go Programming Language (the Go Bible), Chapter 8

2018-06-15  SongLiang

Chapter 8 Goroutines and Channels

Go enables two styles of concurrent programming. This chapter presents goroutines and channels, which support communicating sequential processes or CSP, a model of concurrency in which values are passed between independent activities (goroutines) but variables are for the most part confined to a single activity. Chapter 9 covers some aspects of the more traditional model of shared memory multithreading, which will be familiar if you've used threads in other mainstream languages. Chapter 9 also points out some important hazards and pitfalls of concurrent programming that we won't delve into in this chapter.
Go supports two styles of concurrency. In one, goroutines communicate with each other and variables are for the most part confined to a single activity.
The other resembles the multithreading of other mainstream languages and is characterized by shared-memory multithreading.

8.1 Goroutines

In Go, each concurrently executing activity is called a goroutine.

If you have used operating system threads or threads in other languages, then you can assume for now that a goroutine is similar to a thread, and you'll be able to write correct programs. The differences between threads and goroutines are essentially quantitative, not qualitative, and will be described in Section 9.8.
The difference between threads and goroutines is quantitative, not qualitative; Section 9.8 explains this further.

When a program starts, its only goroutine is the one that calls the main function, so we call it the main goroutine. New goroutines are created by the go statement. Syntactically, a go statement is an ordinary function or method call prefixed by the keyword go. A go statement causes the function to be called in a newly created goroutine. The go statement itself completes immediately:
When a program starts, its only goroutine is the one that calls the main function: the main goroutine.

package main

import (
    "time"
    "fmt"
)

func main() {
    go spinner(5 * time.Millisecond)
    const n = 45
    fibN := fib(n)
    fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}

func spinner(delay time.Duration) {
    for {
        for _, r := range `-\|/` {
            fmt.Printf("\r%c", r)
            time.Sleep(delay)
        }
    }
}

func fib(x int) int {
    if x < 2 {
        return x
    }
    return fib(x - 1) + fib(x - 2)
}

The \r in the program is a carriage return: it moves the cursor back to the start of the line, so the spinner keeps overwriting itself.

Other than by returning from main or exiting the program, there is no programmatic way for one goroutine to stop another, but as we will see later, there are ways to communicate with a goroutine to request that it stop itself.

8.2 Example: Concurrent Clock Server

A clock example.
The server (clock1):

package main

import (
    "net"
    "log"
    "io"
    "time"
)

func main() {
    listener, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        handleConn(conn)
    }
}

func handleConn(c net.Conn) {
    defer c.Close()
    for {
        _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
        if err != nil {
            return
        }
        time.Sleep(1 * time.Second)
    }
}

The client comes down to three important steps: dial the server, defer closing the connection, and copy everything received from the connection to standard output:

package main

import (
    "net"
    "log"
    "io"
    "os"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    _, err = io.Copy(os.Stdout, conn)
    if err != nil {
        log.Fatal(err)
    }
}

Those are the key code fragments of the program.

Note: run killall clock1 to kill a clock1 server left running in the background.

8.3. Example: Concurrent Echo Server

The clock server used one goroutine per connection. In this section, we'll build an echo server that uses multiple goroutines per connection.

reverb1

package main

import (
    "net"
    "log"
    "time"
    "fmt"
    "strings"
    "bufio"
)

func echo(c net.Conn, shout string, delay time.Duration) {
    fmt.Fprintln(c, strings.ToUpper(shout))
    time.Sleep(delay)
    fmt.Fprintln(c, shout)
    time.Sleep(delay)
    fmt.Fprintln(c, strings.ToLower(shout))
}

func handleConn(c net.Conn) {
    input := bufio.NewScanner(c)
    for input.Scan() {
        echo(c, input.Text(), 1 * time.Second)
    }
    c.Close()
}

func main() {
    l, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := l.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        handleConn(conn)
    }
}

netcat2

package main

import (
    "net"
    "log"
    "os"
    "io"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    go mustCopy(os.Stdout, conn)
    mustCopy(conn, os.Stdin)
}

func mustCopy(dst io.Writer, src io.Reader) {
    if _, err := io.Copy(dst, src); err != nil {
        log.Fatal(err)
    }
}

reverb2

package main

import (
    "net"
    "log"
    "bufio"
    "time"
    "fmt"
    "strings"
)

func echo(c net.Conn, shout string, delay time.Duration) {
    fmt.Fprintln(c, strings.ToUpper(shout))
    time.Sleep(delay)
    fmt.Fprintln(c, shout)
    time.Sleep(delay)
    fmt.Fprintln(c, strings.ToLower(shout))
}

func handleConn(c net.Conn) {
    input := bufio.NewScanner(c)
    for input.Scan() {
        go echo(c, input.Text(), 1 * time.Second)
    }
    c.Close()
}

func main() {
    l, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := l.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        go handleConn(conn)
    }
}

8.4 Channels

If goroutines are the activities of a concurrent Go program, channels are the connections between them. A channel is a communication mechanism that lets one goroutine send values to another goroutine. Each channel is a conduit for values of a particular type, called the channel's element type.

ch := make(chan int)

As with maps, a channel is a reference to the data structure created by make. When we copy a channel or pass one as an argument to a function, we are copying a reference, so caller and callee refer to the same data structure. As with other reference types, the zero value of channel is nil.

Two channels of the same type may be compared using ==. The comparison is true if both are references to the same channel data structure. A channel may also be compared to nil.
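
A short sketch (not from the book) illustrating these points: copying a channel copies a reference, == compares channel identities, and the zero value is nil.

package main

import "fmt"

func main() {
    a := make(chan int, 1)
    b := a         // copies the reference; a and b refer to the same channel
    var c chan int // the zero value of a channel is nil

    b <- 7
    fmt.Println(<-a)      // 7: sent via b, received via a
    fmt.Println(a == b)   // true: both refer to the same channel data structure
    fmt.Println(c == nil) // true
}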

A channel has two principal operations, send and receive, collectively known as communications. A send statement transmits a value from one goroutine, through the channel, to another goroutine executing a corresponding receive expression. Both operations are written using the <- operator. In a send statement, the <- separates the channel and value operands. In a receive expression, <- precedes the channel operand. A receive expression whose result is not used is a valid statement.

ch <- x   // a send statement
x = <-ch  // a receive expression in an assignment statement
<-ch      // a receive statement; result is discarded

ch = make(chan int)    // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3

8.4.1. Unbuffered Channels

A send operation on an unbuffered channel blocks the sending goroutine until another goroutine executes a corresponding receive on the same channel, at which point the value is transmitted and both goroutines may continue. Conversely, if the receive operation was attempted first, the receiving goroutine is blocked until another goroutine performs a send on the same channel.

Communication over an unbuffered channel causes the sending and receiving goroutines to synchronize. Because of this, unbuffered channels are sometimes called synchronous channels. When a value is sent on an unbuffered channel, the receipt of the value happens before the reawakening of the sending goroutine.

In discussions of concurrency, when we say x happens before y, we don't mean merely that x occurs earlier in time than y, we mean that it is guaranteed to do so and that all its prior effects, such as updates to variables, are complete and that you may rely on them.

When x neither happens before y nor after y, we say that x is concurrent with y. This doesn't mean that x and y are necessarily simultaneous, merely that we cannot assume anything about their ordering. As we'll see in the next chapter, it's necessary to order certain events during the program's execution to avoid the problems that arise when two goroutines access the same variable concurrently.
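
A minimal sketch (not the book's example) of the synchronization an unbuffered channel provides: the main goroutine's receive cannot complete before the worker's send, so the worker's prior effects are guaranteed to be visible afterwards.

package main

import "fmt"

func main() {
    done := make(chan struct{}) // unbuffered: send and receive synchronize

    go func() {
        fmt.Println("background work finished") // happens before the send completes
        done <- struct{}{}                      // blocks until main receives
    }()

    <-done // blocks until the worker sends
    fmt.Println("main continues")
}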

8.4.2. Pipelines

pipeline1

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; ; x++ {
            naturals <- x
        }
    }()

    // Squarer
    go func() {
        for {
            x := <-naturals
            squares <- x * x
        }
    }()

    // Printer (in main goroutine)
    for {
        fmt.Println(<-squares)
    }
}
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go func() {
        for x := 0; x < 10 ; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    go func() {
        for {
            x := <-naturals
            squares <- x * x
            time.Sleep(time.Second)
        }
    }()

    for {
        fmt.Println(<-squares)
    }
    //close(naturals)
}

After a channel has been closed, any further send operations on it will panic. After the closed channel has been drained, that is, after the last sent element has been received, all subsequent receive operations will proceed without blocking but will yield a zero value. Closing the naturals channel above would cause the squarer's loop to spin as it receives a never-ending stream of zero values, and to send these zeros to the printer.
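
A tiny sketch (not from the book) showing that receives on a closed channel first drain any remaining values and then yield the element type's zero value without blocking:

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)

    fmt.Println(<-ch) // 1: remaining values are still delivered
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 0: closed and drained, so receives yield the zero value
    fmt.Println(<-ch) // 0: and they never block
}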

So how do we make the pipeline stop once the sending side has closed the naturals channel?
There is no way to test directly whether a channel has been closed, but there is a variant of the receive operation that produces two results: the received channel element, plus a boolean value, conventionally called ok, which is true for a successful receive and false for a receive on a closed and drained channel. Using this feature, we can modify the squarer's loop to stop when the naturals channel is drained and close the squares channel in turn.

With this mechanism, once naturals has finished sending and has been closed, the receiving side can stop.

Two equivalent ways to write the squarer:

// The two-value receive: the second value reports whether the channel being read has been closed and drained
    go func() {
        for {
            x, ok := <-naturals 
            if !ok {
                break
            }
            squares <- x * x 
        }
        close(squares)
    }()
// The form above is clumsy, so the range form below is used instead; the two are equivalent
    go func() {
        for x := range naturals {
            squares <- x * x
            time.Sleep(time.Second)
        }
        close(squares)
    }()

(The book's explanation:)
Because the syntax above is clumsy and this pattern is common, the language lets us use a range loop to iterate over channels too. This is a more convenient syntax for receiving all the values sent on a channel and terminating the loop after the last one.

package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go func() {
        for x := 0; x < 3; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    go func() {
        //for x := range naturals {
        //  squares <- x * x
        //}
        for {
            x := <-naturals
            squares <- x * x
            time.Sleep(time.Second)
        }
    }()

    for x := range squares {
        fmt.Println(x)
    }
}
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go func() {
        for x := 0; x < 3; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    go func() {
        for x := range naturals {
            squares <- x * x
            time.Sleep(time.Second)
        }
        //for {
        //  x := <-naturals
        //  squares <- x * x
        //  time.Sleep(time.Second)
        //}
    }()

    for x := range squares {
        fmt.Println(x)
    }
}

pipeline2

package main

import (
    "time"
    "fmt"
)

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go func() {
        for x := 0; x < 3; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    go func() {
        for x := range naturals {
            squares <- x * x
            time.Sleep(time.Second)
        }
        close(squares)
    }()

    for x := range squares {
        fmt.Println(x)
    }
}

Attempting to close an already-closed channel causes a panic, as does closing a nil channel. Closing channels has another use as a broadcast mechanism, which we'll cover in Section 8.9.

8.4.3. Unidirectional Channel Types

To document this intent and prevent misuse, the Go type system provides unidirectional channel types that expose only one or the other of the send and receive operations. The type chan<- int, a send-only channel of int, allows sends but not receives. Conversely, the type <-chan int, a receive-only channel of int, allows receives but not sends. (The position of the <- arrow relative to the chan keyword is a mnemonic.) Violations of this discipline are detected at compile time.

pipeline3

package main

import (
    "fmt"
)

func counter(out chan<- int) {
    for i := 0; i < 10; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    go counter(naturals)
    go squarer(squares, naturals)

    printer(squares)
}

8.4.4. Buffered Channels

A buffered channel has a queue of elements. The queue's maximum size is determined when it is created, by the capacity argument to make.

ch = make(chan string, 3)

A send operation on a buffered channel inserts an element at the back of the queue, and a receive operation removes an element from the front. If the channel is full, the send operation blocks its goroutine until space is made available by another goroutine's receive. Conversely, if the channel is empty, a receive operation blocks until a value is sent by another goroutine.
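
A small sketch (not from the book) of buffered-channel behavior; cap reports the buffer's capacity and len the number of elements currently queued.

package main

import "fmt"

func main() {
    ch := make(chan string, 3) // buffered channel with capacity 3

    ch <- "A" // these three sends do not block: the buffer has room
    ch <- "B"
    ch <- "C"

    fmt.Println(cap(ch), len(ch)) // 3 3
    fmt.Println(<-ch)             // A: a receive takes from the front of the queue
    fmt.Println(len(ch))          // 2
    // A fourth send, ch <- "D", would have blocked above until another
    // goroutine received from ch.
}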

Hence the mirroredQuery function below may return a result even before the two slower mirror sites have responded. (Incidentally, it is quite common for multiple goroutines to send to, or receive from, the same channel; this is the so-called fan-in / fan-out pattern.)

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

8.5. Looping in Parallel

In this section, we'll explore some common concurrency patterns for executing all the iterations of a loop in parallel.

package thumbnail

// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)

embarrassingly parallel

Obviously the order in which we process the files doesn't matter, since each scaling operation is independent of all the others. Problems like this that consist entirely of subproblems that are completely independent of each other are described as embarrassingly parallel. Embarrassingly parallel problems are the easiest kind to implement concurrently and enjoy performance that scales linearly with the amount of parallelism.

Let's execute all these operations in parallel, thereby hiding the latency of the file I/O and using multiple CPUs for the image-scaling computations. Our first attempt at a concurrent version just adds a go keyword. We'll ignore errors for now and address them later.

// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
    for _, f := range filenames {
        go thumbnail.ImageFile(f) // NOTE: ignoring errors
    }
}
package main

import (
    "fmt"
    "time"
)

func main() {
    s := []int{6, 3, 9, 7, 10, 2, 5, 11, 35}
    fmt.Println(s)
    for _, e := range s {
        go func() {
            fmt.Println(e)
        }()
    }
    time.Sleep(time.Second * 5)
}
Output (each closure captures the same loop variable e, so a goroutine prints whatever value e holds at the moment it finally runs; this is the classic loop-variable capture pitfall):
5
5
5
5
11
35
35
35
35

package main

import (
    "fmt"
    "time"
)

func main() {
    s := []int{6, 3, 9, 7, 10, 2, 5, 11, 35}
    fmt.Println(s)
    for _, e := range s {
        go func(e int) {
            fmt.Printf("%d ", e)
        }(e)
    }
    time.Sleep(time.Second * 5)
}

Output (passing e as an argument gives each goroutine its own copy, so every element is printed, in whatever order the goroutines happen to run):
[6 3 9 7 10 2 5 11 35]
6 3 5 11 2 9 35 7 10

makeThumbnails4 below returns the first error received from its worker goroutines, but as the comment notes, returning early leaks the goroutines that are still blocked sending on the unbuffered errors channel:
func makeThumbnails4(filenames []string) error {
    errors := make(chan error)

    for _, f := range filenames {
        go func(f string) {
            _, err := thumbnail.ImageFile(f)
            errors <- err
        }(f)
    }

    for range filenames {
        if err := <-errors; err != nil {
            return err // NOTE: incorrect: goroutine leak!
        }
    }

    return nil
}
There are two ways to fix this:
  1. Use a buffered channel.
  2. Alternatively, create another goroutine to drain the channel while the main goroutine returns the first error.
// Use a buffered channel to solve the problem
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
    type item struct {
        thumbfile string
        err       error
    }

    ch := make(chan item, len(filenames))
    for _, f := range filenames {
        go func(f string) {
            var it item
            it.thumbfile, it.err = thumbnail.ImageFile(f)
            ch <- it
        }(f)
    }

    for range filenames {
        it := <-ch
        if it.err != nil {
            return nil, it.err
        }
        thumbfiles = append(thumbfiles, it.thumbfile)
    }

    return thumbfiles, nil
}
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup // number of working goroutines
    for f := range filenames {
        wg.Add(1)
        // worker
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb) // OK to ignore error
            sizes <- info.Size()
        }(f)
    }

    // closer
    go func() {
        wg.Wait()
        close(sizes)
    }()

    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

8.6. Example: Concurrent Web Crawler

func crawl(url string) []string {
    fmt.Println(url)
    list, err := links.Extract(url)
    if err != nil {
        log.Print(err)
    }
    return list
}

//!-crawl

//!+main
func main() {
    worklist := make(chan []string)

    // Start with the command-line arguments.
    go func() { worklist <- os.Args[1:] }()

    // Crawl the web concurrently.
    seen := make(map[string]bool)
    for list := range worklist {
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                go func(link string) {
                    worklist <- crawl(link)
                }(link)
            }
        }
    }
}

The main function resembles breadthFirst. As before, a worklist records the queue of items that need processing, each item being a list of URLs to crawl, but this time, instead of representing the queue using a slice, we use a channel. Each call to crawl occurs in its own goroutine and sends the links it discovers back to the worklist.

The program is too parallel. Unbounded parallelism is rarely a good idea since there is always a limiting factor in the system, such as the number of CPU cores for compute-bound workloads, the number of spindles and heads for local disk I/O operations, the bandwidth of the network for streaming downloads, or the serving capacity of a web service. ... A simple way to do that in our example is to ensure that no more than n calls to links.Extract are active at once, where n is comfortably less than the file descriptor limit ---- 20, say. This is analogous to the way a doorman at a crowded nightclub admits a guest only when some other guest leaves.

We can limit parallelism using a buffered channel of capacity n to model a concurrency primitive called a counting semaphore. Conceptually, each of the n vacant slots in the channel buffer represents a token entitling the holder to proceed. Sending a value into the channel acquires a token, and receiving a value from the channel releases a token, creating a new vacant slot. This ensures that at most n sends can occur without an intervening receive. (Although it might be more intuitive to treat filled slots in the channel buffer as tokens, using vacant slots avoids the need to fill the channel buffer after creating it.) Since the channel element type is not important, we'll use struct{}, which has size zero.

crawl2
package main

import (
    "fmt"
    "log"
    "os"

    "gopl.io/ch5/links"
)

//!+sema
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
    fmt.Println(url)
    tokens <- struct{}{} // acquire a token
    list, err := links.Extract(url)
    <-tokens // release the token

    if err != nil {
        log.Print(err)
    }
    return list
}

//!-sema

//!+
func main() {
    worklist := make(chan []string)
    var n int // number of pending sends to worklist

    // Start with the command-line arguments.
    n++
    go func() { worklist <- os.Args[1:] }()

    // Crawl the web concurrently.
    seen := make(map[string]bool)
    for ; n > 0; n-- {
        list := <-worklist
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                n++
                go func(link string) {
                    worklist <- crawl(link)
                }(link)
            }
        }
    }
}

crawl3

func main() {
    worklist := make(chan []string)  // lists of URLs, may have duplicates
    unseenLinks := make(chan string) // de-duplicated URLs

    // Add command-line arguments to worklist.
    go func() { worklist <- os.Args[1:] }()

    // Create 20 crawler goroutines to fetch each unseen link.
    for i := 0; i < 20; i++ {
        go func() {
            for link := range unseenLinks {
                foundLinks := crawl(link)
                go func() { worklist <- foundLinks }()
            }
        }()
    }

    // The main goroutine de-duplicates worklist items
    // and sends the unseen ones to the crawlers.
    seen := make(map[string]bool)
    for list := range worklist {
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                unseenLinks <- link
            }
        }
    }
}

8.7. Multiplexing with Select

countdown1

func main() {
    fmt.Println("Commencing countdown.")
    tick := time.Tick(1 * time.Second)
    for countdown := 10; countdown > 0; countdown-- {
        fmt.Println(countdown)
        <-tick
    }
    launch()
}

countdown2

package main

import (
    "fmt"
    "os"
    "time"
)

func launch() {
    fmt.Println("Lift off!")
}

func main() {
    abort := make(chan struct{})
    go func() {
        os.Stdin.Read(make([]byte, 1))
        abort <- struct{}{}
    }()
    fmt.Println("Commencing countdown.  Press return to abort.")
    select {
    case <-time.After(10 * time.Second):
        fmt.Println("10s has passed.")
    case <-abort:
        fmt.Println("Launch aborted!")
        return
    }
    launch()
}
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 1) // buffer of one: the channel alternates between empty and full
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch: // ready only when the buffer is full (odd iterations)
            fmt.Println(x)
            time.Sleep(1 * time.Second)
        case ch <- i: // ready only when the buffer is empty (even iterations)
        }
    }
    // The two cases alternate, so this prints 0, 2, 4, 6, 8.
}
package main

import (
    "os"
    "fmt"
    "time"
)

func main() {

    abort := make(chan struct{})
    go func() {
        os.Stdin.Read(make([]byte, 1))
        abort <- struct{}{}
    }()

    fmt.Println("Commencing countdown, Press return")
    tick := time.Tick(1 * time.Second)
    for countdown := 10; countdown > 0; countdown-- {
        fmt.Println(countdown)
        select {
        case <-tick:
            fmt.Println("tick")
        case <-abort:
            fmt.Println("Launch aborted!")
            return
        }
    }
    launch()
}

func launch() {
    fmt.Println("Lift off!")
}

Sometimes we want to try to send or receive on a channel but avoid blocking if the channel is not ready -- a non-blocking communication. A select statement can do that too. A select may have a default, which specifies what to do when none of the other communications can proceed immediately.

select {
case <-abort:
    fmt.Printf("Launch aborted!\n")
    return
default:
    // do nothing
}

The select statement above receives a value from the abort channel if there is one to receive; otherwise it does nothing. This is a non-blocking receive operation; doing it repeatedly is called polling a channel.

The zero value for a channel is nil. Perhaps surprisingly, nil channels are sometimes useful. Because send and receive operations on a nil channel block forever, a case in a select statement whose channel is nil is never selected. This lets us use nil to enable or disable cases that correspond to features like handling timeouts or cancellation, responding to other input events, or emitting output. We'll see an example in the next section.
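
A minimal sketch (not the book's code) of disabling a select case with a nil channel; the tick case is never selected while tick is nil, which is how the du2 program in the next section makes its progress output optional.

package main

import (
    "fmt"
    "time"
)

func main() {
    verbose := false // flip to true to enable the tick case

    var tick <-chan time.Time // stays nil unless verbose; a nil case is never selected
    if verbose {
        tick = time.Tick(100 * time.Millisecond)
    }

    timeout := time.After(300 * time.Millisecond)
    for {
        select {
        case <-tick: // effectively disabled while tick is nil
            fmt.Println("tick")
        case <-timeout:
            fmt.Println("done")
            return
        }
    }
}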

8.8. Example: Concurrent Directory Traversal

du1

package main

import (
    "flag"
    "fmt"
    "os"
    "path/filepath"
    "io/ioutil"
)

func main() {
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    fileSizes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()

    var nfiles, nbytes int64
    for size := range fileSizes {
        nfiles++
        nbytes += size
    }
    printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %v\n", err)
        return nil
    }
    return entries
}

The program would be nicer if it kept us informed of its progress. However, simply moving the printDiskUsage call into the loop would cause it to print thousands of lines of output.
The variant of du below prints the totals periodically, but only if the -v flag is specified since not all users will want to see progress messages. The background goroutine that loops over roots remains unchanged. The main goroutine now uses a ticker to generate events every 500ms, and a select statement to wait for either a file size message, in which case it updates the totals, or a tick event, in which case it prints the current totals. If the -v flag is not specified, the tick channel remains nil, and its case in the select is effectively disabled.

du2

package main

// The du2 variant uses select and a time.Ticker
// to print the totals periodically if -v is set.

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "time"
)

//!+
var verbose = flag.Bool("v", false, "show verbose progress messages")

func main() {
    // ...start background goroutine...

    //!-
    // Determine the initial directories.
    flag.Parse()
    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    // Traverse the file tree.
    fileSizes := make(chan int64)
    go func() {
        for _, root := range roots {
            walkDir(root, fileSizes)
        }
        close(fileSizes)
    }()

    //!+
    // Print the results periodically.
    var tick <-chan time.Time
    if *verbose {
        tick = time.Tick(500 * time.Millisecond)
    }
    var nfiles, nbytes int64
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes was closed
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes) // final totals
}

//!-

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

Since the program no longer uses a range loop, the first select case must explicitly test whether the fileSizes channel has been closed, using the two-result form of the receive operation. If the channel has been closed, the program breaks out of the loop. The labeled break statement breaks out of both the select and the for loop; an unlabeled break would break out of only the select, causing the loop to begin the next iteration.
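
A stripped-down sketch (not from the book) of the labeled break: break loop leaves both the select and the enclosing for loop, whereas a plain break would leave only the select.

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)

loop:
    for {
        select {
        case v, ok := <-ch:
            if !ok {
                break loop // exits the for loop, not just the select
            }
            fmt.Println(v)
        }
    }
    fmt.Println("after loop")
}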

However, it still takes too long to finish. There's no reason why all the calls to walkDir can't be done concurrently, thereby exploiting parallelism in the disk system. The third version of du, below, creates a new goroutine for each call to walkDir. It uses a sync.WaitGroup to count the number of calls to walkDir that are still active, and a closer goroutine to close the fileSizes channel when the counter drops to zero.

sync.WaitGroup in Go

First, what WaitGroup is for: it waits until all of the goroutines it is counting have finished, blocking the waiting goroutine (here the main goroutine) until they have all completed.

package main

import (
    "time"
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        fmt.Println(i)
        go func(n int) {
            defer wg.Add(-1) // equivalent to wg.Done()
            number(n)
        }(i)
    }

    wg.Wait()
}

func number(i int) {
    time.Sleep(time.Second)
    fmt.Println(i)
}

du3

package main

import (
    "flag"
    "sync"
    "time"
    "fmt"
    "path/filepath"
    "os"
    "io/ioutil"
)

var vFlag = flag.Bool("v", false, "show verbose progress messages")

func main() {
    flag.Parse()

    roots := flag.Args()
    if len(roots) == 0 {
        roots = []string{"."}
    }

    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()

    var tick <-chan time.Time
    if *vFlag {
        tick = time.Tick(500 * time.Millisecond)
    }
    var nfiles, nbytes int64
loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes) // final totals
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

var sema = make(chan struct{}, 20)

func dirents(dir string) []os.FileInfo {
    sema <- struct{}{}
    defer func() { <-sema }()

    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

Since this program creates many thousands of goroutines at its peak, we have to change dirents to use a counting semaphore to prevent it from opening too many files at once, just as we did for the web crawler in Section 8.6.

This version runs several times faster than the previous one, though there is a lot of variability from system to system.

8.9. Cancellation

Sometimes we need to instruct a goroutine to stop what it is doing, for example, in a web server performing a computation on behalf of a client that has disconnected.

There is no way for one goroutine to terminate another directly, since that would leave all its shared variables in undefined states. In the rocket launch program we sent a single value on a channel named abort, which the countdown goroutine interpreted as a request to stop itself. But what if we need to cancel two goroutines, or an arbitrary number?

Recall that after a channel has been closed and drained of all sent values, subsequent receive operations proceed immediately, yielding zero values. We can exploit this to create a broadcast mechanism: don't send a value on the channel, close it.

package main

import (
    "os"
    "fmt"
    "time"
)

var done = make(chan struct{})

func cancelled() bool {
    select {
    case <-done:
        return true
    default:
        return false
    }
}

func main() {
    go func() {
        os.Stdin.Read(make([]byte, 1))
        close(done)
    }()

    for i := 0; ;i++ {
        if cancelled() {
            return
        }
        fmt.Println(i)
        time.Sleep(time.Second)
    }
}
package main

import (
    "os"
    "sync"
    "time"
    "path/filepath"
    "fmt"
)

var done = make(chan struct{})

func cancelled() bool {
    select {
    case <- done:
        return true
    default:
        return false
    }
}

func main() {
    roots := os.Args[1:]
    if len(roots) == 0 {
        roots = []string{"."}
    }

    go func() {
        os.Stdin.Read(make([]byte, 1))
        close(done)
    }()

    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()
    tick := time.Tick(500 * time.Millisecond)
    var nfiles, nbytes int64
loop:
    for {
        select {
        case <-done:
            for range fileSizes {
            }
            return
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
    printDiskUsage(nfiles, nbytes)
}

func printDiskUsage(nfiles, nbytes int64) {
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    if cancelled() {
        return
    }
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

var sema = make(chan struct{}, 20)

func dirents(dir string) []os.FileInfo {
    select {
    case sema <- struct{}{}:
    case <-done:
        return nil
    }
    defer func() { <-sema }()

    f, err := os.Open(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    defer f.Close()

    entries, err := f.Readdir(0)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
    }
    return entries
}

Now we need to make our goroutines respond to the cancellation. In the main goroutine, we add a third case to the select statement that tries to receive from the done channel. The function returns if this case is ever selected, but before it returns it must first drain the fileSizes channel, discarding all values until the channel is closed.

It might be profitable to poll the cancellation status again within walkDir's loop, to avoid creating goroutines after the cancellation event. Cancellation involves a trade-off; a quicker response often requires more intrusive changes to program logic. Ensuring that no expensive operations ever occur after the cancellation event may require updating many places in your code, but often most of the benefit can be obtained by checking for cancellation in a few important places.
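
A hedged sketch of what that extra polling might look like, reusing the walkDir, cancelled, and dirents functions shown above; the only change is the additional check inside the loop.

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    if cancelled() {
        return
    }
    for _, entry := range dirents(dir) {
        if cancelled() {
            return // poll again so no new goroutines are started after cancellation
        }
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}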

A little profiling of this program revealed that the bottleneck was the acquisition of a semaphore token in dirents. The select below makes this operation cancellable and reduces the typical cancellation latency of the program from hundreds of milliseconds to tens:

func dirents(dir string) []os.FileInfo {
    select {
    case sema <- struct{}{}: // acquire token
    case <-done:
        return nil // cancelled
    }
    defer func() { <-sema }() // release token
    // ...read directory...
}

Now, when cancellation occurs, all the background goroutines quickly stop and the main function returns. Of course, when main returns, a program exits, so it can be hard to tell a main function that cleans up after itself from one that does not. There's a handy trick we can use during testing: if instead of returning from main in the event of cancellation, we execute a call to panic, then the runtime will dump the stack of every goroutine in the program. If the main goroutine is the only one left, then it has cleaned up after itself. But if other goroutines remain, they may not have been properly cancelled, or perhaps they have been cancelled but the cancellation takes time; a little investigation may be worthwhile. The panic dump often contains sufficient information to distinguish these cases.
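
A minimal standalone sketch (not the book's program) of that testing trick: panicking instead of returning makes the runtime dump every goroutine's stack, which exposes any goroutine that was not cleaned up.

package main

import "time"

func main() {
    done := make(chan struct{})

    // A goroutine that is never told to stop: it stays blocked on <-done.
    go func() {
        <-done
    }()

    time.Sleep(100 * time.Millisecond)

    // For testing only: panic instead of returning, so the runtime prints the
    // stack of every goroutine; the leaked goroutine blocked on <-done shows up.
    panic("cancelled")
}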

8.10. Example: Chat Server

We'll finish this chapter with a chat server that lets several users broadcast textual messages to each other. There are four kinds of goroutine in this program. There is one instance apiece of the main and broadcaster goroutines, and for each client connection there is one handleConn and one clientWriter goroutine. The broadcaster is a good illustration of how select is used, since it has to respond to three different kinds of messages.

The job of the main goroutine, shown below, is to listen for and accept incoming network connections from clients. For each one, it creates a new handleConn goroutine, just as in the concurrent echo server we saw at the start of this chapter.

func main() {
    listener, err := net.Listen("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    go broadcaster()
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Print(err)
            continue
        }
        go handleConn(conn)
    }
}

Next is the broadcaster. Its local variable clients records the current set of connected clients. The only information recorded about each client is the identity of its outgoing message channel, about which more later.

type client chan<- string

var (
    entering = make(chan client)
    leaving = make(chan client)
    messages = make(chan string)
)

func broadcaster() {
    clients := make(map[client]bool)
    for {
        select {
        case msg := <-messages:
            for cli := range clients {
                cli <- msg
            }

        case cli := <-entering:
            clients[cli] = true

        case cli := <-leaving:
            delete(clients, cli)
            close(cli)
        }
    }
}

The broadcaster listens on the global entering and leaving channels for announcements of arriving and departing clients. When it receives one of these events, it updates the clients set, and if the event was a departure, it closes the client's outgoing message channel. The broadcaster also listens for events on the global messages channel, to which each client sends all its incoming messages. When the broadcaster receives one of these events, it broadcasts the message to every connected client.

Now let's look at the per-client goroutines. The handleConn function creates a new outgoing message channel for its client and announces the arrival of this client to the broadcaster over the entering channel. Then it reads every line of text from the client, sending each line to the broadcaster over the global incoming message channel, prefixing each message with the identity of its sender. Once there is nothing more to read from the client, handleConn announces the departure of the client over the leaving channel and closes the connection.

(A quick reminder of channel direction syntax:)

// a receive-only channel of int
read_only := make(<-chan int)

// a send-only channel of int
write_only := make(chan<- int)

// a bidirectional channel, usable for both send and receive
read_write := make(chan int)

func handleConn(conn net.Conn) {
    ch := make(chan string) // outgoing client messages
    go clientWriter(conn, ch)

    who := conn.RemoteAddr().String()
    ch <- "You are " + who
    messages <- who + " has arrived"
    entering <- ch

    input := bufio.NewScanner(conn)
    for input.Scan() {
        messages <- who + ": " + input.Text()
    }
    // NOTE: ignoring potential errors from input.Err()

    leaving <- ch
    messages <- who + " has left"
    conn.Close()
}

func clientWriter(conn net.Conn, ch <-chan string) {
    for msg := range ch {
        fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
    }
}