基于redis key的分布式资源池锁
2020-03-26 本文已影响0人
Mr韩_xianfeng
基于redis自带的原子操作setnx,getset等原子操作命令实现资源锁定功能。
setnx:将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
getset:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
当 key 存在但不是字符串类型时,返回一个错误。
这个实现参考了网友的代码,可以实现多个资源锁抢占,相当于资源池锁。假定有两个环境可被使用,每个环境只能被一个任务占用,就可以使用这个锁去抢占资源
import time
from redis import Redis
class ResourceLock(object):
def __init__(self, redis_client: Redis, keys=[], expires=60, timeout=10):
"""
Distributed locking using Redis SETNX and GETSET.
Usage::
with ResourceLock(*args,**kwargs):
print "Critical section"
or
lock = ResourceLock(*args,**kwargs)
lock.acquire()
print('do something here...')
lock.release()
:param expires We consider any existing lock older than
``expires`` seconds to be invalid in order to
detect crashed clients. This value must be higher
than it takes the critical section to execute.
:param timeout If another client has already obtained the lock,
sleep for a maximum of ``timeout`` seconds before
giving up. A value of 0 means we never wait.
"""
self.keys = keys
self.available_key = None
self.timeout = timeout
self.expires = expires
self.redis_client = redis_client
def acquire(self):
timeout = self.timeout
while timeout >= 0:
expires = time.time() + self.expires + 1
for key in self.keys:
if self.redis_client.setnx(key, expires):
self.available_key = key
# We gained the lock; enter critical section
print(f'{self.available_key}')
return
for key in self.keys:
current_value = self.redis_client.get(key)
# We found an expired lock and nobody raced us to replacing it
if current_value and float(current_value) < time.time() and \
self.redis_client.getset(key, expires) == current_value:
self.available_key = key
print('过期')
print(f'{self.available_key}')
return
timeout -= 1
time.sleep(1)
raise LockTimeout("Timeout while waiting for lock")
def release(self):
print(f'delete {self.available_key}')
self.redis_client.delete(self.available_key)
def __enter__(self):
self.acquire()
def __exit__(self, exc_type, exc_value, traceback):
self.release()
class LockTimeout(BaseException):
pass
class redis_lock(object):
def __init__(self, redis_client, keys=[], expires=60):
self.rc = redis_client
self.keys = keys
self.expires = expires
def __call__(self, func):
def _dec(*args, **kwargs):
with ResourceLock(redis_client=self.rc, keys=self.keys, expires=self.expires):
return func(*args, **kwargs)
return _dec
使用演示:
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
rc = redis.Redis(connection_pool=pool)
@redis_lock(rc, keys=['test1', 'test2'])
def deploy(*args, **kwargs):
from apps.pipeline.actions.deploy import deploy
return deploy(*args, **kwargs)