golang实现协程池

2017-02-16  本文已影响0人  柔软的胖

golang中启动一个协程不会消耗太多资源,有人认为可以不用协程池。但是当访问量增大时,可能造成内存消耗完,程序崩溃。于是写了一个协程池的Demo。

Demo中有worker和job。worker是一个协程,在worker中完成一个job。Jobs是一个channel,使用Jobs记录job。当生成一个新任务,就发送到Jobs中。程序启动时,首先启动3个worker协程,每个协程都尝试从Jobs中接收job。如果Jobs中没有job,worker协程就等待。

基本逻辑如下:

  1. Jobs管道存放job,Results管道存放结果。
  2. 程序一启动,启动3个worker协程,等待从Jobs管道中取数据。
  3. 向Jobs管道中发送3个数据。
  4. 关闭Jobs管道。
  5. worker协程从Jobs管道中接收到数据以后,执行程序,把结果放到Results管道中。然后继续等待。
  6. 当Jobs管道中没有数据,并且Results有3个数据时。退出主程序。

代码如下:

package main

import (
    "fmt"
    "time"
)

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                fmt.Println("worker", id, "started  job", j)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j)
                Results <- true
            }
        }
    }()
}

const channelLength = 3

var (
    Jobs chan int
    Results chan bool
)

func main() {
    Jobs = make(chan int, channelLength)
    Results = make(chan bool, channelLength)

    // Start worker goroutines
    for i:= 0; i < channelLength; i++ {
        worker(i)
    }

    // Send to channel
    time.Sleep(time.Second)
    for j := 0; j < channelLength; j++ {
        Jobs <- j
    }
    close(Jobs)

    for len(Jobs) != 0 || len(Results) != channelLength  {
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Println("Complete main")
}

运行结果如下:

Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started  job 2
worker 2 started  job 0
worker 0 started  job 1
worker 0 finished job 1
Waiting for job...
worker 0 started  job 0
worker 2 finished job 0
Waiting for job...
worker 2 started  job 0
worker 1 finished job 2
Waiting for job...
worker 1 started  job 0
Complete main

这个程序出现问题了,bug在哪里?

开始的3次,协程运行都是正常。

worker 1 started  job 2
worker 2 started  job 0
worker 0 started  job 1
worker 0 finished job 1
worker 2 finished job 0
worker 1 finished job 2

根据设计,向Jobs管道中发送3个数据以后,就关闭了管道。此后,协程不应该再从Jobs管道中接收到数据。

for j := 0; j < channelLength; j++ {
        jobs <- j
    }
close(jobs)

实际运行中,协程接收完3个数据以后,worker还能不断的从Jobs管道中接收到数据。与设计不符。

worker 0 started  job 0
worker 2 started  job 0
worker 1 started  job 0

开始以为问题出在worker()中,j := <- job,只有当job中有返回,才会打印worker started。但是后面的job id都是0,说明没有向jobs管道中发送新数据。

for {
            fmt.Println("Waiting for job...")
            select {
            case j := <-Jobs :
                fmt.Println("worker", id, "started  job", j)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j)
                Results <- true
            }
        }

研究向Jobs管道发送数据的代码,突发奇想,把close(Jobs)注释掉,看看如何。

for j := 0; j < channelLength; j++ {
        Jobs <- j
    }
//close(Jobs)

程序居然正常了。

Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started  job 0
worker 0 started  job 2
worker 2 started  job 1
worker 1 finished job 0
worker 0 finished job 2
Waiting for job...
Waiting for job...
worker 2 finished job 1
Waiting for job...
Complete main

原来问题出在close()上,马上查注释。close()是在sender中调用,当管道中最后一个数据被接收以后,就关闭管道。此时,不能再向管道中发送数据。否则会报错panic: send on closed channel

使用x, ok := <-c可以判断一个管道是否关闭,如果管道已经关闭,ok的值为false

管道关闭以后,并且管道中的数据被接收完以后,居然还能从管道中接收到数据0。于是就造成了后续协程接收到job 0的问题。

// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
//  x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)

如果要使用close,应该怎么做

管道不用时,close()管道是个好习惯。此时,应该怎么解决这个问题呢?首先要在协程中检查接收到的数据,j:=<-jobs,判断j是否为0。如果Jobs中存放的是非指针数据,不能分辨0是真正的0值,还是close以后接收到的0。因此需要在Jobs管道中存放指针。管道打开时,接收的都是非nil指针。close以后才返回0,也就是nil指针。

修改程序。新生成一个机构体Job。

type Job struct {
    JobId int
}

Jobs保存指向Job的指针。

Jobs chan *Job
func main() {
    Jobs = make(chan *Job, channelLength)
    ...
    for j := 0; j < channelLength; j++ {
        Jobs <- &Job{JobId:j}
    }
    close(Jobs)
    ...
}

在worker协程中,从管道取出Job指针以后,判断指针是否为nil。如果为nil,说明管道已经关闭,协程退出。

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                if j == nil {
                    fmt.Println("Close the worker", id)
                    return
                }
                fmt.Println("worker", id, "started  job", j.JobId)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j.JobId)
                Results <- true
            }
        }
    }()
}

运行结果达到预期。

Waiting for job...
Waiting for job...
Waiting for job...
worker 0 started  job 0
worker 1 started  job 1
worker 2 started  job 2
worker 2 finished job 2
worker 0 finished job 0
Waiting for job...
Waiting for job...
Close the worker 2
Close the worker 0
worker 1 finished job 1
Waiting for job...
Close the worker 1
Complete main

附上最终的代码。

package main

import (
    "fmt"
    "time"
)

type Job struct {
    JobId int
}

func worker(id int) {
    go func() {
        for {
            fmt.Println("Waiting for job...")
            select {
            // Receive from channel
            case j := <-Jobs :
                if j == nil {
                    fmt.Println("Close the worker", id)
                    return
                }
                fmt.Println("worker", id, "started  job", j.JobId)
                time.Sleep(time.Second)
                fmt.Println("worker", id, "finished job", j.JobId)
                Results <- true
            }
        }
    }()
}

const channelLength = 3

var (
    Jobs chan *Job
    Results chan bool
)

func main() {
    Jobs = make(chan *Job, channelLength)
    Results = make(chan bool, channelLength)

    // Start worker goroutines
    for i:= 0; i < channelLength; i++ {
        worker(i)
    }

    // Send to channel
    time.Sleep(time.Second)
    for j := 0; j < channelLength; j++ {
        Jobs <- &Job{JobId:j}
    }
    close(Jobs)

    for len(Jobs) != 0 || len(Results) != channelLength  {
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Println("Complete main")
}
上一篇下一篇

猜你喜欢

热点阅读