python 基于本地文件和reids的同步锁
2021-02-03 本文已影响0人
逐风细雨
进程和线程间同步操作加锁,是常规套路。但对于不同的父进程时,就需要依赖一下外部组件的帮忙了,如内存映射,reids,db,mq,本地文件等等。项目上,对于并发测任务执行时,资源存在互斥,因此想着实现一个轻量化的不同进程间可以使用的同步锁。其中文件实现的只对本地调用起作用,基于redis的,支持能房屋redis的所有机器上面实现分布式同步锁(推荐使用)
代码如下:
#! coding:utf8
import time
import random
import os
import redis
class sync_file_lock:
"""
基于本地文件的同锁
lock = sync_file_lock("123", time_out=3)
with lock:
print(time.ctime())
"""
def __init__(self, lock_name="lock.txt", time_out=60):
# 锁文件名
self.lock_name = lock_name
# 锁超时的时间
self.time_out = time_out
self.lock_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.lock_name)
def __check_file_timeout(self):
"""
检查锁文件是否已过期
return True/False
"""
if time.time() - os.path.getmtime(self.lock_path) > self.time_out:
return True
else:
return False
def __create_lock(self):
"""
创建锁文件
1.指定self.log_path 文件不存在,创建文件;
2.文件存在时,判断文件是否已过期,过期后创建文件,过期逻辑:当前时间-创建时间 > 超时时间;
3.文件未过期时,在0.1-1.1秒随机等待后,重复1,2的步骤
"""
if os.path.isfile(self.lock_path):
# 文件未过期时递归等待
if self.__check_file_timeout() is False:
time.sleep(random.random() + 0.1)
self.__create_lock()
with open(self.lock_path, "w+") as f:
f.write(str(self.lock_name))
def acquire(self):
"""
取得锁
1.锁文件不存在;
2.锁文件已过期:当前时间-创建时间 > 超时时间
:return:
"""
self.__create_lock()
def release(self):
"""
释放锁
:return:
"""
if os.path.isfile(self.lock_path):
os.remove(self.lock_path)
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
class sync_redis_lock():
"""
基于redis的分布式同步锁
安装redis库 pip install redis
eg:
# 创建redis客户端
client = sync_redis_lock.redis_cli(host="10.2.6.16", password="123456")
# 实例化分布式锁
lock = sync_redis_lock(client, lock_name="123", time_out=5)
with lock:
print(time.ctime())
"""
def __init__(self, client, lock_name="test_reds_lock", time_out=60):
"""
:param client: redis.Redis 对象 eg :
client = redis.Redis(host="127.0.0.1",port=6379,password="123456,db=0,socket_timeout=30, decode_responses=True)
或者 client = sync_redis_lock.redis_cli(host="127.0.0.1",port=6379,password="123456,db=0,socket_timeout=30, decode_responses=True)
:param lock_name:
:param time_out:
"""
self.client = client
self.lock_name = lock_name
self.time_out = time_out
@staticmethod
def redis_cli(host=None, port=6379, password=None, dbname=0, socket_timeout=30, decode_responses=True, **kwargs):
"""
实例化一个redis客户端
:param host:
:param port:
:param password:
:param db:
:param socket_timeout:
:param decode_responses:
:return:
"""
try:
redis_cli = redis.Redis(host=host,
port=port,
password=password,
db=dbname,
socket_timeout=socket_timeout,
decode_responses=decode_responses,
**kwargs)
return redis_cli
except Exception as e:
print(f"redis 连接异常:{e}")
raise e
def __create_lock_key(self):
"""
创建一个过期带过期时间的key
1.判断lock_key是否存在,不存在时直接创建;
2.lock_key存在时,随机等待0.1-1.1秒中,指导key过期;
:return:
"""
if self.client.exists(self.lock_name):
time.sleep(random.random() + 0.1)
self.__create_lock_key()
self.client.set(self.lock_name, self.lock_name)
self.client.expire(self.lock_name, self.time_out)
def acquire(self):
self.__create_lock_key()
def release(self):
self.client.delete(self.lock_name)
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()