go并发编程范式

2018-08-11  本文已影响0人  DDY26

1. 访问范围约束

通过限制访问约束,减少不必要的同步带来的性能损耗。例如,集中控制channel的写入,对外提供channel的读取,这样本身便提供了对并发安全的支持。

// channel拥有者具有写入权限
chanOwner := func() <-chan int {
    results := make(chan int, 5) //1
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}
// 消费者只具备读取权限
consumer := func(results <-chan int) { //3
    for result := range results {
        fmt.Printf("Received: %d\n", result)
    }
    fmt.Println("Done receiving!")
}

results := chanOwner() //2
consumer(results)

// 对于共享数据的不同数据段的并发访问同样是安全的
printData := func(wg *sync.WaitGroup, data []byte) {
    defer wg.Done()

    var buff bytes.Buffer
    for _, b := range data {
        fmt.Fprintf(&buff, "%c", b)
    }
    fmt.Println(buff.String())
}

var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3]) // 1
go printData(&wg, data[3:]) // 2

wg.Wait()

2. for-select

for-select循环模式如下所示:

for { // 无限循环或遍历
    select {
    // 对通道进行操作
    }
}

常见的几种for-select循环的用法

a. 在通道上发送迭代变量

for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:
        return
    case stringStream <- s:   // slice数据循环迭代写入channel
    }
}

b. 无限循环等待停止

// 第一种方式
for {
    select {
    case <-done: 
        return   // 停止返回
        default:
    }
    // 执行非抢占任务
}

// 
for {
    select {
    case <-done:
        return 
        default:    // 将要执行的任务放入default分支中
                // 执行非抢占任务
    }
}

3. goroutine泄露

goroutine几种终止方式:

一个常见的goroutine泄露的例子:

doWork := func(strings <-chan string) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)
        for s := range strings {  // 对于channel的访问,将一直被阻塞
            fmt.Println(s)
        }
    }()
    return completed
}

doWork(nil)
// 这里还有其他任务执行
fmt.Println("Done.")

解决goroutine泄露的一种方法,即向子goroutine发送结束信号,通知其退出。

doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { //1
    terminated := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(terminated)
        for {   // for-select 处理一手终止信号
            select {
            case s := <-strings:  // 该case分支将一直被阻塞
                // Do something interesting
                fmt.Println(s)
            case <-done: //2 :接收到结束信号,退出当前goroutine
                return
            }
        }
    }()
    return terminated
}

done := make(chan interface{})
terminated := doWork(done, nil)

go func() { //3
    // Cancel the operation after 1 second.
        // 1s后close channel,向子goroutine广播结束信号
    time.Sleep(1 * time.Second)
    fmt.Println("Canceling doWork goroutine...")
    close(done)
}()

// 一直阻塞,直到子goroutine结束
<-terminated //4
fmt.Println("Done.")

另外一个goroutine泄露的示例:

newRandStream := func() <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.") // 1
        defer close(randStream)
        for {
            randStream <- rand.Int()  // 此处在读取完第三个元素后,将会永久阻塞,导致goroutine泄露
        }
    }()

    return randStream
}

randStream := newRandStream()
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}

针对该goroutine泄露的解决方案:

newRandStream := func(done <-chan interface{}) <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.")
        defer close(randStream)

        for {
            select {
            case randStream <- rand.Int():
            case <-done:   // 结束信号到达,立即结束
                return
            }
        }

    }()

    return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}

close(done)  // close channel,发出通知信号
//模拟正在进行的工作
time.Sleep(1 * time.Second)

防止goroutine泄露遵循的一个原则:如果goroutine负责创建子goroutine,它也必须负责确保它可以停止子goroutine

4. or-channel

or-done-channel:将任意个数的done channel组合成为一个done channel,即N个done channel中任意一个done了,整个组合的done channnel就done了。

// or-channel的一种递归实现
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    var or func(chs ...<-chan interface{}) <-chan interface{}
    or = func(chs ...<-chan interface{}) <-chan interface{} {

        if len(chs) == 0 {
            return nil
        }

        if len(chs) == 1 {
            return chs[0]
        }

        chsLen := len(chs)
        orDone := make(chan interface{}) // done channel
        go func() {
            defer close(orDone)
            select {
            case <-or(chs[:chsLen/2]...): // 0...chsLen/2-1 channel监听
            case <-or(chs[chsLen/2:]...): // chsLen/2...chsLen-1 channel监听
            }
        }()
        return orDone
    }

    var chs []chan interface{}
    for i := 0; i < 5; i++ {
        chs = append(chs, make(chan interface{}))
    }

    go func(chs ...chan interface{}) {
        time.Sleep(1 * time.Second)
        idx := rand.Intn(5)
        fmt.Printf("close channel %d\n", idx)
        close(chs[idx])
    }(chs...)

    //<-or(chs...)
    <-or(chs[0], chs[1], chs[2], chs[3], chs[4])
    fmt.Println("end test")
}

Tips:
Go的一个优点是能够快速创建,调度和运行goroutine,并且在Go中积极鼓励使用goroutines来正确建模问题。

5. 错误处理

Go避开了流行的错误异常模型,Go认为错误处理非常重要,并且在开发程序时,我们应该像关注算法一样关注错误处理。

// 并发情况下的错误处理,潜在的结果和潜在的错误同时返回
type Result struct { //1
    Error    error
    Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result { //2

    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp} //3:错误和结果包装在一起
            select {
            case <-done:
                return
            case results <- result: //4
            }
        }
    }()

    return results
}
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.baidu.com", "https://badhost"}
for result := range checkStatus(done, urls...) {  // 检查错误和结果
    if result.Error != nil { //5
        fmt.Printf("error: %v", result.Error)
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

6. 管道

利用channel来实现管道的功能:

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i * multiplier:  // 乘法结果塞入管道中
            }
        }
    }()

    return multipliedStream
}

add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i + additive:  // 加法结果塞入管道中
            }
        }
    }()
    return addedStream
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4)  // 生产数据
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) // 管道传输数据
 
// 通过关闭done channel,随时终止管道数据的传输
for v := range pipeline {
    fmt.Println(v)
}

利用channel实现的一些generator:

// repeat 会重复传输你给它的值,知道关闭done channel
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

// take: 从channel中拿取一定数量的数据,然后返回
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {

    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:  // 从传入的管道中接收数据
            }
        }
    }()
    return takeStream
}

// 模拟管道两端的生产方和消费方
done := make(chan interface{})
defer close(done)

for num := range take(done, repeat(done, 1), 10) {
    fmt.Printf("%v ", num)
}

一个指定函数规则的generator:

// fn即为函数产生器
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

done := make(chan interface{})
defer close(done)
// 随机数生成器,用于产生随机数
rand := func() interface{} {
    return rand.Int()
}

for num := range take(done, repeatFn(done, rand), 10) {
    fmt.Println(num)
}

7. 扇入/扇出

单个goroutine处理管道的输入/输出变成多个goroutine处理管道的输入/输出,称之为扇入/扇出。
扇出:描述启动多个goroutine以处理来自管道的输入过程。
扇入:描述将多个goroutine的处理结果组合到一个通道中。

启用扇入/扇出操作的时机:

相关扇入/扇出的示例可参考:https://www.kancloud.cn/mutouzhang/go/596844

可以创建一个function来同时处理close done channel和数据流接收操作

orDone := func(done, c <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:  // 处理done channel
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:  // 数据接收处理
                case <-done:
                                        // return 
                }
            }
        }
    }()

    return valStream
}

// 从返回的channel中读取数据,可随时中断数据的读取,close done channel
for val := range orDone(done, myChan) {
    // Do something with val
}

8. tee-channel

// or-done channel + bridge-channel
orDone := func(done, c <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:  // 处理done channel
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:  // 数据接收处理
                case <-done:
                                        // return 
                }
            }
        }
    }()

    return valStream
}

// 从返回的channel中读取数据,可随时中断数据的读取,close done channel
for val := range orDone(done, myChan) {
    // Do something with val
}

// bridge-channel : channel套channel
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{}) // 1
    go func() {
        defer close(valStream)
        for { // 2
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            for val := range orDone(done, stream) { // 3
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

9. context package

context package提供的一些操作方法如下所示:

var Canceled = errors.New("context canceled")
var Canceled = errors.New("context canceled")

type CancelFunc  // 取消函数
type Context

func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

Context interface定义:

type Context interface {
        // Deadline 返回任务完成时(该 context 被取消)的时间。
        // 如果deadline 未设置,则返回的ok值为false。
        // 连续调用该函数将返回相同的结果。
        Deadline() (deadline time.Time, ok bool)

        // Done 返回任务完成时(该 context 被取消)一个已关闭的通道。
        // 如果该context无法被取消,Done 将返回nil。
        // 连续调用该函数将返回相同的结果。
        //
        // 当cancel被调用时,WithCancel 遍历 Done以执行关闭;
        // 当deadline即将到期时,WithDeadline 遍历 Done以执行关闭;
        // 当timeout时,WithTimeout 遍历 Done以执行关闭。
        //
        // Done 主要被用于 select 语句:
        //
        //  // Stream 使用DoSomething生成值,并将值发送出去
        //  // 直到 DoSomething 返回错误或 ctx.Done 被关闭
        //  func Stream(ctx context.Context, out chan<- Value) error {
        //      for {
        //          v, err := DoSomething(ctx)
        //          if err != nil {
        //              return err
        //          }
        //          select {
        //          case <-ctx.Done():
        //              return ctx.Err()
        //          case out <- v:
        //          }
        //      }
        //  }
        //
        // 查看 https://blog.golang.org/pipelines更多示例以了解如何使用
        // Done通道执行取消操作。
        Done() <-chan struct{}

        // 如果 Done 尚未关闭, Err 返回 nil.
        // 如果 Done 已关闭, Err 返回值不为nil的error以解释为何关闭:
        // 因 context 的关闭导致
        // 或 context 的 deadline 执行导致。
        // 在 Err 返回值不为nil的error之后, 连续调用该函数将返回相同的结果。
        Err() error

        // Value 根据 key 返回与 context 相关的结果,
        // 如果没有与key对应的结果,则返回nil。
        // 连续调用该函数将返回相同的结果。
        //
        // 该方法仅用于传输进程和API边界的请求数据,
        // 不可用于将可选参数传递给函数。
        //
        // 键标识着上Context中的特定值。
        // 在Context中存储值的函数通常在全局变量中分配一个键,
        // 然后使用该键作为context.WithValue和Context.Value的参数。
        // 键可以是系统支持的任何类型;
        // 程序中各包应将键定义为未导出类型以避免冲突。
        //
        // 定义Context键的程序包应该为使用该键存储的值提供类型安全的访问器:
        //
        //  // user包 定义了一个User类型,该类型存储在Context中。
        //  package user
        //
        //  import "context"
        //
        //  // User 类型的值会存储在 Context中。
        //  type User struct {...}
        //
        //  // key是位于包内的非导出类型。
        //  // 这可以防止与其他包中定义的键的冲突。
        //  type key int
        //
        //  // userKey 是user.User类型的值存储在Contexts中的键。
        //  // 它是非导出的; clients use user.NewContext and user.FromContext
        //  // 使用 user.NewContext 和 user.FromContext来替代直接使用键。
        //  var userKey key
        //
        //  // NewContext 返回一个新的含有值 u 的 Context。
        //  func NewContext(ctx context.Context, u *User) context.Context {
        //      return context.WithValue(ctx, userKey, u)
        //  }
        //
        //  // FromContext 返回存储在 ctx中的 User类型的值(如果存在的话)。
        //  func FromContext(ctx context.Context) (*User, bool) {
        //      u, ok := ctx.Value(userKey).(*User)
        //      return u, ok
        //  }
        Value(key interface{}) interface{}

一些终止信号的函数:

// 取消
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// deadline
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
// 超时
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

其中,
WithCancel返回一个新的Context,它在调用返回的cancel函数时关闭done通道。
WithDeadline返回一个新的Context,当机器的时钟超过给定的最后期限时,它关闭done通道。
WithTimeout返回一个新的Context,它在给定的超时时间后关闭done通道。

具体一些context包的使用示例可参考:https://www.kancloud.cn/mutouzhang/go/596849

上一篇下一篇

猜你喜欢

热点阅读