@IT·互联网程序员互联网科技

并发下资源的访问控制

2016-11-04  本文已影响569人  大蟒传奇

背景

在开发微信公众号的时候,会和access_token打交道,参照微信的文档

access_token是公众号的全局唯一票据,公众号调用各接口时都需使用access_token。开发者需要进行妥善保存。access_token的存储至少要保留512个字符空间。access_token的有效期目前为2个小时,需定时刷新,重复获取将导致上次获取的access_token失效。

按照文档上推荐的做法,需要一个用来获取和刷新access_token的中控服务器,其他业务逻辑服务器所使用的access_token均来自该中控服务器。

出于安全的考虑,微信对于获取access_token的调用有一定的次数限制,超过这个限制,就无法再刷新token。

问题

在实际开发中,中控服务器的做法如下图

请求access token流程

假设这样一种情景,在redis中缓存的token刚好过期时,第三方向中控服务器同时发送了大量的请求。为了让问题简化,这里假设收到了A和B两条请求。

中控服务收到请求A时,查询缓存,没有命中,于是调用微信api,重新获取token,然后写入缓存,实测这个过程大概需要0.1到0.2秒(这个值和所处的网络环境也有关系)。在请求A将token写入缓存前,请求B来了,查询redis,也没有命中,也会调用微信的api来重新获取token。

实际上在业务中,只需要调用一次微信api来获取token即可。可是在上面的例子中,却调用了两次。如果并发量足够大,让中控服务反复去调用微信的api,很有可能就会超出微信的限制,一旦这种情况发生,对于业务的运营将是灾难性的。

测试

为了说明上面的问题,笔者编写了一个小的例子来模拟这种情况。
服务端采用Django,客户端使用go语言来高并发调用服务端的接口。

服务端

服务端代码,这里只是列出关键代码,其他一些配置项之类的代码在这里略过不计。
创建项目

django-admin startproject accessTokenTest
python manage.py startapp index

编写返回token的api

# view函数
def index(request):
    cache.incr(settings.CounterKey)
    token = cache.get(settings.TokenKey)
    if token is None:
        token = create_access_token()
        cache.set(settings.TokenKey, token, 5 * 60)
    return HttpResponse(json.dumps({'token': token}), content_type='text/json')
        
# 模拟调用微信api生成access token
def create_access_token():
    time.sleep(0.3)
    cache.incr(settings.CreateKey)
    return str(uuid4())

测试的例子采用redis作为缓存,通过sleep来模拟一个网络请求,并且将请求的次数和生成token的次数存在redis里,便于我们得到测试结果。

使用gunicorn,启用4个进程来模拟服务端

gunicorn accessTokenTest.wsgi --workers 4
[2016-11-04 13:04:29 +0800] [12720] [INFO] Starting gunicorn 19.6.0
[2016-11-04 13:04:29 +0800] [12720] [INFO] Listening at: http://127.0.0.1:8000 (12720)
[2016-11-04 13:04:29 +0800] [12720] [INFO] Using worker: sync
[2016-11-04 13:04:29 +0800] [12723] [INFO] Booting worker with pid: 12723
[2016-11-04 13:04:29 +0800] [12724] [INFO] Booting worker with pid: 12724
[2016-11-04 13:04:29 +0800] [12725] [INFO] Booting worker with pid: 12725
[2016-11-04 13:04:29 +0800] [12726] [INFO] Booting worker with pid: 12726

客户端通过GET请求http://127.0.0.1:8000 来请求token

客户端

客户端代码如下

// filename accessToken.go

package main

import(
    "net/http"
    "encoding/json"
)

type AccessToken struct {
    Token string
}

func main(){
    channel := make(chan error)
    for ;;{
        token := new(AccessToken)
        go func(){
            channel <- getJson("http://127.0.0.1:8000", token)
        }()

    }
}

func getJson(url string, target interface{}) error {
    r, err := http.Get(url)
    if err != nil {
        return err
    }
    defer r.Body.Close()
    return json.NewDecoder(r.Body).Decode(target)
}

编译后生成可执行文件accessToken

在测试开始前,启动redis服务,设置对应的key

127.0.0.1:6379[1]> persist ":1:counter"
(integer) 0
127.0.0.1:6379[1]> persist ":1:create"
(integer) 0

启动客户端,进行测试,运行一段时间后,手动杀死

./tokenTest
^C

查看测试数据,可以看到在测试的时间内,服务端一共收到了客户端1522次请求,4次生成了新的token。

127.0.0.1:6379[1]> get ":1:create"
"4"
127.0.0.1:6379[1]> get ":1:counter"
"1522"

分析问题

这个场景要求获取access token这个操作必须是原子的。
可以进一步得抽象为在某段时间内对"access_token"这个资源只能有一个进程进行访问。

解决方法

说到原子操作,笔者第一反应就是信号量。下面我们将使用信号量来解决这个问题。
采用posix_ipc模块,只修改服务端的代码

为了确保每次运行项目,信号量的状态保持一致,修改index/apps.py这个文件,在启动时初始化信号量。

服务端v2

from posix_ipc import Semaphore, ExistentialError, O_CREAT

class IndexConfig(AppConfig):
    name = 'index'

    def ready(self):
        try:
            sem = Semaphore(settings.TokenSemaphoreName)
            sem.unlink()
        except ExistentialError:
            pass
        finally:
            Semaphore(settings.TokenSemaphoreName, flags=O_CREAT, initial_value=1)

修改视图函数,如下

def index(request):
    cache.incr(settings.CounterKey)
    sem = Semaphore(settings.TokenSemaphoreName)
    sem.acquire()
    token = cache.get(settings.TokenKey)
    if token is None:
        token = create_access_token()
        cache.set(settings.TokenKey, token, 5 * 60)
    sem.release()

    return HttpResponse(json.dumps({'token': token}), content_type='text/json')

测试2

在测试前,清空之前的数据,并删除缓存的token

127.0.0.1:6379[1]> del ":1:token"
(integer) 1
127.0.0.1:6379[1]> set ":1:counter" 0
OK
127.0.0.1:6379[1]> set ":1:create" 0
OK

和之前一样启动服务端和客户端,在运行一段时间后,退出客户端。
查看结果,可以看到客户端请求了980次,服务端只生成了一次token,这个结果正是我们想要的。

127.0.0.1:6379[1]> get ":1:counter"
"980"
127.0.0.1:6379[1]> del ":1:create"
(integer) 1

多主机场景

看起来问题好像得到解决了?并没有!

在实际的生产环境中,为了保持服务的高可用,经常会使用负载均衡这样的技术。

负载均衡

在这样的场景下使用上面的方案,每台服务器都会生成自己的信号量,在高并发的情况下依然会出现多次请求access token的情况。

测试

这里使用nginx来实现负载均衡,使用docker来模拟多主机。

nginx的相关配置如下

...
upstream back {
    server 127.0.0.1:8080;
    server 127.0.0.1:8087;
}
...
...
location / {
     proxy_pass http://back;
}
...

启动container

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS                   PORTS                    NAMES
c845d3123a08        python:2.7               "python2"                40 minutes ago      Up 10 minutes            0.0.0.0:8080->8000/tcp   python

在container中启动线程

gunicorn accessTokenTest.wsgi --workers 4 -b 0.0.0.0:8000

同时在宿主机上也启动线程

gunicorn accessTokenTest.wsgi --workers 4 -b 0.0.0.0:8087

和之前一样,测试前清除缓存中的token,并将counter和create设置为0

在宿主机上启动客户端,在token缓存失效前断掉
查看结果,可以看到客户端一共请求了2062次,生成了2次token,和预期的一致。

127.0.0.1:6379[1]> get ":1:create"
"2"
127.0.0.1:6379[1]> get ":1:counter"
"2062"

在多主机的情况下,如果要确保请求access token的原子性,需要一种“分布式锁”。

新的解决方案

采用redis来辅助实现分布式锁。尽管有着一定的争论,但是能满足现在的需求。

实现的算法来自redis作者的文章,这里直接采用redlock-py

服务端v3

def index(request):
    cache.incr(settings.CounterKey)
    dlm = Redlock([{"host": "your-host-ip", "port": 6379, "db": 0}, ])
    my_lock = dlm.lock("my_resource_name", 1000)
    token = cache.get(settings.TokenKey)
    
    if token is None:
        token = create_access_token()
        cache.set(settings.TokenKey, token, 5 * 60)
    dlm.unlock(my_lock)

    return HttpResponse(json.dumps({'token': token}), content_type='text/json')

测试3

测试环境和之前一样。更新代码后,重启启动服务端,处理之前的redis缓存

启动客户端一段时间后断掉。
查看测试结果, 客户端一共请求了88次,生成了1次token,和预期也是一致的

127.0.0.1:6379[1]> get ":1:counter"
"88"
127.0.0.1:6379[1]> get ":1:create"
"1"
上一篇下一篇

猜你喜欢

热点阅读