Nginx

Nginx - API网关之请求限流功能实现与测试

2024-02-01  本文已影响0人  红薯爱帅

1. 概述

对于分布式的微服务系统,如果集群最大并发rps为10。且超过10之后,可能会导致系统异常。
此时,需要有一个API网关(API Gateway),完成rps的限制。

当然,APIGateway还有其他功能,通常如下:

这里主要为了实现请求限流功能,以nginx为例,介绍一下如何配置与测试。
测试脚本已安排到本文末尾,可以用于复现和验证。

漏桶原理

2. nginx limit req

Nginx采用漏桶原理,实现了请求限流。
为了搞清楚具体细节,下面主要测试3种情况。

配置A:单独burst

对于不采用burst的方式,如果超过rate的request,会直接丢失,给用户返回439,并不友好。
采用单独burst的方法,虽然使得请求的流量变得“均匀平滑”,但是确实很大程度上增加了响应时间。排在队列越后面的请求的等待时间越长,这就导致了它们的响应时间平白无故地增加了许多,过长的响应时间甚至可能会导致客户端认为请求异常或者直接导致请求超时。

limit_req_zone $binary_remote_addr zone=rps:10m rate=1r/s;

location /login/ {
    limit_req zone=rps burst=20;
    proxy_pass http://my_upstream;
}

Test Case 1: burst值和timeout

结论:timeout的request,也会一直占用burst缓冲。

例如,如果rate=1r/m,发起第一个request,可以得到结果。
再执行第二次request,30s之后将timeout;再执行第三次request,30s之后也将timeout。
但是,第二次和第三次也占用了burst缓冲。
只有在第四分钟时,执行第四次request,才能得到正常的结果。
所以,burst的设置,需要谨慎,不可过大。在按照rate消耗burst缓冲的请求,不应超过timeout时长。
常用配置可以是rate=10r/s,burst=20。因为,超过2秒,页面就会感觉到卡顿。至于更多的请求,直接返回439。

$ http POST :8888/api/v1/images/search
http: error: Request timed out (30s).

$ http POST :8888/api/v1/images/search
http: error: Request timed out (30s).

$ http POST :8888/api/v1/images/search
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 2
Content-Type: text/html; charset=utf-8
Date: Thu, 01 Feb 2024 07:03:10 GMT
Server: nginx/1.21.1

ok

$ http POST :8888/api/v1/images/search
http: error: Request timed out (30s).

Test Case 2: burst值与资源占用

burst为50时,可以支持最少50个用户(TcpConnection)的并发请求。
单请求的最大耗时为5s=50/10,即burst/rate
nginx消耗内存50-60MB,CPU占用率0.5%,并不高。

burst为50时,如果是30个用户(TcpConnection)的并发请求。
单请求的最大耗时为3s=30/10,即burst实际值/rate
nginx消耗仍然是100MB以内,CPU占用率0.5%,并不高。

配置B:burst + nodelay

突发的请求会一次性发送给upstream。
经过测试,确实如此。也就是说,集群有可能在某一个瞬间,rps超过rate值

$ python test.py 
Success:     0, Fail: 0
Success:    25, Fail: 210  ---> 多次测试,25个Thread的情况下,会直接25
Success:    36, Fail: 492
Success:    46, Fail: 776
Success:    56, Fail: 1056
Success:    66, Fail: 1346

配置C:burst + delay

简单来说,所谓的分段限速就是允许客户端在刚开始的时候有一定的突发请求,后面再进入到平稳的限速中。
至于delay,应该是配置A和配置B之间的一个中间方法。

limit_req_zone $binary_remote_addr zone=ip:10m rate=5r/s;

server {
    listen 80;
    location / {
        limit_req zone=ip burst=12 delay=8;
        proxy_pass http://website;
    }
}

3. 总结

limit_req_zone 'global' zone=rps:10m rate=10r/s;

4. 参考

https://tinychen.com/20210616-nginx-10-triple-rate-limiting-limit-req/
https://nginx.org/en/docs/http/ngx_http_limit_req_module.html
https://juejin.cn/post/7297154281870082100

5. 其他

a. 阿里云lb过来的请求,remote_addr是否是是它自己,还是用户的真实IP?

通过这个文档来看,应该没有修改remote-addr,只添加了一个X-Forwarded-For字段,记录用户真实IP。
所以,lb->nginx,而nginx采用remote-addr的方式实现limit_req,问题不大。
https://help.aliyun.com/zh/slb/classic-load-balancer/use-cases/preserve-client-ip-addresses-when-layer-7-listeners-are-used

b. swarm的lb是什么原理?两个manager的话,会怎么样?

Swarm内置了DNS服务,可以自动为每个Service生成DNS名称,使服务间可以通过DNS发现和访问。
如果有多个Manager节点,Swarm会选举一个Leader节点,该节点主要负责语调度任务分配决策。其他Manager节点与Leader节点互为备份。
当Leader节点不可用时,剩余Manager节点会重新选举产生新的Leader,从而实现高可用。并且各个Manager之间会互相同步所维护的集群状态信息。
总之,Swarm利用overlay网络和内置DNS、LB实现服务发现和负载均衡,多个Manager节点互为备份来保证服务高可用性。
为了避免大量无效请求直接冲击swarm集群,所以部署一个裸docker容器作为apigateway会更好。

c. docker container

docker run -d --name apigateway \
  --restart always \
  -v `pwd`/nginx.conf:/etc/nginx/nginx.conf \
  -p 8888:80 \
  --memory 500MB \
  --log-opt max-size=100m --log-opt max-file=5 \
  nginx:1.21.1

6. 附件

from flask import Flask
import time


app = Flask(__name__)


@app.route("/ping")
def ping():
    return "ping"


@app.route("/api/v1/images/search", methods=['POST'], strict_slashes=False)
def search():
    time.sleep(0.5)
    return "ok"


if __name__ == "__main__":
    app.run(host="0.0.0.0", port="9999")


"""
$ http :9999/ping
$ http head :9999/ping
$ http post :9999/api/v1/images/search
"""
# For more information on configuration, see:
#   * Official English Documentation: http://nginx.org/en/docs/
#   * Official Russian Documentation: http://nginx.org/ru/docs/

user nginx;
worker_processes auto; #启动进程
error_log /dev/stdout; #全局错误日志
pid /run/nginx.pid; #PID文件

# Load dynamic modules. See /usr/share/nginx/README.dynamic.
include /usr/share/nginx/modules/*.conf;

events {
    worker_connections 1024; #单个后台worker process进程的最大并发链接数 
}

http {
    # 设定mime类型
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    
    # 设定日志格式
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent $request_time[$upstream_response_time] "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
    access_log  /dev/stdout  main;

    # limit_req_zone $binary_remote_addr zone=rps:10m rate=10r/s;
    limit_req_zone 'global' zone=rps:10m rate=10r/s;
    limit_req_status 439;

    server {
        listen 80 default_server;

        location = / {
            return 204;
        }
        
        location /api/v1 {
            # limit_req zone=rps;
            limit_req zone=rps burst=30;
            proxy_pass http://xxx:9999;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }

        location / {
            proxy_pass http://xxx:9999;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
    }
}
import threading
import requests
from queue import Queue, Empty
import time


HOST = "http://127.0.0.1:8888"
USER_COUNT = 20

resps = Queue()


def fn(id: int):
    while True:
        resp = requests.post(f"{HOST}/api/v1/images/search")
        resps.put(dict(name=f"User-{id:2d}", status_code=resp.status_code))
        time.sleep(0.01)


def print_report(report):
    print(f"Success: {report['success']:5d}, Fail: {report['fail']}")


def statistics():
    users = dict()
    report = dict(success=0, fail=0)
    while True:
        try:
            msg = resps.get_nowait()
            is_ok = msg["status_code"] < 400
            key = "success" if is_ok else "fail"
            report[key] += 1
        except Empty:
            print_report(report)
            time.sleep(1)


def main():
    threading.Thread(target=statistics).start()
    
    for id in range(USER_COUNT):
        threading.Thread(target=fn, args=(id,)).start()


if __name__ == "__main__":
    main()
from locust import HttpUser, task
from datetime import datetime


HOST = "http://127.0.0.1:8888"


class LbsUser(HttpUser):
    host = HOST

    @task
    def head(self):
        self.client.head("/")

    @task
    def ping(self):
        self.client.get("/ping")


class SearchUser(HttpUser):
    host = HOST

    @task
    def search(self):
        # resp = self.client.post("/login", {"username":"testuser", "password":"secret"})
        print(datetime.now(), end=",")
        resp = self.client.post("/api/v1/images/search")
        print("Response status code:", resp.status_code)
        

"""
locust --web-host 127.0.0.1 --class-picker --modern-ui
"""

上一篇 下一篇

猜你喜欢

热点阅读