如何优雅的关闭channel

2019-03-15  本文已影响0人  golang推广大使

channel关闭的原则

一个大的原则就是不要在接受者这边关闭channel,也不要关闭一个有多个发送者的channel。换句话说,我们只应该在只有一个发送者的goroutine中关闭这个只有一个发送者的channel。
如果我们可以保证不再有goroutine关闭(或发送值)非封闭的非零通道,那么goroutine可以安全地关闭通道。然而,由接收者或通道的许多发送者之一做出这样的保证通常需要很多努力,并且经常使代码变得复杂。

野蛮的关闭channel的方案

如果你需要在receiver这边或者在多个发送者中的一个这边关闭channel。你可以使用recover机制来阻止panic。下面是一个例子:

func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            // The return result can be altered
            // in a defer function call.
            justClosed = false
        }
    }()

    // assume ch != nil here.
    close(ch)   // panic if ch is closed
    return true // <=> justClosed = true; return
}

上面的方案很野蛮,但是也适用于发送给一个可能关闭的channel

func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()

    ch <- value  // panic if ch is closed
    return false // <=> closed = false; return
}

轻轻的的关闭channel的方案

使用sync.Once关闭channel

type MyChannel struct {
    C    chan T
    once sync.Once
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.once.Do(func() {
        close(mc.C)
    })
}

当然,我们也可以使用sync.Mutex来避免多次关闭channel

type MyChannel struct {
    C      chan T
    closed bool
    mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    if !mc.closed {
        close(mc.C)
        mc.closed = true
    }
}

func (mc *MyChannel) IsClosed() bool {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}

优雅的关闭channel的方案

一个接受者,n个发送者,接受者通过关闭另外一个channel来通知发送者停止发送

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.
        // Its receivers are the senders of channel dataCh.

    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                // The try-receive operation is to try to exit
                // the goroutine as early as possible. For this
                // specified example, it is not essential.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first branch in the
                // second select may be still not selected for some
                // loops if the send to dataCh is also unblocked.
                // But this is acceptable for this example, so the
                // first select block above can be omitted.
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(MaxRandomNumber):
                }
            }
        }()
    }

    // the receiver
    go func() {
        defer wgReceivers.Done()

        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // The receiver of the dataCh channel is
                // also the sender of the stopCh channel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }

            log.Println(value)
        }
    }()

    // ...
    wgReceivers.Wait()
}

如评论中所述,对于附加信号通道,其发送方是数据通道的接收方。附加信号通道由其唯一的发送器关闭,该发送器保持通道关闭原理。
在此示例中,通道数据Ch永远不会关闭。是的,channel不必关闭。如果没有goroutines再次引用它,无论是否关闭,channel最终都会被垃圾收集。因此,关闭channel的优雅不是关闭channel。

m个接受者,n个发送者,他们之中任何一个通过关闭一个另外的channel来结束游戏。

这是最复杂的情​​况。我们不能让任何接收器和发送器关闭数据通道。并且我们不能让任何接收器关闭附加信号通道以通知所有发送者和接收者退出游戏。但是,我们可以引入一个主持人角色来关闭附加信号通道。以下示例中的一个技巧是如何使用try-send操作来通知主持人关闭附加信号通道。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its receivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // The channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its receiver is the moderator goroutine shown below.
        // It must be a buffered channel.

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    // Here, the try-send operation is to notify the
                    // moderator to close the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // The try-receive operation here is to try to exit the
                // sender goroutine as early as possible. Try-receive
                // try-send select blocks are specially optimized by the
                // standard Go compiler, so they are very efficient.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first branch in this
                // select block may be still not selected for some
                // loops (and for ever in theory) if the send to dataCh
                // is also non-blocking. If this is not acceptable,
                // then the above try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()

            for {
                // Same as the sender goroutine, the try-receive
                // operation here is to try to exit the receiver
                // goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first branch in this
                // select block may be still not selected for some
                // loops (and for ever in theory) if the receive from
                // dataCh is also non-blocking. If this is not acceptable,
                // then the above try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == MaxRandomNumber-1 {
                        // The same trick is used to notify
                        // the moderator to close the
                        // additional signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在此示例中,仍保留通道关闭原则。
请注意,通道toStop的缓冲区大小(容量)是1。这是为了避免在主持人goroutine准备好接收来自toStop的通知之前发送第一个通知。

上一篇下一篇

猜你喜欢

热点阅读