Redisson源码剖析

9. Redisson源码剖析-读写锁

2020-10-31  本文已影响0人  T_log

一、读锁

读写锁的意义:

1, redis分布式锁,主要就是在理解他里面的lua脚本的逻辑,逻辑全部都在lua脚本里,我们只能枚举清楚各种情况下,lua脚本会执行什么逻辑,其实就知道了这个分布式锁的实现原理

  1. 多个客户端同时加读锁,是不会互斥的,多个客户端可以同时加这个读锁,读锁和读锁是不互斥的
  2. 如果有人加了读锁,此时就不能加写锁,任何人都不能加写锁了,读锁和写锁是互斥的
  3. 如果有人加了写锁,此时任何人都不能加写锁了,写锁和写锁也是互斥的
  4. RedissonReadLock是RedissonLock的子类,所以很多逻辑会直接复用父类RedissonLock中的逻辑
  5. 这里的核心逻辑主要有三块、第一个是加读锁的lua脚本的逻辑;第二个是读锁的释放的lua脚本的逻辑;第三个是读锁的wathdog刷新锁key的生存时间的逻辑

代码

代码片段一、


public class Application {


    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // 自己本地的Redis集群,直接写死了,主要是研究redisson的源码,配置之类的可以弱化
        config.useClusterServers()
                .addNodeAddress("redis://192.168.0.107:7001")
                .addNodeAddress("redis://192.168.0.107:7002")
                .addNodeAddress("redis://192.168.0.110:7003")
                .addNodeAddress("redis://192.168.0.110:7004")
                .addNodeAddress("redis://192.168.0.111:7005")
                .addNodeAddress("redis://192.168.0.111:7006");

        RedissonClient redisson = Redisson.create(config);

        RReadWriteLock rwLock = redisson.getReadWriteLock("anyRWLock");
        // 代码片段
        rwLock.readLock().lock();
        rwLock.readLock().unlock();
        rwLock.writeLock().lock();
        rwLock.writeLock().unlock();

    }


}

代码片段二、


// RedissonLock类中
@Override
public void lock() {
    try {
        // 向下看
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}


@Override
public void lockInterruptibly() throws InterruptedException {
    // 参数-1和null
    lockInterruptibly(-1, null);
}

// leaseTime = -1,unit=null
 @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 当前加锁客户端的线程ID
        long threadId = Thread.currentThread().getId();
        // 实际的加锁逻辑
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }


        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);


        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }


                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }


private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}


private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // tryLockInnerAsync会到RedissonReadLock类中,参数分别是30000毫秒,为超时时间,线程ID,代码片段三、
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    // 这里其实就是会有一个监听器,watchdog
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            // 如果加锁失败,直接返回
            if (!future.isSuccess()) {
                return;
            }

            // 读取锁的剩余生存时间
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 代码片段四、
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

代码片段三、

这里其实还是老路子,我们先把lua脚本里面的参数给提取出来,方便在阅读lua脚本的时候,脑海里对redis的命令执行结果有一个快速的认识



KEYS[1] = “anyRWLock”

KEYS[2] = “{anyRWLock}:UUID_01:threadId_01:rwlock_timeout”

ARGV[1]=30000毫秒

ARGV[2] = UUID_01:threadId_01

ARGV[3]=UUID_01:threadId_01:write 

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);


    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            // hget anyRWLock mode 从anyRWLock这个hash里面获取mode作为key的值,刚开始进来肯定是null
                            "local mode = redis.call('hget', KEYS[1], 'mode'); “ +
                            // if条件成立
                            "if (mode == false) then “ +
                              // hset anyRWLock mode read 此时anyRWLock的hash为 anyRWLock:{“mode”:”read"}
                              "redis.call('hset', KEYS[1], 'mode', 'read'); “ +
                              // hset anyRWLock UUID_01:threadId_01 1 此时anyRWLock:{“mode”:”read”,"UUID_01:threadId_01”:1}
                              "redis.call('hset', KEYS[1], ARGV[2], 1); “ +

                              // set {anyRWLock}:UUID_01:threadId_01:rwlock_timeout ..:1 3000,这里的..其实是字符串拼接的意思,
                              // 实际的结果是{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
                              "redis.call('set', KEYS[2] .. ':1', 1); “ +
                                //设置{anyLock}:UUID_01:threadId_01:rwlock_timeout:1的过期时间是30000毫秒
                              "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); “ +
                              // 设置anyRWLock的过期时间30000毫秒
                              "redis.call('pexpire', KEYS[1], ARGV[1]); “ +
                              // 返回nil,其实这里上面的调用这段逻辑的函数里,就会开启一个watchdog,会每隔10秒钟去执行一段lua脚本,
                              // 判断一下当前这个线程是否还持有着这个锁,如果还持有锁,更新一下锁key的生存时间为30000毫秒,watchdog主要是保持redis的锁key和java代码中持有的锁是保持同步的
                              "return nil; " +
                            "end; " +
                            "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                              "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                              "local key = KEYS[2] .. ':' .. ind;" +
                              "redis.call('set', key, 1); " +
                              "redis.call('pexpire', key, ARGV[1]); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                            "end;" +
                            "return redis.call('pttl', KEYS[1]);",
                    Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                    internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}

代码片段四、
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }


    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 代码片段五、这个线程延迟10秒后执行
            RFuture<Boolean> future = renewExpirationAsync(threadId);

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        // 调用自己,不停的延长锁的生存时间
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }


    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);


    if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
        task.cancel();
    }
}

代码片段六、

释放锁的逻辑,RedissonReadLock类中

KEYS[1] = anyRWLock

KEYS[2] = {anyRWLock} 

ARGV[1] = 30000毫秒

ARGV[2] = UUID_01:threadId_01

@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // {anyRWLock}:UUID_01:threadId_01:rwlock_timeout
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);


    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 这里判断KEYS[1] mode是否存在以及KEYS[1], ARGV[2]是否存在,条件是不成立的
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
            "if (lockExists == 0) then " +
                "return nil;" +
            "end; " +

            // anyRWLock UUID_01:threadId_01减一
            "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
            "if (counter == 0) then “ +
                // 如果等于0,等于所有的可重入锁都已经释放,这里释放最后一把锁,那么直接删除anyRWLock UUID_01:threadId_01
                "redis.call('hdel', KEYS[1], ARGV[2]); " + 
            "end;” +
            "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

            // hlen anyLock = 2 > 1,就是说,如果你的读锁,anyLock hash内部的key-value对超过了1个,这里肯定是成立的
            "if (redis.call('hlen', KEYS[1]) > 1) then " +
                "local maxRemainTime = -3; " + 
                "local keys = redis.call('hkeys', KEYS[1]); " + 
                "for n, key in ipairs(keys) do “ + 
                    // 加读锁的时候,其实是每个线程都可以加多次这个读锁,读锁也是可重入的,每次同一个线程加多次读锁的时候,他的加锁次数就会加1,counter = 1 ,也可能是 10 、20
                    // 就是遍历counter -> 1,每次递减1,假设counter = 10,10,9,8,7,6,5,4,3,2,1,
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                    "if type(counter) == 'number' then " + 
                        "for i=counter, 1, -1 do " + 
                            "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                            "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                        "end; " + 
                    "end; " + 
                "end; " +

                "if maxRemainTime > 0 then " +
                    "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                    "return 0; " +
                "end;" + 

                "if mode == 'write' then " + 
                    "return 0;" + 
                "end; " +
            "end; " +

            // 删除anyRWLock
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; ",
            Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
            LockPubSub.unlockMessage, getLockName(threadId));
}

二、写锁

其实写锁和读锁的Java代码类似,只是lua脚本略有不同,所以,这里直接分析lua脚本了

KEYS[1] = anyRWLock 

ARGV[1] = 30000

ARGV[2] = UUID_01:threadId_01:write

代码片段一、RedissonWriteLock

@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);


    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                        // hget anyRWLock mode此时肯定获取的是空的
                        "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                        "if (mode == false) then “ +
                              // hset anyRWLock mode write向anyRWLock model 的hash中写入write ,即:anyRWLock:{“mode”:”write"}
                              "redis.call('hset', KEYS[1], 'mode', 'write'); “ +
                              // hset anyRWLock  UUID_01:threadId_01:write,即:anyRWLock:{“mode”:”write”,“UUID_01:threadId_01”:“write”}
                              "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                              // pexpire anyRWLock 30000设置anyRWLock的生存时间
                              "redis.call('pexpire', KEYS[1], ARGV[1]); “ +
                              // 返回nil,说明加锁成功了,Java逻辑拿到加锁成功标志后,watchdog其实就是用的RedissonLock中的逻辑
                              "return nil; " +
                          "end; " +
                          "if (mode == 'write') then " +
                              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                  "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                  "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                            "end;" +
                            "return redis.call('pttl', KEYS[1]);",
                    Arrays.<Object>asList(getName()), 
                    internalLockLeaseTime, getLockName(threadId));
}
本地Redis-cluster
上一篇下一篇

猜你喜欢

热点阅读