go语言实现的工作池

2021-06-30  本文已影响0人  atdoking

今天看书发现了一个go语言实现的工作池,而且非常完美。

下面是一个非常完善的工作池示例。这个示例的模式完全可以再实战中使用,或者说Go语言中工作池的最佳方案只有这一个,实战中只需要进行灵活改变,使其贴近业务需求

package main

import (
    "fmt"
    "reflect"
    "time"
)

type Task struct {
    Num int
}

type Job struct {
    Task Task
}
var (
    MaxWorker = 5
    JobQueue chan Job //工作通道,模拟需要处理的所有工作
)

type Worker struct {
    id         int           //id
    WorkerPool chan chan Job //工作者池(通道的通道),每个元素是一个job通道,公共的job
    JobChannel chan Job      //工作通道,每个元素是一个job,worker私有的job
    exit       chan bool     //结束信号
}

func NewWorker(workerPool chan chan Job, id int) Worker {
    fmt.Printf("new a worker(%d)\n",id)
    return Worker{
        id:         id,
        WorkerPool: workerPool, //workerPool和scheduler的是同一个
        JobChannel: make(chan Job),
        exit:       make(chan bool),
    }
}

//监听任务和结束信号
func (w Worker) Start() {
    go func() {
        for {
            // 将当前的任务队列注册到工作池.
            w.WorkerPool <- w.JobChannel
            fmt.Println("register private JobChannel to public WorkerPool", w)
            select {
            case job := <-w.JobChannel://收到任务
                fmt.Println("get a job from private w.JobChannel")
                fmt.Println(job)
                time.Sleep(5* time.Second)
            case <-w.exit://收到结束信号
                fmt.Println("worker exit",w)
                return
            }
        }
    }()
}

func (w Worker) Stop() {
    go func() {
        w.exit <- true
    }()
}

//排程中心
type Scheduler struct {
    WorkerPool chan chan Job //工作池
    MaxWorkers int //最大工作者数
    Workers []*Worker //worker队列
}

//创建排程中心
func NewScheduler(maxWorkers int) *Scheduler {
    pool := make(chan chan Job, maxWorkers) //工作池
    return &Scheduler{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作池的初始化
func (s *Scheduler) Create() {
    workers := make([]*Worker,s.MaxWorkers)
    for i := 0; i < s.MaxWorkers; i++ {
        worker := NewWorker(s.WorkerPool, i)
        worker.Start()
        workers[i] = &worker
    }
    s.Workers = workers
    go s.schedule()
}

//工作池的关闭
func (s *Scheduler) Shutdown()  {
    workers := s.Workers
    for _,w := range workers{
        w.Stop()
    }
    time.Sleep(time.Second)
    close(s.WorkerPool)
}

//排程
func (s *Scheduler) schedule() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("get a job from JobQueue")
            go func(job Job) {
                //从WorkerPool获取jobChannel,忙时阻塞
                jobChannel := <-s.WorkerPool
                fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel))
                jobChannel <- job
                fmt.Println("worker's private jobChannel add one job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 5)
    scheduler := NewScheduler(MaxWorker)
    scheduler.Create()
    time.Sleep(1 * time.Second)
    go createJobQueue()
    time.Sleep(100 * time.Second)
    scheduler.Shutdown()
    time.Sleep(10*time.Second)
}
//创建模拟任务
func createJobQueue() {
    for i := 0; i < 30; i++ {
        task := Task{Num: 1}
        job := Job{Task: task}
        JobQueue <- job
        fmt.Println("JobQueue add one job")
        time.Sleep(1 * time.Second)
    }

}
上一篇 下一篇

猜你喜欢

热点阅读