Go select

2021-04-13  本文已影响0人  JunChow520

通过select语句可以监听channel上的数据流动

Golang的select语句类似于UNIX的select()函数的轮询机制,在UNIX中可通过调用select()函数来监控一系列的文件句柄(文件描述符,数量有限),一旦某个文件句柄发生了I/O动作select()调用就会被返回。该机制也被用于高并发的Socket服务器程序中。

select来源于网络I/O模型中的select,本质上I/O多路复用技术,只不过Golang中的select基于的并非网络而是channel

select {
  case communication clause:
    statement(s)
  case communication clause:
    statement(s)
  default:
    statement(s)
}

select语句是Golang中的控制结构,类似用于通信的switch语句,也被称为channel开关。select语句会等待channel准备就绪(收发操作),以便在不同的case下执行。由select开始的一个新的选择块,每个选择条件由case语句来描述。与switch语句相比,select有较多限制,最大的限制在于每个case语句必须是一个I/O操作。

select{
case v1 := <-ch1:
    fmt.Printf("[CH1] received: %v\n", v1)
case v2,ok := <-ch2:
    if ok{
        fmt.Printf("[CH2] received: %v\n", v2)
    }else{
        fmt.Printf("[CH2] closed\n")
    }
case ch3 <-msg:
    fmt.Printf("[CH3] sent: %v\n", msg)
default:
    fmt.Printf("NO Communicating\n")
}

例如:上例select语句拥有四个case子句,前两个是channelreceive接收操作,第三个是channelsend发送操作,最后一个default默认操作。当代码执行到select语句时,case子句会按源代码的顺序进行评估,且只评估一次。评估结果会出现下面几种情况:

  1. default外,若只有一个case评估通过则执行此case中的语句。
  2. default外,若用多个case评估通过则随机挑选一个执行。
  3. default外,所有的case评估都不通过,则执行default
  4. 若没有defaultselect代码块发生阻塞,直到有一个case通过评估,否则一直阻塞。
  5. casereceive接收的是nil则也会发生阻塞

监听

Golang的select用来监听和channel有关的I/O操作,可监听进入channel时的数据,也可以是用channel发送值时。当I/O操作发生时会触发相应地动作,因此每个case都必须是一个I/O操作,确切的说应该是一个面向channel的I/O操作。

例如:定时器

ticker1 := time.NewTicker(time.Second * 1)
ticker2 := time.NewTicker(time.Second * 3)

for{
    select{
    case <-ticker1.C:
        fmt.Printf("[1] TICK\n")
    case <-ticker2.C:
        fmt.Printf("[2] TICK\n")
    }
}

在执行select语句时,运行时系统会自上而下地判断每个case中的发送或接收操作是否可以被立即执行,所谓立即执行即当前goroutine不会因此操作而被阻塞。

select语句只能用于channel的读写操作

例如:使用select来检测channel是否已满

ch := make(chan int, 1)
ch<-1

select{
case ch<-2:
    fmt.Printf("send to channel\n")
default:
    fmt.Printf("channel is full\n")
}

特性

例如:获取斐波拉兹数列

func fib(ch, quit chan int){
    x,y := 0,1
    for{
        select{
        case ch <- x:
            x,y = y, x+y
        case <-quit:
            fmt.Printf("QUIT\n")
            return
        }
    }
}

func main(){
    ch := make(chan int)
    quit := make(chan int)

    go func(){
        for i:=0; i<10; i++{
            fmt.Println(<-ch)
        }
        quit<-0
    }()

    fib(ch, quit)
}

case

select执行过程中必须命中某一case分支,若在遍历所有case后都没有命中,则会进入default分支。若没有default分支则select发生阻塞,直到某个case可以命中。若一直都没有命中,则select抛出deadlock死锁错误。

例如:通过time.After实现定时器,定时任务可通过done channel停止。

done := make(chan bool, 1)
close(done)
for{
    select{
    case <-time.After(time.Second):
        fmt.Printf("Time after\n")
    case <-done:
        //读取零值 false
        fmt.Printf("Read done\n")
    }
}

select会随机执行一个可运行的case,若没有case可以运行则阻塞,直到有case可以运行。

select可用于多个channel进行读写操作时仅需一次只处理一个的情况。

ch := make(chan int, 1024)
go func(ch chan int){
    for{
        v := <-ch
        fmt.Printf("value = %v\n", v)
    }
}(ch)

ticker := time.NewTicker(time.Second * 1)
for i:=0; i<5; i++{
    select{
    case ch<-i:
    case <-ticker.C:
        fmt.Printf("%d: Ticker\n", i)
    }
    time.Sleep(time.Microsecond * 500)
}

close(ch)
ticker.Stop()

ticker.Cch同时满足读写条件时,select会随机地选择一个来执行,导致看起来一些数据丢了。

var ch chan int

go func(ch chan int){
    ch <- 100
}(ch)

select {
case <-ch:
    fmt.Printf("Channel recieved\n")
}

发生错误:fatal error: all goroutines are asleep - deadlock!

default

select语句会被阻塞,直到其中一个case被执行。若select中没有任何case,它将永远阻塞,从而导致死锁。

例如:空select{}引发死锁

func main(){
    select {
    
    }
}

对于空的select语句,程序会被阻塞,准确来说是当前goroutine会被阻塞。Golang自带死锁检测机制,发现当前goroutine再也没有机会被唤醒时,则会panic

fatal error: all goroutines are asleep - deadlock!

通过带defaultselect实现非阻塞读写,用于防止select发生阻塞。

select{
  default:
}

多个case运行时select会随机公平地选出一个执行,其它不会执行。若存在default子句则会执行该语句。若没有defaultselect阻塞,直到某个通信可以运行。Go不会重新对channel或值进行求值。

select语句永远阻塞,没有其它goroutine写入此channel时,将导致死锁。

ch := make(chan int)
select{
case <-ch:
}
fatal error: all goroutines are asleep - deadlock!

若存在默认情况default则不会发生死锁deadlock,因为在没有其它case准备就绪时将执行default默认情况。

ch := make(chan int)
select{
case <-ch:
default:
    fmt.Println("default case executed")
}

例如:典型生产者消费者模式

func main(){
    ch1 := make(chan int)
    ch2 := make(chan int)
    //生产者
    go pump1(ch1)
    go pump2(ch2)
    //消费者
    go suck(ch1, ch2)

    time.Sleep(1e9)
}
func pump1(ch chan int){
    for i:=0; ; i++{
        ch <- i * 1
    }
}
func pump2(ch chan int){
    for i:=0; ; i++{
        ch <- i * 2
    }
}
func suck(ch1,ch2 chan int){
    for{
        select{
        case v := <-ch1:
            fmt.Printf("[CH1] receive %d\n", v)
        case v := <-ch2:
            fmt.Printf("[CH2] receive %d\n", v)
        default:
            fmt.Printf("NO Communicating\n")
        }
    }
}

选择select的哪一个case取决于哪个信道接收到了消息。

timeout

case中的channel始终没有接收到数据,同时也没有提供default语句时,select语句整体会发生阻塞。有时并不希望select一直阻塞下去,此时可手动设置一个超时时间。

func expire(ch chan bool, t int){
    time.Sleep(time.Second * time.Duration(t))
    ch <- true
}

func main(){
    timeout := make(chan bool, 1)
    go expire(timeout, 2)

    ch1 := make(chan string, 1)
    ch2 := make(chan string, 1)
    select{
    case msg1 := <-ch1:
        fmt.Printf("[CH1] received: %s\n", msg1)
    case msg2 := <-ch2:
        fmt.Printf("[CH2] received: %s\n", msg2)
    case <-timeout:
        fmt.Printf("[EXPIRE] exit\n")
    }
}

例如:使用select实现channel的读取超时机制,不能使用default否则3秒超时未到,就会直接执行default

timeout := make(chan bool, 1)

go func(){
    time.Sleep(time.Second * 3)
    timeout <- true
}()

ch := make(chan int)
select{
case <-ch:
case <-timeout:
    fmt.Printf("TIMEOUT\n")
}

可使用time.After实现超时控制

ch := make(chan int)

select{
case <-ch:
    fmt.Printf("read from ch\n")
case <-time.After(time.Second * time.Duration(3)):
    fmt.Printf("[TIMEOUT] exit\n")
}

for

forselect结合时,break是无法跳出for之外的,若需break出来需添加标签使用goto,或break到具体为止。

ch := make(chan bool)
for i:=0; i<runtime.NumCPU(); i++{
    go func(){
        for{
            select{
            case <-ch:
                break
            default:
            }
        }
    }()
}

time.Sleep(time.Second * 10)
for i:=0; i<runtime.NumCPU(); i++{
    ch<-true
}

一般来说,使用select监听各个case的I/O事件,每个case都是阻塞的。上例中原本希望select在获取到ch里的数据时立即退出循环,但由于在for循环中,第一次读取ch后仅仅退出了select但并未退出for,因此下次哈希继续执行select逻辑,此时将永远是执行default,直到ch里读取到数据。否则会一直在一个死循环中运行,因此即便只是放到一个goroutine中运行,也会占满所有的CPU。解决的方式直接把default拿掉,这样select会一直阻塞在ch通道的I/O上,当ch有数据时就可以随时响应通道中的信息。

select实现了一种监听模式,通常用在(无限)循环中,在某中情况下可通过break语句使循环退出。

ch := make(chan int)
//定时2s
ticker := time.NewTicker(time.Second * 2)
defer ticker.Stop()
//发送信号
go func(ch chan int){
    time.Sleep(time.Second * 5)
    ch <- 1
}(ch)
//监听I/O
for{
    select{
    case <-ticker.C:
        fmt.Printf("task running...\n")
    case result,ok := <-ch:{
        if ok{
            fmt.Printf("chan number is %v\n", result)
            break
        }
    }
    }
}
fmt.Printf("END\n")

例如:

ch := make(chan int)
quit := make(chan bool)

//写数据
go func() {
    //循环写入
    for i := 0; i < 5; i++ {
        ch <- i
        time.Sleep(time.Second)
    }
    //关闭channel
    close(ch)
    //通知主goroutine推出
    quit <- true
    //退出当前goroutine
    runtime.Goexit()
}()

//主goroutine 读数据
for {
    select {
    case num := <-ch:
        fmt.Printf("received number is %d\n", num)
    case <-quit:
        fmt.Printf("quit\n")
        //break //跳出select
        return //终止进程
    }
    fmt.Printf("==================\n")
}

多路复用

select是Golang在语言层面提供的多路I/O复用机制,它可检测多个channel是否ready(是否可读或可写)。

select是如何实现多路复用的,为什么没有在第一个channel操作时阻塞,从而导致后面的case都执行不了。

上一篇下一篇

猜你喜欢

热点阅读