程序员

tunny源码阅读

2018-11-14  本文已影响0人  lizhuoming

前言

最近在学习Go并发,在同学强烈推荐下,阅读了tunny源码。在此记录自己的理解和感想。

tunny 基于Go实现的协程池

要去理解一个东西,最快的方式莫过于先去熟悉使用它。那么,现在我们就开始使用它:假设我们现在的需求是输入一个字符串,将它与"Hello"拼接后打印并返回,要求用tunny实现。

    #在NewFunc函数中需要对传入的函数进行转换并执行
    printHello := func(str interface{}) interface{} {
        fmt.Println("Hello!", str)
        return "Hello! " + str.(string)
    }
    pool3 := tunny.NewFunc(3, func(payload interface{}) interface{} {
        f, ok := payload.(func())
        if !ok {
            return nil
        }
        f()
        return f
    })
    pool3.Process(printHello("lizhuoming"))
    #而NewCallback的Process函数封装了这个操作
    printHello := func(str interface{}) interface{} {
        fmt.Println("Hello!", str)
        return "Hello! " + str.(string)
    }
    pool2 := tunny.NewCallback(2)
    pool2.Process(printHello("lizhuoming"))
#New的灵活度最高,我们可以定制自己的Worker
type myWorker struct {
    processor func(interface{}) interface{}
}

func (w *myWorker) Process(payload interface{}) interface{} {
    return w.processor(payload)
}

func (w *myWorker) BlockUntilReady() {}
func (w *myWorker) Interrupt()       {}
func (w *myWorker) Terminate()       {}

func main() {
    printHello := func(str interface{}) interface{} {
        fmt.Println("Hello!", str)
        return "Hello! " + str.(string)
    }

    pool1 := tunny.New(3, func() tunny.Worker {
        return &myWorker{
            processor: printHello,
        }
    })
    pool1.Process("lizhuoming")
}

源码分析

在熟悉了tunny的使用后,我们通过代码来看看它是如何工作的吧~

协程池的主要工作流程

在我们创建并指定协程池容量后,协程池会启动指定容量个协程。它们竞争向一个channel中写入 workRequest(它充当一个桥梁,连接 Process 函数与真正执行任务的协程)。当你调用 Process 函数时,它会通过这个桥梁将任务传递给协程,并在任务结束后,接收到协程返回的结果。
下面,我们来了解它的具体实现吧

桥梁以及Process函数与协程之间的通信实现

type Worker interface {
    // 执行任务
    Process(interface{}) interface{}
    // 在执行任务前执行,相当于init
    BlockUntilReady()
    // 在任务执行时被终止时,会执行该函数
    Interrupt()
    // 当协程被关闭时,执行该函数
    Terminate()
}

//协程池
type Pool struct {
        //正在执行的任务数量
        queuedJobs int64
        ctor    func() Worker
        workers []*workerWrapper
        //所有运行的协程会竞争向该channel写入workRequest
        reqChan chan workRequest
        workerMut sync.Mutex
}

//桥梁载体
type workRequest struct {
        //接收任务的channel
        jobChan chan<- interface{}
        //返回结果的channel
        retChan <-chan interface{}
        //中断协程的执行
        interruptFunc func()
}

//负责管理worker(stop函数)和goroutine(interrupt函数)的整个生命周期
type workerWrapper struct {
        worker        Worker
        interruptChan chan struct{}
        // workerWrapper 和 Pool 的reqChan是同一个(channel是引用传递)
        reqChan chan<- workRequest
        closeChan chan struct{}
        closedChan chan struct{}
}
func (p *Pool) ProcessTimed(
    payload interface{},
    timeout time.Duration,
) (interface{}, error) {
    atomic.AddInt64(&p.queuedJobs, 1)
    defer atomic.AddInt64(&p.queuedJobs, -1)

    tout := time.NewTimer(timeout)

    var request workRequest
    var open bool

    select {
    //读取桥梁载体
    case request, open = <-p.reqChan:
        if !open {
            return nil, ErrPoolNotRunning
        }
    //超时处理
    case <-tout.C:
        return nil, ErrJobTimedOut
    }

    select {
    //通过桥梁载体将任务传给协程
    case request.jobChan <- payload:
    case <-tout.C:
        //调用 workerWrapper 的 interrupt 方法,结束函数执行
        request.interruptFunc()
        return nil, ErrJobTimedOut
    }

    select {
    //接收返回数据
    case payload, open = <-request.retChan:
        if !open {
            return nil, ErrWorkerClosed
        }
    case <-tout.C:
        //调用 workerWrapper 的 interrupt 方法,结束函数执行
        request.interruptFunc()
        return nil, ErrJobTimedOut
    }

    tout.Stop()
    return payload, nil
}

func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        w.worker.BlockUntilReady()
        select {
        // 给channel中写入桥梁载体,为协程私有
        case w.reqChan <- workRequest{
            jobChan:       jobChan,
            retChan:       retChan,
            interruptFunc: w.interrupt,
        }:
            select {
            //尝试读取任务
            case payload := <-jobChan:
                result := w.worker.Process(payload)
                select {
                case retChan <- result:
                case <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            //执行被中断,新建中断 channel
            case _, _ = <-w.interruptChan:
                w.interruptChan = make(chan struct{})
            }
        // 协程被关闭
        case <-w.closeChan:
            return
        }
    }
}

协程池如何保证协程数恒定

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }
    // 给池中添加协程
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }
    // 异步关闭超出的协程
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }
    // 同步等待所有超出协程都关闭完成
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }
    p.workers = p.workers[:n]
}

一些感想

代码就说到这儿了,下面来谈谈我的感想:
(1)不得不说人家的代码健壮性真好,以后自己在写代码时也要借鉴人家的经验
(2)通过对 workerWrapper 和 workRequest 的设计和逻辑的拆分,使代码解耦,并且这样的代码逻辑看起来是非常清晰的

上一篇下一篇

猜你喜欢

热点阅读