Python/Go开发

go实现协程池管理

2020-07-08  本文已影响0人  Best博客

能做啥?

当你需要推送1000万条消息,推送每条消息大概需要花10ms。单线程慢慢发你肯定等不了,为每个发送行为开一个协程,哈哈哈,1000万个协程,4kb * 10000000 ,按最小的栈空间算40个G内存算是没了,就算你内存顶得住1000万的并发估计你服务器的文件数,cpu占用早就超过最大值了。入正题,这个时候你需要管理你的协程数,比如说我开辟一个1000的协程池,让这1000个协程并发的去推送1000万数量的消息即可。

通过 Channel 实现 Goroutine Pool,缺点协程的频繁开辟和注销,但简单通用灵活

package gopool

import (
    "sync"
)

// Pool Goroutine Pool
type Pool struct {
    queue chan int
    wg    *sync.WaitGroup
}

// New 新建一个协程池
func New(size int) *Pool {
    if size <= 0 {
        size = 1
    }
    return &Pool{
        queue: make(chan int, size),
        wg:    &sync.WaitGroup{},
    }
}

// Add 新增一个执行
func (p *Pool) Add(delta int) {
    // delta为正数就添加
    for i := 0; i < delta; i++ {
        p.queue <- 1
    }
    // delta为负数就减少
    for i := 0; i > delta; i-- {
        <-p.queue
    }
    p.wg.Add(delta)
}

// Done 执行完成减一
func (p *Pool) Done() {
    <-p.queue
    p.wg.Done()
}

// Wait 等待Goroutine执行完毕
func (p *Pool) Wait() {
    p.wg.Wait()
}

以上是gopool的包实现,以下demo取用

package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "yumc.pw/cloud/lib/gopool"
)

func main() {
    // 这里限制100个并发
    pool := gopool.New(100)// sync.WaitGroup{}

  //假设需要发送1000万个http请求,然后我并发100个协程取完成这件事
    for i := 0; i < 10000000; i++ {
        pool.Add(1)  //发现已存在100个人正在发了,那么就会卡住,直到有人完成了宣布自己退出协程了
        go func(i int) {
            resp, err := http.Get("http://ip.3322.org")
            if err != nil {
                fmt.Println(i, err)
            } else {
                defer resp.Body.Close()
                result, _ := ioutil.ReadAll(resp.Body)
                fmt.Println(i, string(result))
            }
            pool.Done()
        }(i)
    }
    pool.Wait()
}

以上思路是我在go社区看到的,确实简单有效的做到了协程数量的控制,但是有木有发现会存在频发的协程的开辟与剔除,如果对性能有着很高的要求建议优化成固定数目的协程取channel里面取数据进行消费的模式,也就是消费者模式,这样避免了协程的创建与注销。

上代码

package main

import (
    "fmt"
    "strconv"
    "sync"
)
//任务对象
type task struct {
    Production
    Consumer
}
//设置消费者数目,也就是work pool大小
func(t *task)setConsumerPoolSize(poolSize int){
    t.Production.Jobs = make(chan *Job,poolSize * 10)
    t.Consumer.WorkPoolNum = poolSize
}

//任务数据对象
type Job struct {
    Data string
}

func NewTask(handler func(jobs chan *Job)(b bool))(t *task){
    t = &task{
        Production:Production{Jobs: make(chan *Job,100)},
        Consumer:Consumer{WorkPoolNum:100,Handler:handler},
    }
    return
}


type Production struct {
    Jobs chan *Job
}

func (c Production)AddData(data *Job){
    c.Jobs <- data
}

type Consumer struct {
    WorkPoolNum int
    Handler func(chan *Job)(b bool)
    Wg sync.WaitGroup
}

//异步开启多个work去处理任务,但是所有work执行完毕才会退出程序
func (c Consumer)disposeData(data chan *Job){
    for i:=0;i<=c.WorkPoolNum;i++{
        c.Wg.Add(1)
        go func() {
            defer func() {
                c.Wg.Done()
            }()
            c.Handler(data)
        }()
    }
    c.Wg.Wait()
}




func main(){
    //1.先实现一个用于处理数据的闭包,在这里面实现自己业务
    consumerHandler := func(jobs chan *Job)(b bool) {
        for job := range jobs {
            fmt.Println(job)
        }
        return
    }

    //2.new一个任务处理对象出来
    t :=NewTask(consumerHandler)
    t.setConsumerPoolSize(500)//500个协程同时消费
    //3.根据自己的业务去生产数据通过AddData方法去添加数据到生产channel,这里是1000万条数据
    go func(){
        for i := 0; i < 10000000; i++ {
            job := new(Job)
            iStr := strconv.Itoa(i)
            job.Data = "这里面去定义你的任务数据格式"+ iStr
            t.AddData(job)
        }
    }()

    //4.消费者消费数据
    t.Consumer.disposeData(t.Production.Jobs)
}


上一篇下一篇

猜你喜欢

热点阅读