分布式锁

2021-11-16  本文已影响0人  markfork

章节

1 应用场景

分布式系统对共享资源的写操作(update) 需要保证数据的一致性、安全性更新

2 分布式锁

分布式锁具备的条件

3 分布式锁类型

3种方式:

3.1 DB

方法 加唯一索引,调用写入记录,调用完删除

问题:
性能、容灾、不具备可重入、锁失效机制需要引入新的机制

3.2 基于Redis

setnx + expire + delete

基本命令:

setnx key val(random uuid)

思想:

#连接redis
redis_client = redis.Redis(host="localhost",
                           port=6379,
                           password=password,
                           db=10)

#获取一个锁
lock_name:锁定名称
acquire_time: 客户端等待获取锁的时间
time_out: 锁的超时时间
def acquire_lock(lock_name, acquire_time=10, time_out=10):
    """获取一个分布式锁"""
    identifier = str(uuid.uuid4())
    end = time.time() + acquire_time
    lock = "string:lock:" + lock_name
    while time.time() < end:
        if redis_client.setnx(lock, identifier):
            # 给锁设置超时时间, 防止进程崩溃导致其他进程无法获取锁
            redis_client.expire(lock, time_out)
            return identifier
        elif not redis_client.ttl(lock):
            redis_client.expire(lock, time_out)
        time.sleep(0.001)
    return False

#释放一个锁
def release_lock(lock_name, identifier):
    """通用的锁释放函数"""
    lock = "string:lock:" + lock_name
    pip = redis_client.pipeline(True)
    while True:
        try:
            pip.watch(lock)
            lock_value = redis_client.get(lock)
            if not lock_value:
                return True

            if lock_value.decode() == identifier:
                pip.multi()
                pip.delete(lock)
                pip.execute()
                return True
            pip.unwatch()
            break
        except redis.excetions.WacthcError:
            pass
    return False

测试

#连接redis
redis_client = redis.Redis(host="localhost",
                           port=6379,
                           password=password,
                           db=10)

#获取一个锁
lock_name:锁定名称
acquire_time: 客户端等待获取锁的时间
time_out: 锁的超时时间
def acquire_lock(lock_name, acquire_time=10, time_out=10):
    """获取一个分布式锁"""
    identifier = str(uuid.uuid4())
    end = time.time() + acquire_time
    lock = "string:lock:" + lock_name
    while time.time() < end:
        if redis_client.setnx(lock, identifier):
            # 给锁设置超时时间, 防止进程崩溃导致其他进程无法获取锁
            redis_client.expire(lock, time_out)
            return identifier
        elif not redis_client.ttl(lock):
            redis_client.expire(lock, time_out)
        time.sleep(0.001)
    return False

#释放一个锁
def release_lock(lock_name, identifier):
    """通用的锁释放函数"""
    lock = "string:lock:" + lock_name
    pip = redis_client.pipeline(True)
    while True:
        try:
            pip.watch(lock)
            lock_value = redis_client.get(lock)
            if not lock_value:
                return True

            if lock_value.decode() == identifier:
                pip.multi()
                pip.delete(lock)
                pip.execute()
                return True
            pip.unwatch()
            break
        except redis.excetions.WacthcError:
            pass
    return False

3.3 基于Zookeeper

关键词: 临时顺序节点
加锁: currentThread 创建节点是所有兄弟节点中最小的
解锁: currentThread 删除自己创建的节点

zookeeper 实现分布式锁的步骤:

上一篇下一篇

猜你喜欢

热点阅读