python 基于本地文件和reids的同步锁

2021-02-03  本文已影响0人  逐风细雨

进程和线程间同步操作加锁,是常规套路。但对于不同的父进程时,就需要依赖一下外部组件的帮忙了,如内存映射,reids,db,mq,本地文件等等。项目上,对于并发测任务执行时,资源存在互斥,因此想着实现一个轻量化的不同进程间可以使用的同步锁。其中文件实现的只对本地调用起作用,基于redis的,支持能房屋redis的所有机器上面实现分布式同步锁(推荐使用)
代码如下:

#! coding:utf8
import time
import random
import os
import redis


class sync_file_lock:
    """
    基于本地文件的同锁
    lock = sync_file_lock("123", time_out=3)
    with lock:
    print(time.ctime())
    """

    def __init__(self, lock_name="lock.txt", time_out=60):
        # 锁文件名
        self.lock_name = lock_name
        # 锁超时的时间
        self.time_out = time_out
        self.lock_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.lock_name)

    def __check_file_timeout(self):
        """
        检查锁文件是否已过期
        return True/False
        """
        if time.time() - os.path.getmtime(self.lock_path) > self.time_out:
            return True
        else:
            return False

    def __create_lock(self):
        """
        创建锁文件
        1.指定self.log_path 文件不存在,创建文件;
        2.文件存在时,判断文件是否已过期,过期后创建文件,过期逻辑:当前时间-创建时间 > 超时时间;
        3.文件未过期时,在0.1-1.1秒随机等待后,重复1,2的步骤
        """
        if os.path.isfile(self.lock_path):
            # 文件未过期时递归等待
            if self.__check_file_timeout() is False:
                time.sleep(random.random() + 0.1)
                self.__create_lock()

        with open(self.lock_path, "w+") as f:
            f.write(str(self.lock_name))

    def acquire(self):
        """
        取得锁
        1.锁文件不存在;
        2.锁文件已过期:当前时间-创建时间 > 超时时间
        :return:
        """
        self.__create_lock()

    def release(self):
        """
        释放锁
        :return:
        """
        if os.path.isfile(self.lock_path):
            os.remove(self.lock_path)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


class sync_redis_lock():
    """
    基于redis的分布式同步锁
    安装redis库 pip install redis
    eg:
    # 创建redis客户端
    client = sync_redis_lock.redis_cli(host="10.2.6.16", password="123456")
    # 实例化分布式锁
    lock = sync_redis_lock(client, lock_name="123", time_out=5)
    with lock:
        print(time.ctime())

    """

    def __init__(self, client, lock_name="test_reds_lock", time_out=60):
        """
        :param client: redis.Redis 对象 eg :
        client = redis.Redis(host="127.0.0.1",port=6379,password="123456,db=0,socket_timeout=30, decode_responses=True)
        或者 client = sync_redis_lock.redis_cli(host="127.0.0.1",port=6379,password="123456,db=0,socket_timeout=30, decode_responses=True)
        :param lock_name:
        :param time_out:
        """
        self.client = client
        self.lock_name = lock_name
        self.time_out = time_out

    @staticmethod
    def redis_cli(host=None, port=6379, password=None, dbname=0, socket_timeout=30, decode_responses=True, **kwargs):
        """
        实例化一个redis客户端
        :param host:
        :param port:
        :param password:
        :param db:
        :param socket_timeout:
        :param decode_responses:
        :return:
        """
        try:
            redis_cli = redis.Redis(host=host,
                                    port=port,
                                    password=password,
                                    db=dbname,
                                    socket_timeout=socket_timeout,
                                    decode_responses=decode_responses,
                                    **kwargs)
            return redis_cli
        except Exception as e:
            print(f"redis 连接异常:{e}")
            raise e

    def __create_lock_key(self):
        """
        创建一个过期带过期时间的key
        1.判断lock_key是否存在,不存在时直接创建;
        2.lock_key存在时,随机等待0.1-1.1秒中,指导key过期;
        :return:
        """
        if self.client.exists(self.lock_name):
            time.sleep(random.random() + 0.1)
            self.__create_lock_key()
        self.client.set(self.lock_name, self.lock_name)
        self.client.expire(self.lock_name, self.time_out)

    def acquire(self):
        self.__create_lock_key()

    def release(self):
        self.client.delete(self.lock_name)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

上一篇 下一篇

猜你喜欢

热点阅读