What is SoftWare TestPython

Python基于Redis实现分布式锁

2021-07-14  本文已影响0人  Rethink

> redis-server --version
Redis server v=3.0.504 sha=00000000:0 malloc=jemalloc-3.6.0 bits=64 build=a4f7a6e86f2d60b3

Redis中的事务处理

Redis中的事务可以视为一个队列,使用MULTI命令标记事务的开始,接下来客户端提交的命令,服务器都不会立即执行,而是将其压入队列,并返回QUEUED,表示已入队。当输入EXEC命令时,触发当前事务的执行(FIFO),并且在执行事务的过程中不会被客户端发送的其它命令所中断。下面是一个在Redis中使用事务的简单示例:

> MULTI  # 标记事务开始
OK
> SET Book_Name "GIt Pro"    # 多条命令按顺序入队
QUEUED
> SADD Program_Language "C++" "C#" "Jave" "Python"
QUEUED
> GET Book_Name
QUEUED
> EXEC  # 执行
OK
4
GIt Pro

并不是所有的命令都会被放进事务队列, 其中的例外就是 EXECDISCARDMULTIWATCH 这四个命令 。 当这四个命令从客户端发送到服务器时, 它们会像客户端处于非事务状态一样, 直接被服务器执行。

Redis事务命令

命令 描述
MULTI 标记一个事务块的开始
EXEC 执行事务块内的所有命令
DISCARD 取消事务,清空事务队列,如果正在使用 WATCH 命令监视某个(或某些) key,那么取消所有监视,等同于执行命令 UNWATCH
WATCH 监视一个或多个KEY,如果在事务执行之前KEY被其他命令所改动,那么事务将会被打断
UNWATCH 取消WATCH命令对所有KEY的监视

使用案例:

  1. 放弃事务;
> SET Name Rethink
OK
> MULTI
OK
> SET Name Foo
QUEUED
> DISCARD   -- (放弃事务)
OK
> EXEC
ERR EXEC without MULTI
> GET Name
Rethink    -- (可以看到,数据未被改动)
  1. 命令在入队时发生错误,Redis将在客户端调用EXEC命令时拒绝执行并取消事务;
> SET K1 V1
OK
> MULTI
OK
> SET K1 V2
QUEUED
> GET_  K1
ERR unknown command 'get_'
> EXEC  -- (自动取消事务)
EXECABORT Transaction discarded because of previous errors.
> GET K1
V1
  1. EXEC命令执行后发生的错误,Redis将选择自动忽略,而不是事务回滚,实际上,Redis本身是不支持事务回滚机制的,后面会再细说。
> SET Version 1.0
OK
> MULTI
OK
> INCR Version  -- (非事务模式下, 执行此命令会报错)
QUEUED
> GET Version
QUEUED
> EXEC
1.0  -- (自动忽略了错误)
  1. 使用watch监视
    WATCH命令用于在事务开始之前监视任意数量的键,当调用EXEC命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。
> SET Balance 100
OK
> WATCH Balance
OK
> MULTI
OK
> DECRBY Balance 20
QUEUED
> INCRBY Balance 20
QUEUED
> EXEC
(nil)   -- (事务不执行, 直接返回失败)

WATCH命令实现原理

在每个代表数据库的 redis.h/redisDb 结构类型中, 都保存了一个 watched_keys 字典, 字典的键是这个数据库被监视的键, 而字典的值则是一个链表, 链表中保存了所有监视这个键的客户端。
比如说,以下字典就展示了一个 watched_keys 字典的例子:


其中, 键 key1 正在被 client2client5client1 三个客户端监视, 其他一些键也分别被其他别的客户端监视着。

WATCH 命令的作用, 就是将当前客户端和要监视的键在 watched_keys 中进行关联。通过watched_keys字典, 如果程序想检查某个键是否被监视, 那么它只要检查字典中是否存在这个键即可; 如果程序要获取监视某个键的所有客户端, 那么只要取出键的值(一个链表), 然后对链表进行遍历即可。

watch的触发
在任何对数据库键空间(key space)进行修改的命令成功执行之后 (比如FLUSHDB, SET, LPUSH, SADD, ZREM,诸如此类),multi.c/touchWatchedKey函数都会被调用 —— 它检查数据库的watched_keys字典, 看是否有客户端在监视已经被命令修改的键, 如果有的话, 程序将所有监视这个/这些被修改键的客户端的REDIS_DIRTY_CAS选项打开:


当客户端发送EXEC命令触发事务执行时, 服务器会对客户端的状态进行检查:

Redis事务的ACID特性

python基于redis实现分布式锁

分布式锁应该具备的条件:

示例代码如下:

import uuid
import time
import math

def acquire_lock(cli, lockname, acquire_timeout=3, lock_timeoout=2):
    """获取锁
    @param cli:   Redis实例
    @param lockname:   锁名称
    @param acquire_timeout: 客户端获取锁的超时时间(秒), 默认3s
    @param lock_timeout: 锁过期时间(秒), 超过这个时间锁会自动释放, 默认2s
    """
    lockname = f"lock:{lockname}"
    identifier = str(uuid.uuid4())
    lock_timeoout = math.ceil(lock_timeoout)

    end_time = time.time() + acquire_timeout

    while time.time() <= end_time:
        # 如果不存在当前锁, 则进行加锁并设置过期时间, 返回锁唯一标识
        if cli.set(lockname, identifier, ex=lock_timeoout, nx=True):  # 一条命令实现, 保证原子性
            return identifier
        # 如果锁存在但是没有失效时间, 则进行设置, 避免出现死锁
        elif cli.ttl(lockname) == -1:
            cli.expire(lockname, lock_timeoout)

        time.sleep(0.001)

    # 客户端在超时时间内没有获取到锁, 返回False
    return False


def release_lock(cli, lockname, identifier):
    """释放锁
    @param cli: Redis实例
    @param lock_name:   锁名称
    @param identifier:  锁标识
    """
    with cli.pipeline() as pipe:
        lockname = f"lock:{lockname}"
        while True:
            try:
                pipe.watch(lockname)
                id = pipe.get(lockname)
                if id and id == identifier:
                    pipe.multi()
                    pipe.delete(lockname)
                    pipe.execute()    # 执行EXEC命令后自动执行UNWATCH (DISCARD同理)
                    return True
                pipe.unwatch()  # 没有参数
                break
            except redis.WatchError:
                pass
        return False

加锁过程

  1. 首先需要为锁生成一个唯一标识identifier;
  2. 然后使用redis set 命令设置锁,从 v2.6.12 版本开始,set命令支持nxex参数,具体内容可点击进行查看;如果锁之前不存在,则加锁成功,并设置锁的过期时间,返回锁唯一标识;
  3. 如果锁设置失败,则先判断一下该锁是否有过期时间,如果没有则进行设置;其实这一步可以省略,因为redis的命令都是原子性的。

解锁过程

  1. 首先整个解锁操作需要在一个 Redis 的事务中进行,python 中 redis 事务是通过pipeline的封装实现的;
  2. 使用WATCH 监听锁,防止在解锁时出现锁被其他客户端修改;
  3. 查询锁对应的标识是否与本次解锁的标识相同;
  4. 如果标识相同,则在事务中删除锁,如果删除过程中锁自动失效又被其他客户端拿到(即锁标识被其他客户端修改),此时设置了 WATCH 就会删除失败,这样就不会出现删除了其他客户端锁的情况。

下面对刚才实现的分布式锁进行测试,使用50个线程,模拟秒杀10张票,从结果的有序性可以看出是否为加锁状态,代码如下:

from threading import Thread

import redis

# Redis 存字符串返回的是byte,指定decode_responses=True可以解决
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
redis_cli = redis.Redis(connection_pool=pool)

count = 10


def ticket(i):
    identifier = acquire_lock(redis_cli, 'Ticket')
    print(f"线程{i}--获得了锁")
    time.sleep(1)
    global count
    if count < 1:
        print(f"线程{i}没抢到票, 票已经抢完了")
        return
    count -= 1
    print(f"线程{i}抢到票了, 还剩{count}张票")
    release_lock(redis_cli, 'Resource', identifier)
    print(f"线程{i}--释放了锁")


for i in range(10):
    t = Thread(target=ticket, args=(i, ))
    t.start()

输出如下:

线程1--获得了锁
线程1抢到票了, 还剩4张票
线程1--释放了锁
线程2--获得了锁
线程4--获得了锁
线程3--获得了锁 
线程0--获得了锁 
线程6--获得了锁 
线程7--获得了锁 
线程8--获得了锁 
线程5--获得了锁 
线程9--获得了锁 
线程11--获得了锁
线程10--获得了锁
线程13--获得了锁
线程2抢到票了, 还剩3张票
线程2--释放了锁
线程14--获得了锁
线程12--获得了锁
线程15--获得了锁
线程16--获得了锁
线程17--获得了锁
线程19--获得了锁
线程18--获得了锁
线程4抢到票了, 还剩2张票
线程4--释放了锁
线程3抢到票了, 还剩1张票
线程0抢到票了, 还剩0张票
线程8没抢到票, 票已经抢完了
线程6没抢到票, 票已经抢完了
线程7没抢到票, 票已经抢完了
线程3--释放了锁
线程5没抢到票, 票已经抢完了
线程0--释放了锁
线程9没抢到票, 票已经抢完了
线程11没抢到票, 票已经抢完了
线程10没抢到票, 票已经抢完了
线程13没抢到票, 票已经抢完了
线程14没抢到票, 票已经抢完了
线程12没抢到票, 票已经抢完了
线程15没抢到票, 票已经抢完了
线程16没抢到票, 票已经抢完了
线程17没抢到票, 票已经抢完了
线程19没抢到票, 票已经抢完了
线程18没抢到票, 票已经抢完了

参考文档

  1. 数据库事务的概念及其实现原理, takumiCX
  2. 深入理解Redis事务
  3. Python 使用 Redis 实现分布式锁WoodenRobot
  4. python基于redis实现分布式锁,Maple_feng
  5. Redis 命令参考

【To Be Continued...】

上一篇下一篇

猜你喜欢

热点阅读