应用微服务golang

Go:每分钟处理百万请求

2021-04-11  本文已影响0人  Go语言由浅入深

【译文】原文地址

问题

从事匿名遥测和分析系统,我们的目标是能够处理来自大量客户端的POST请求。我们的web服务将接收JSON文档内容包括很多的负载需要发送到亚马逊S3存储,为了后续使用map-reduce来处理这些数据。

传统方式我们将创建worker-tier架构,使用包含如下中间件:

起初我们创建一些struct来定义web服务POST请求接收的负载和上传数据到S3桶的方法。

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // 待实现
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

简单的Go协程方法

开始我们使用一个很简单的POST handler来实现,仅将任务放进一个简单goroutine中来并行处理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 将body读取到字符串并使用json解码
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    //迭代每个payload并逐个上传到S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

对于流量不是很大的情况,可以应对大多数人请求,但是在大规模情况下很快上面的方法就被证明不是很好了。我们预期有很多的请求,但和我们部署第一个版本到生产环境中所看到的不一样。我们完全低估了流量。以上方法有很多不足地方。无法控制goroutine的数量。当达到每分钟1百万POST请求的时候,代码直接瘫痪了。

再次优化

需要另找方法。一开始我们就讨论如何保持处理请求生命周期非常短,并在后台处理。当然这个在Ruby中是必须做的,否则将阻塞所有可用的worker。然后我们就使用常规解决方案来做,比如Resque、Sidekiq、SQS等。很多处理这种问题的方法。

因此第二个版本通过创建带缓冲的channel,这样就可以缓存一些jobs,并逐步上传到S3,而且可以控制缓存队列的长度,有足够内存也能够存放这些job。我们认为将job存放到channel队列中是可以的。

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

然后消费队列处理jobs,使用类似如下方式:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

老实说,我对我们所想的一点底都没有。这个方法并没有让我们轻松,我们用缓存来应对并发只是延缓了问题的爆发。同步处理机制,每次上传一个负载到S3,因为接受请求的速度太快,远比一个处理器上传到S3点速度快,导致缓存很快就挤满了,导致后面来的请求直接阻塞。我们只是在回避这个问题,直到倒计时我们的系统最终死亡。在我们部署了这个有缺陷的版本之后,我们的延迟率在几分钟内以恒定的速率不断增加。


更好的方法

我们决定在使用go channel时利用一个通用的模式,创建一个两层的channel系统,一个用来存放jobs另一个用于控制并发处理job队列的workers的数量。为了保持一定层度的并发上传数据到S3,一方面不会使系统拖垮,另一方面不会出现连接S3错误。因此我们选择创建一个job/worker模式。这在java、C#等中经常使用。考虑以golang方式实现可以通过使用channels来代替worker线程池。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

//Start方法通过循环监听任务请求和停止信号。
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
                        //注册当前worker到worker队列
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 接收到一个工作请求
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // 接收到停止工作信号
                return
            }
        }
    }()
}

//Stop方法通知worker停止监听工作请求
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

修改web请求处理函数,创建一个Job结构体实例并将Job实例发送到JobQenue channel供worker处理。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    //将body读取到字符串并使用json解码
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 迭代每个payload并逐个上传到S3
    for _, payload := range content.Payloads {

       //创建Job实例
        work := Job{Payload: payload}

       //将worker发送到队列
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在web服务器初始化的时候创建一个Dispather和调用Run()来创建workers池,并开始监听JobQueue中jobs。

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

以下是dispatcher实现:

type Dispatcher struct {
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}

注意我们设置了workers的实例最大数量,并添加到worker池中。因为我们项目在docker环境中使用了亚马逊Elasticbeanstalk,所以在生产环境下我们总是遵循12-factor原则来配置我们系统,通过环境变量的方式读取配置。这种方式我们可以控制workers的数量和JobQueue的长度,因此可以快速调整这些值不需要重新部署集群。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

中间结果

在部署上面的优化方案之后,很快我们就发现延时下降到可接受范围之内,并且可以处理波动很大的请求量。



在几分钟后当弹性负载均衡起作用后,看到ElasticBeanstalk应用服务处理近1百度请求每分钟。经常在早上几小时流量还能飚升到超过一百万每分钟。

新代码部署后,服务器的数量很快从100多个降20个。


总结

在这里使用了简单的方法。本来我们可以设计复杂的系统包含很多的队列、后台workers、复杂的部署,但是我们决定使用Elasticbeanstalk自动扩展能力和go提供的高效简单的并发方法。

上一篇下一篇

猜你喜欢

热点阅读