Go基础系列随笔-生活工作点滴Go语言

15 Go并发编程(二):管道 —— Go并发的通信机制

2019-07-06  本文已影响22人  GoFuncChan

Go管道

1.什么是管道?

管道最早由CSP模型提出,以点对点管道代替内存共享实现并发进程间的数据交互,相比内存共享数据交互的相率要高很多。关于CSP模型在《Go并发编程初探》篇章已提到,这里不再赘述。

在Go中,管道是一种特殊类型,用chan关键字表示。它的主要作用是在协程之间实现通信,可以说是go协程间的高速公路。

管道声明:

var {变量名} chan {数据类型}

//声明一个整型管道chA
var chA chan int

管道创建:

{接收变量名} := make(chan {数据类型},{管道容量})

//使用make函数创建一个容量为5的整型管道
chB := make(chan int,5)

管道有几个特点:

1.管道是有类型的,基于go的基本数据类型
2.管道是有方向的,可从管道读出数据,可向管道写入数据。当管道被传递时可设置其读写方向
3.管道是有缓存的,通过缓存容量可控制协程间的阻塞
4.管道是可关闭的,且只能被关闭一次,管道也是资源,如果不使用应关闭

管道使用常识:

1.管道数据读写,管道写入数据后必须关闭;
2.从一个已经关闭的管道中读取数据,读完了之后,继续读会读到其管道类型的零值;
3.没有初始化的管道被关闭会报panic;
4.读取管道数据时应校验其有效性;
5.关闭管道时会产出一个广播,所有从管道读取数据的协程都会收到消息;
6.被遍历的管道如果没有收到管道被关闭的广播,遍历会一直被阻塞;
7.管道一般在写入协程处调用关闭,写只有一个写入协程的情况,管道被关闭后不能写入数据,但其他协程可以读出,注意重复关闭的情况;

以下演示管道在协程间的通信作用:

func BaseChannel01() {
    //声明一个管道:声明后没有初始化的管道是空指针nil,证明其为引用类型
    //PS:go的引用类型有五种:slice、map、chan、指针、接口,前三种都是直接可以用make()函数创建
    //var chA chan int
    //fmt.Println(chA)

    //初始化一个管道
    chB0 := make(chan int, 0)    //无缓存能力的整型管道
    chB1 := make(chan string, 5) //缓存能力为5的字符串管道
    chB2 := make(chan<- int, 3)  //缓存能力为3的整型只写管道
    chB3 := make(<-chan int, 3)  //缓存能力为3的整型只读管道
    fmt.Printf("chB0:类型%T,值%v\n", chB0, chB0)
    fmt.Printf("chB1:类型%T,值%v\n", chB1, chB1)
    fmt.Printf("chB2:类型%T,值%v\n", chB2, chB2)
    fmt.Printf("chB3:类型%T,值%v\n", chB3, chB3)

    //协程从通道不断的写入读出数据
    //开辟一个协程往通道里写数据
    go func(ch chan int) {
        //循环不断往通道里写入当前秒数
        for {
            now := time.Now()
            second := now.Second()
            ch <- second
        }

    }(chB0)

    //另一个协程不断从通道读出数据
    go func(ch chan int) {
        for {
            nowSecond := <-ch
            fmt.Println("子协程读出数据:", nowSecond)
            time.Sleep(time.Second)
        }
    }(chB0)

    //父协程不断从通道读出数据
    for {
        nowSecond := <-chB0
        fmt.Println("父协程读出数据:", nowSecond)
        time.Sleep(time.Second)
    }

    //读取管道数据时应校验其有效性
    fmt.Println("读取管道数据时校验其有效性:")
    chCheck := make(chan int, 3)

    chCheck <- 123
    chCheck <- 456
    chCheck <- 789
    close(chCheck)

    go func(ch chan int) {
        chc1 := <-chCheck
        chc2 := <-chCheck
        chc3 := <-chCheck
        chc4 := <-chCheck
        chc5 := <-chCheck

        fmt.Println(chc1, "-", chc2, "-", chc3, "-", chc4, "-", chc5)

        chc6, ok := <-chCheck
        fmt.Printf("chc5:值%v,有效性:%v\n", chc6, ok)

    }(chCheck)

    time.Sleep(time.Second * 3)

    //遍历管道,管道关闭以后遍历读取会自动被通知退出
    for v := range chCheck{
        time.Sleep(time.Second)
        fmt.Println(v)
    }

}
2.管道的读写和异常

关闭管道的几个注意事项:

func BaseChanner02() {
    var ch1 chan int
    close(ch1) // panic: close of nil channel

    ints1 := make(chan int, 1)
    ints1 <- 111
    close(ints1)
    close(ints1) //panic: close of closed channel]

    ints := make(chan int, 1)
    ints <- 111
    close(ints)
    ints <- 111 //panic: send on closed channel
}
3.无缓存的管道

使用一个无缓存的管道时应该注意,它是阻塞的:

func BaseChanner03() {
    chInt := make(chan int)

    go func(ch chan int) {
        fmt.Println("启动协程1")
        ch <- 111
        close(ch)
        fmt.Println("结束协程1")
    }(chInt)

    go func(ch chan int) {
        fmt.Println("启动协程2")
        fmt.Println("管道数据:", <-ch)
        fmt.Println("结束协程2")
    }(chInt)

    for {
        time.Sleep(time.Second)
    }

}
4.有缓存能力的管道

可利用管道缓存能力进行协程调度,管道的元素个数或称缓存能力,决定协程是否产生阻塞,若管道数据已满则阻塞,写入阻塞读出也阻塞,这是相互的。

func BaseChanner04() {
    //创建一个缓存能力为3的整型管道
    chInt := make(chan int, 3)

    go func(ch chan int) {
        fmt.Println("启动协程1")
        for i := 1; i <= 5; i++ {
            time.Sleep(time.Second * 2)
            fmt.Println("协程1写入数据:", i)
            ch <- i
        }
        fmt.Println("结束协程1")
    }(chInt)

    go func(ch chan int) {
        fmt.Println("启动协程2")
        for i := 1; i <= 5; i++ {
            num := <-ch
            fmt.Println("协程2读出数据:", num)

        }
        fmt.Println("结束协程2")
    }(chInt)

    for {
        time.Sleep(time.Second * 3)
    }

}
5.select选择管道,协程多路复用

在讲到Go外壳:分支专题是提到select,select关键字是go特有的,其主要用于配合管道实现多路复用。

func BaseChanner05() {
    //创建三个管道
    ch1 := make(chan int, 3)
    ch2 := make(chan int, 4)
    ch3 := make(chan int, 5)

    //创建3条协程
    go func(c chan int) {
        ticker := time.NewTicker(time.Second * 1)
        for {
            <-ticker.C
            c <- 1
        }
    }(ch1)
    go func(c chan int) {
        ticker := time.NewTicker(time.Second * 2)
        for {
            <-ticker.C
            c <- 2
        }
    }(ch2)
    go func(c chan int) {
        ticker := time.NewTicker(time.Second * 3)
        for {
            <-ticker.C
            c <- 3
        }
    }(ch3)

    //time.Sleep(time.Second)
    //主协程select多路复用,for不断获取不同管道的数据,随先来则优先处理谁。
    for {
        select {
        case chV1, ok := <-ch1:
            fmt.Printf("管道ch1输出:%v,有效性%v\n", chV1, ok)
        case chV2, ok := <-ch2:
            fmt.Printf("管道ch2输出:%v,有效性%v\n", chV2, ok)
        case chV3, ok := <-ch3:
            fmt.Printf("管道ch3输出:%v,有效性%v\n", chV3, ok)
    }
}
6.通过容量控制并发数

利用有容量管道的阻塞能力——地铁闸机模型

func BaseChanner06() {
    //创建一个容量为5的管道,无论协程开多少,控制每次5条并发
    semaphore := make(chan int, 5)

    for i := 1; i <= 100; i++ {
        go func(c chan int, n int) {
            for {
                c <- i //抢通道写入,抢不到则阻塞
                fmt.Println("协程", n, "抢到通道")
                time.Sleep(time.Second)
                <-c //做完操作后自己读出,空出容量
            }
        }(semaphore, i)
    }

    for {
        time.Sleep(time.Second)
    }

}
7.定时器

固定时间和周期时长定时器,其与time.sleep()区别是可以终止和重置定时器。下面演示一下timer和ticker,以及在子协程中终止ticker

func BaseChannel07() {
    //使用固定时间定时
    timer := time.NewTimer(time.Second * 3)
    <-timer.C
    fmt.Println("父协程定时3秒输出!!!")
    
    //简单的使用变量标识ticker状态
    var tickerStopped = false
    //使用周期定时器
    ticker := time.NewTicker(time.Second * 1)
    go func(t *time.Ticker) {
        //5秒后终止周期器
        //使用固定时间定时
        timer := time.NewTimer(time.Second * 5)
        <-timer.C
        fmt.Println("子协程定时5秒关闭周期器!!!")
        t.Stop()
        tickerStopped = true
        runtime.Goexit()
    }(ticker)

    for {
        if !tickerStopped {
            s := <-ticker.C
            fmt.Println("父协程每隔1秒输出!!!", s)
        } else {
            fmt.Println("子协程已关闭周期定时器!!!")
            os.Exit(0)
        }

    }

    //不可撤销的time.Sleep()

}
8.如何优雅的关闭管道

比较优雅的方式一般建议在发送方关闭。

协程对管道的操作分几种情况:

在确认只有一个发送者的情况下,管道关闭时比较简单的,写入完成后记得立即或延迟关闭管道即可,接收者在读完所有数据后会校验读取数据的有效性。

这种情况与上一种类似,如果其他多个协程的接受者在遍历读取,它们都会收到管道被关闭的广播,并退出退出遍历阻塞状态。

这种情况比较复杂,各个发送者都不能粗暴的关闭管道,虽然可以通过sync.Once控制只关闭一次,但其他发送者仍有可能向一个已被关闭的管道发送数据。这里可能需要其他状态量标志各个发送者的完成状态,接收者监控该状态量确认各发送者写入完成后,由接收者关闭管道。可以使用等待组、状态变量、或其他通道等等。

一般这种状况较少,多个协程一起对同一个通道进行读写,一般都需要一个管理者监控该通道的读写完成情况,如单独一个协程检测各个操作该通道的协程的完成状态,当全部读写完成后再进行管道关闭。

上一篇下一篇

猜你喜欢

热点阅读