【Go Web开发】基于IP限流

2022-03-01  本文已影响0人  Go语言由浅入深

上一篇文章我们介绍了全局限流,当您希望严格对API的请求总速率进行限制,并且不关心请求来自何处时,使用全局限流器可能很有用。但通常更常见的是为每个客户端单独设置一个限流器,这样可以防止单个客户端发出太多请求,影响其他客户端。

一种简单的实现方法是创建一个map来为每个客户端创建一个限流器映射,使用每个客户端的IP地址作为map的键。当一个新客户端向API发出请求时,我们将初始化一个新的限流器并将其添加到map中。对于任何后续请求,我们将从map中检索客户端的限流器,并通过调用其Allow()方法检查请求是否允许,就像之前所做的那样。

因为可能会有多个goroutine并发地访问map,所以我们需要使用互斥锁来防止竞争条件来保护对map的访问。如果您正在跟随本文操作,那么一起开始编码并更新上一篇文章中的rateLimit()中间件来实现这一点。

File: cmd/api/middleware.go


package main

...

func(app *application)rateLimit(next http.Handler) http.Handler{
    //申明一个mutex和map存放客户端IP和限流器实例
    var (
        mu      sync.Mutex
        clients = make(map[string]*rate.Limiter)
    )
    return http.HandlerFunc(func(w http.ResponseWrite, r *http.Request){
    //从请求中提取出客户端IP地址
    ip, _, err := net.SplitHostPort(r.RremoteAddr)
    if err != nil {
        app.serverErrorResponse(w, err)
        return
        }
     //上锁放在并发写入map
     mu.Lock()
     //检查map中IP是否存在,不存在就创建限流器
     if _, found := clients[ip]; !found{
        clients[ip] = rate.NewLimiter(2, 4)
        }
     //调用Allow()方法,查看请求是否允许
     if !clients[ip].Allow(){
        mu.Unlock()
        app.rateLimitExceededResponse(w, r)
        return
        } 
     //即使请求允许也要释放锁
     mu.Unlock()
     next.ServerHTTP(w, r)
    })
}

删除Map中旧限流器

上面的代码可正常工作,但是有一个小问题,clients这个map会无限的增加键值对,添加的每一个新的IP地址和限流器都会占用越来越多的资源。

为了防止这种情况,我们更新下代码,记录每个客户端的最后一次访问API时间。然后运行一个后台goroutine,定期从映射中删除最近没有访问服务的客户端。为了做到这一点,我们需要创建一个自定义的客户端结构体,它包含每个客户端的限流器和最后一次访问服务时间,并在初始化中间件时启动后台清理程序。

File: cmd/api/middleware.go


package main

...

func (app *application) rateLimit(next http.Handler) http.Handler {
        //定义client结构体,包含限流器和最后一次访问API服务的时间
    type client struct {
        limiter  *rate.Limiter
        lastSeen time.Time
    }
    var (
        mu      sync.Mutex
                //更新map值为client结构体指针
        clients = make(map[string]*client)
    )

        //启动goroutine清除长时间没有访问服务的值
    go func() {
        for {
            time.Sleep(time.Minute)
                        //更新map需要上锁
            mu.Lock()
                        //循环遍历所有客户端,如果3分钟没有再访问服务,就清除
            for ip, client := range clients {
                if time.Since(client.lastSeen) > 3*time.Minute {
                    delete(clients, ip)
                }
            }
            mu.Unlock()
        }
    }()

    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if app.config.limiter.enable {
            ip, _, err := net.SplitHostPort(r.RemoteAddr)
            if err != nil {
                app.serverErrorResponse(w, r, err)
                return
            }

            mu.Lock()

            if _, found := clients[ip]; !found {
                clients[ip] = &client{
                                        //如果map中不存在的IP键,就创建新的client结构体
                    limiter: rate.NewLimiter(2, 4)}
            }

            if !clients[ip].limiter.Allow() {
                mu.Unlock()
                app.rateLimitExceededResponse(w, r)
                return
            }
            mu.Unlock()
        }
        next.ServeHTTP(w, r)
    })
}

此时,如果您重新启动服务并再次尝试快速连续地发出一批请求,应该会发现,从单个客户端的角度来看,限流器能正确工作,还是像以前一样返回。

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}

附加内容

分布式应用

使用此模式进行限流只在API应用程序运行在单机上时有效。如果您的基础设施是分布式的,并且您的应用程序运行在负载均衡器后面的多个服务器上,那么您将需要使用另一种方法。

如果你使用HAProxy或Nginx作为负载均衡器或反向代理,这两者都有内置的限流功能,使用其自带限流器是明智的。或者,你可以使用像Redis这样的缓存来维护客户端的请求计数,运行在一个所有应用服务器都可以访问的服务器上。

限流器配置

目前,限流器的每秒请求数和突发值被硬编码到rateLimit()中间件中。这当然是可以的,但是如果它们在运行时可配置的话,将会更加灵活。同样,如果有一种简单的方法可以完全关闭限流器会非常有用(当您想要运行基准测试或执行负载测试时,当所有请求可能来自少量IP地址时,这将非常有用)。

为了使这些参数是可配置的,我们回到cmd/api/main.go文件,更新配置结构和命令行参数如下所示:

File:cmd/api/main.go


package main

...

// 定义一个配置结构体来保存应用程序的所有配置设置.
//目前,配置是服务器监听的端口和应用程序的环境名称 (开发, 预发, 生成等等)
//将从命令行参数中读取这些配制信息
type config struct {
    port int
    env  string
    db   struct {
        dsn          string
        maxOpenConns int
        maxIdleConns int
        maxIdleTime  string
    }
    limiter struct {
        rps    float64
        burst  int
        enable bool
    }
}

// 修改logger字段,用*jsonlog.Logger代替
type application struct {
    config config
    logger *jsonlog.Logger
    models data.Models
}

func main() {
    // 声明一个配置结构体实例
    var cfg config

    // 从命令行参数中将port和env读取到配制结构体实例当中。
    //默认端口使用4000以及环境信息使用开发环境development
    flag.IntVar(&cfg.port, "port", 4000, "API server port")
    flag.StringVar(&cfg.env, "env", "development", "Environment (development|staging|production)")

    flag.StringVar(&cfg.db.dsn, "db-dsn", "postgres://greenlight:pa55word@localhost/greenlight?sslmode=disable", "PostgreSQL DSN")
    flag.IntVar(&cfg.db.maxOpenConns, "db-max-open-conns", 25, "PostgreSQL max open connections")
    flag.IntVar(&cfg.db.maxIdleConns, "db-max-idle-conns", 25, "PostgreSQL max idle connections")
    flag.StringVar(&cfg.db.maxIdleTime, "db-max-idle-time", "15m", "PostgreSQL max connection idle time")

    //限流参数
    flag.Float64Var(&cfg.limiter.rps, "limiter-rps", 2, "Rate limiter maximum request per secod")
    flag.IntVar(&cfg.limiter.burst, "limiter-burst", 4, "Rate limiter maximum burst")
    flag.BoolVar(&cfg.limiter.enable, "limiter-enable", true, "enable")
    flag.Parse()

然后更新rateLimiter()中间件使用这些配置参数:

package main

...

func (app *application) rateLimit(next http.Handler) http.Handler {
        ...

    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                //只有配置打开限流功能才起作用
        if app.config.limiter.enable {
            ip, _, err := net.SplitHostPort(r.RemoteAddr)
            if err != nil {
                app.serverErrorResponse(w, r, err)
                return
            }

            mu.Lock()

            if _, found := clients[ip]; !found {
                        //从命令行参数读取限流器配置
                clients[ip] = &client{
                    limiter: rate.NewLimiter(rate.Limit(app.config.limiter.rps), app.config.limiter.burst),
                }
            }

            if !clients[ip].limiter.Allow() {
                mu.Unlock()
                app.rateLimitExceededResponse(w, r)
                return
            }
            mu.Unlock()
        }
        next.ServeHTTP(w, r)
    })
}

一旦完成以上代码,让我们通过运行带有-limiter-burst参数的API服务并将burst值减少为2来尝试一下:

$  go run . -limiter-burst=2
{"level":"INFO","time":"2021-12-18T10:40:57Z","message":"database connection pool established"}
{"level":"INFO","time":"2021-12-18T10:40:57Z","message":"starting server","properties":{"addr":":4000","env":"development"}}

如果您再次快速连续地发出一批6个请求,您现在应该会发现只有前两个成功:

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}

类似地,你可以尝试使用limiter-enabled=false来禁用限流器,如下所示:

$  go run . -limiter-enable=false
{"level":"INFO","time":"2021-12-18T10:43:41Z","message":"database connection pool established"}
{"level":"INFO","time":"2021-12-18T10:43:41Z","message":"starting server","properties":{"addr":":4000","env":"development"}}

你会发现现在所有的请求都成功完成了,不管你发了多少请求。

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
上一篇下一篇

猜你喜欢

热点阅读