Redisson 分布式锁实现分析
转自:https://github.com/angryz/my-blog/issues/4
Why 分布式锁
java.util.concurrent.locks
中包含了 JDK 提供的在多线程情况下对共享资源的访问控制的一系列工具,它们可以帮助我们解决进程内多线程并发时的数据一致性问题。
但是在分布式系统中,JDK 原生的并发锁工具在一些场景就无法满足我们的要求了,这就是为什么要使用分布式锁。我总结了一句话,分布式锁是用于解决分布式系统中操作共享资源时的数据一致性问题。
设计分布式锁要注意的问题
互斥
分布式系统中运行着多个节点,必须确保在同一时刻只能有一个节点的一个线程获得锁,这是最基本的一点。
死锁
分布式系统中,可能产生死锁的情况要相对复杂一些。分布式系统是处在复杂网络环境中的,当一个节点获取到锁,如果它在释放锁之前挂掉了,或者因网络故障无法执行释放锁的命令,都会导致其他节点无法申请到锁。
因此分布式锁有必要设置时效,确保在未来的一定时间内,无论获得锁的节点发生了什么问题,最终锁都能被释放掉。
性能
对于访问量大的共享资源,如果针对其获取锁时造成长时间的等待,导致大量节点阻塞,是绝对不能接受的。
所以设计分布式锁时要能够掌握锁持有者的动态,若判断锁持有者处于不活动状态,要能够强制释放其持有的锁。
此外,排队等待锁的节点如果不知道锁何时会被释放,则只能隔一段时间尝试获取一次锁,这样无法保证资源的高效利用,因此当锁释放时,要能够通知等待队列,使一个等待节点能够立刻获得锁。
重入
考虑到一些应用场景和资源的高效利用,锁要设计成可重入的,就像 JDK 中的 ReentrantLock 一样,同一个线程可以重复拿到同一个资源的锁。
RedissonLock 实现解读
本文中 Redisson 的代码版本为 2.2.17-SNAPSHOT。
这里以 lock()
方法为例,其他一系列方法与其核心实现基本一致。
先来看 lock() 的基本用法
RLock lock = redisson.getLock("foobar"); // 1.获得锁对象实例
lock.lock(); // 2.获取分布式锁
try {
// do sth.
} finally {
lock.unlock(); // 3.释放锁
}
- 通过 RedissonClient 的
getLock()
方法取得一个 RLock 实例。 -
lock()
方法尝试获取锁,如果成功获得锁,则继续往下执行,否则等待锁被释放,然后再继续尝试获取锁,直到成功获得锁。 -
unlock()
方法释放获得的锁,并通知等待的节点锁已释放。
下面来看看 RedissonLock 的具体实现
org.redisson.Redisson#getLock()
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name, id);
}
这里的 RLock 是继承自 java.util.concurrent.locks.Lock 的一个 interface,getLock
返回的实际上是其实现类 RedissonLock 的实例。
来看看构造 RedissonLock 的参数
-
commandExecutor: 与 Redis 节点通信并发送指令的真正实现。需要说明一下,Redisson 缺省的 CommandExecutor 实现是通过
eval
命令来执行 Lua 脚本,所以要求 Redis 的版本必须为 2.6 或以上,否则你可能要自己来实现 CommandExecutor。关于 Redisson 的 CommandExecutor 以后会专门解读,所以本次就不多说了。 -
name: 锁的全局名称,例如上面代码中的
"foobar"
,具体业务中通常可能使用共享资源的唯一标识作为该名称。 - id: Redisson 客户端唯一标识,实际上就是一个 UUID.randomUUID()。
org.redisson.RedissonLock#lock()
此处略过前面几个方法的层层调用,直接看最核心部分的方法 lockInterruptibly()
,该方法在 RLock 中声明,支持对获取锁的线程进行中断操作。在直接使用 lock()
方法获取锁时,最后实际执行的是 lockInterruptibly(-1, null)
。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 1.尝试获取锁
Long ttl = tryAcquire(leaseTime, unit);
// 2.获得锁成功
if (ttl == null) {
return;
}
// 3.等待锁释放,并订阅锁
long threadId = Thread.currentThread().getId();
Future<RedissonLockEntry> future = subscribe(threadId);
get(future);
try {
while (true) {
// 4.重试获取锁
ttl = tryAcquire(leaseTime, unit);
// 5.成功获得锁
if (ttl == null) {
break;
}
// 6.等待锁释放
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 7.取消订阅
unsubscribe(future, threadId);
}
}
- 首先尝试获取锁,具体代码下面再看,返回结果是已存在的锁的剩余存活时间,为
null
则说明没有已存在的锁并成功获得锁。 - 如果获得锁则结束流程,回去执行业务逻辑。
- 如果没有获得锁,则需等待锁被释放,并通过 Redis 的 channel 订阅锁释放的消息,这里的具体实现本文也不深入,只是简单提一下 Redisson 在执行 Redis 命令时提供了同步和异步的两种实现,但实际上同步的实现都是基于异步的,具体做法是使用 Netty 中的异步工具 Future 和 FutureListener 结合 JDK 中的 CountDownLatch 一起实现。
- 订阅锁的释放消息成功后,进入一个不断重试获取锁的循环,循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。
- 如果在重试中拿到了锁,则结束循环,跳过第 6 步。
- 如果锁当前是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 并发的信号量工具 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的
release()
方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。 - 在成功获得锁后,就没必要继续订阅锁的释放消息了,因此要取消对 Redis 上相应 channel 的订阅。
下面着重看看 tryAcquire()
方法的实现,
private Long tryAcquire(long leaseTime, TimeUnit unit) {
// 1.将异步执行的结果以同步的形式返回
return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}
private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 2.用默认的锁超时时间去获取锁
Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// 成功获得锁
if (ttlRemaining == null) {
// 3.锁过期时间刷新任务调度
scheduleExpirationRenewal();
}
}
});
return ttlRemainingFuture;
}
<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 4.使用 EVAL 命令执行 Lua 脚本获取锁
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime,
getLockName(threadId));
}
- 上面说过 Redisson 实现的执行 Redis 命令都是异步的,但是它在异步的基础上提供了以同步的方式获得执行结果的封装。
- 前面提到分布式锁要确保未来的一段时间内锁一定能够被释放,因此要对锁设置超时释放的时间,在我们没有指定该时间的情况下,Redisson 默认指定为30秒。
- 在成功获取到锁的情况下,为了避免业务中对共享资源的操作还未完成,锁就被释放掉了,需要定期(锁失效时间的三分之一)刷新锁失效的时间,这里 Redisson 使用了 Netty 的 TimerTask、Timeout 工具来实现该任务调度。
- 获取锁真正执行的命令,Redisson 使用
EVAL
命令执行上面的 Lua 脚本来完成获取锁的操作:- 如果通过
exists
命令发现当前 key 不存在,即锁没被占用,则执行hset
写入 Hash 类型数据 key:全局锁名称(例如共享资源ID), field:锁实例名称(Redisson客户端ID:线程ID), value:1,并执行pexpire
对该 key 设置失效时间,返回空值nil
,至此获取锁成功。 - 如果通过
hexists
命令发现 Redis 中已经存在当前 key 和 field 的 Hash 数据,说明当前线程之前已经获取到锁,因为这里的锁是可重入的,则执行hincrby
对当前 key field 的值加一,并重新设置失效时间,返回空值,至此重入获取锁成功。 - 最后是锁已被占用的情况,即当前 key 已经存在,但是 Hash 中的 Field 与当前值不同,则执行
pttl
获取锁的剩余存活时间并返回,至此获取锁失败。
- 如果通过
以上就是对 lock()
的解读,不过在实际业务中我们可能还会经常使用 tryLock()
,虽然两者有一定差别,但核心部分的实现都是相同的,另外还有其他一些方法可以支持更多自定义参数,本文中就不一一详述了。
org.redisson.RedissonLock#unlock()
最后来看锁的释放,
@Override
public void unlock() {
// 1.通过 EVAL 和 Lua 脚本执行 Redis 命令释放锁
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime,
getLockName(Thread.currentThread().getId()));
// 2.非锁的持有者释放锁时抛出异常
if (opStatus == null) {
throw new IllegalMonitorStateException(
"attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
// 3.释放锁后取消刷新锁失效时间的调度任务
if (opStatus) {
cancelExpirationRenewal();
}
}
- 使用
EVAL
命令执行 Lua 脚本来释放锁:- key 不存在,说明锁已释放,直接执行
publish
命令发布释放锁消息并返回1
。 - key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回
nil
。 - 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行
hincrby
对锁的值减一。 - 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回
0
;如果刚才释放的已经是最后一把锁,则执行del
命令删除锁的 key,并发布锁释放消息,返回1
。
- key 不存在,说明锁已释放,直接执行
- 上面执行结果返回
nil
的情况(即第2中情况),因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。 - 执行结果返回
1
的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。
总结
写了这么多,其实最主要的就是上面的两段 Lua 脚本,基于 Redis 的分布式锁的设计完全体现在其中,看完这两段脚本,再回顾一下前面的 设计分布式锁要注意的问题 就豁然开朗了。
redisson是redis官网推荐的java语言实现分布式锁的项目。当然,redisson远不止分布式锁,还包括其他一些分布式结构。详情请移步:https://github.com/mrniko/redisson/wiki
redisson支持4种链接redis的方式:
Cluster(集群)
Sentinel servers(哨兵)
Single server(单机)
分布式锁之redisson
redisson是redis官网推荐的java语言实现分布式锁的项目。当然,redisson远不止分布式锁,还包括其他一些分布式结构。详情请移步:https://github.com/mrniko/redisson/wiki
redisson支持4种链接redis的方式:
Cluster(集群)
Sentinel servers(哨兵)
Single server(单机)
下面通过简单的案例使用redisson的lock。
1、RedissonManager类,管理redisson的初始化等操作。
public class RedissonManager {
private static final String RAtomicName = "genId_";
private static Config config = new Config();
private static Redisson redisson = null;
public static void init(){
try {
config.useClusterServers() //这是用的集群server
.setScanInterval(2000) //设置集群状态扫描时间
.setMasterConnectionPoolSize(10000) //设置连接数
.setSlaveConnectionPoolSize(10000)
.addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");
redisson = Redisson.create(config);
//清空自增的ID数字
RAtomicLong atomicLong = redisson.getAtomicLong(RAtomicName);
atomicLong.set(1);
}catch (Exception e){
e.printStackTrace();
}
}
public static Redisson getRedisson(){
return redisson;
}
/** 获取redis中的原子ID */
public static Long nextID(){
RAtomicLong atomicLong = getRedisson().getAtomicLong(RAtomicName);
atomicLong.incrementAndGet();
return atomicLong.get();
}
}
2、DistributedRedisLock类,提供锁和解锁方法
public class DistributedRedisLock {
private static Redisson redisson = RedissonManager.getRedisson();
private static final String LOCK_TITLE = "redisLock_";
public static void acquire(String lockName){
String key = LOCK_TITLE + lockName;
RLock mylock = redisson.getLock(key);
mylock.lock(2, TimeUnit.MINUTES); //lock提供带timeout参数,timeout结束强制解锁,防止死锁
System.err.println("======lock======"+Thread.currentThread().getName());
}
public static void release(String lockName){
String key = LOCK_TITLE + lockName;
RLock mylock = redisson.getLock(key);
mylock.unlock();
System.err.println("======unlock======"+Thread.currentThread().getName());
}
}
3、测试
private static void redisLock(){
RedissonManager.init(); //初始化
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
String key = "test123";
DistributedRedisLock.acquire(key);
Thread.sleep(1000); //获得锁之后可以进行相应的处理
System.err.println("======获得锁后进行相应的操作======");
DistributedRedisLock.release(key);
System.err.println("=============================");
} catch (Exception e) {
e.printStackTrace();
}
}
});
t.start();
}
}
4、测试结果
======lock======Thread-91
======获得锁后进行相应的操作======
======unlock======Thread-91
=============================
======lock======Thread-63
======获得锁后进行相应的操作======
======unlock======Thread-63
=============================
======lock======Thread-31
======获得锁后进行相应的操作======
======unlock======Thread-31
=============================
======lock======Thread-97
======获得锁后进行相应的操作======
======unlock======Thread-97
=============================
======lock======Thread-8
======获得锁后进行相应的操作======
======unlock======Thread-8
=============================
从测试结果可以看出,结果还是达到了预期,在服务器跑一万个线程还是能很好运行,感觉还不错。
题外话:
在初始化数据时候,最好不要使用static{} 即静态块。因为在多核机器的情况下读取配置文件,会抛出java.lang.NoClassDefFoundError: Could not initialize class XXX。
所以最好还是使用init的方式,在启动程序的时候手动执行下。