分布式锁的理论与实现
前言
zookeeper实现优于redis的原因:
- redis集群主节点切换时可能导致锁失效,Zookeeper内部的选举机制保证主节点切换时锁不会失效。
- 为了防止主机宕机或网络断开之后的死锁,Redis没有ZK那种天然的实现方式,只能依赖设置超时时间来规避。
- zk可以使用临时顺序节点来上锁,避免惊群效应。
Redisson的特性:
- 互斥性和自解锁:使用hset (UUID+threadId, 1) 保证;
- 不会发生死锁:设置定时事件;
- 锁不能自己失效:通过看门狗功能,使用定时器判断锁失效时间,并延长事件;
- 容错性:使用红锁RedissonRedLock解决。
Zookeeper的特性:
- 互斥性、自解锁和不发生死锁:最小的临时顺序节点拿到锁;
- 容错性:zk的选举机制;
- 锁不能自己失效:只有主动删除和结束会话才会删除节点。
一、基于Redis的分布式锁
1.1 为什么要使用分布式锁
先看两个常见的例子:
例1:某服务记录关键数据X,当前值为100。A请求需要将X增加200;同时,B请求需要将X减100。
在理想的情况下,A先读取到X=100,然后X增加200,最后写入X=300。B请求接着从读取X=300,减少100,最后写入X=200。
然而在真实情况下,如果不做任何处理,则可能会出现:A和B同时读取到X=100;A写入之前B读取到X;B比A先写入等情况。
例2:某服务提供一组任务,A请求随机从任务组中获取一个任务;B请求随机从任务组中获取一个任务。
在理想的情况下,A从任务组中挑选一个任务,任务组删除该任务,B从剩下的的任务中再挑一个,任务组删除该任务。
同样的,在真实情况下,如果不做任何处理,可能会出现A和B挑中了同一个任务的情况。
1.2 需要思考的问题
1.2.1 基础问题
问题1:锁状态判断原子性无法保证
从读取锁的状态,到判断该状态是否为被锁,需要经历两步操作。如果不能保证这两步的原子性,就可能导致不止一个请求获取到了锁,这显然是不行的。因此,我们需要保证锁状态判断的原子性。
问题2:网络断开或主机宕机,锁状态无法清除
假设在主机已经获取到锁的情况下,突然出现了网络断开或者主机宕机,如果不做任何处理该锁将仍然处于被锁定的状态。那么之后所有的请求都无法再成功抢占到这个锁。因此,我们需要在持有锁的主机宕机或者网络断开的时候,及时的释放掉这把锁。
问题3:无法保证释放的是自己上锁的那把锁
在解决了问题2的情况下再设想一下,假设持有锁的主机A在临界区遇到网络抖动导致网络断开,分布式锁及时的释放掉了这把锁。之后,另一个主机B占有了这把锁,但是此时主机A网络恢复,退出临界区时解锁。由于都是同一把锁,所以A就会将B的锁解开。此时如果有第三个主机尝试抢占这把锁,也将会成功获得。因此,我们需要在解锁时,确定自己解的这个锁正是自己锁上的。
1.2.2 进阶条件
如果分布式锁的实现,还能再解决上面的三个问题,那么就可以算是一个相对完整的分布式锁了。然而,在实际的系统环境中,还会对分布式锁有更高级的要求。
- 可重入:线程中的可重入,指的是外层函数获得锁之后,内层也可以获得锁,ReentrantLock和synchronized都是可重入锁;衍生到分布式环境中,一般仍然指的是线程的可重入,在绝大多数分布式环境中,都要求分布式锁是可重入的。
- 惊群效应(Herd Effect):在分布式锁中,惊群效应指的是,在有多个请求等待获取锁的时候,一旦占有锁的线程释放之后,如果所有等待的方都同时被唤醒,尝试抢占锁。但是这样的情况会造成比较大的开销,那么在实现分布式锁的时候,应该尽量避免惊群效应的产生。
- 公平锁和非公平锁:不同的需求,可能需要不同的分布式锁。非公平锁普遍比公平锁开销小。但是业务需求如果必须要锁的竞争者按顺序获得锁,那么就需要实现公平锁。
- 阻塞锁和自旋锁:针对不同的使用场景,阻塞锁和自旋锁的效率也会有所不同。阻塞锁会有上下文切换,如果并发量比较高且临界区的操作耗时比较短,那么造成的性能开销就比较大了。但是如果临界区操作耗时比较长,一直保持自旋,也会对CPU造成更大的负荷。
可靠的分布式锁的特征
- 互斥性:只能有一个线程持有锁;
- 不会发生死锁:即是客户端崩溃没有释放锁,也不会造成其他线程一直拿不到锁
- 具有容错性:大部分redis节点运行正常,客户端就可以正常加解锁
- 解铃还须系铃人:加锁和解锁必须是同一个客户端;
- 锁不能自己失效:正常执行程序中,锁不能因为某些原因失效。
1.3 实现
1.3.1 自定义分布式锁
步骤
- 在执行业务代码前添加上redis操作指令,通过setnx向redis中插入一条k-v值同时设置超时时间,key保持一致,value通过UUID获取唯一值。(注:setnx操作如果存在相同key,则插入失败)
- 如果插入成功,说明没有分布式锁,则进行业务操作
- 最后需要删除分布式锁,直接通过finally进行删除。
伪代码
try{
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,uuid,10,TimeUnit.SECONDS);
if(!result){
return "锁被其他线程占用,请稍后再试";
}
//TODO 业务代码
} finally {
if(uuid.equals(redisTemplate.opsForValue().get(lockKey))){
redisTemplate.delete(lockKey);
}
}
return result;
注意点
- 需要设置过期时间,防止锁删除失败导致进程阻塞。
- value需要设置为不同值,避免线程一执行时间过长,导致锁过期,然后线程二放入锁但是马上被线程一删除,导致锁永久失效的情况发生。
- 开启线程监控进程启动时间,执行到锁失效时间的1/3时,业务未执行完成,需要为锁重置过期时间。防止进程执行中锁失效问题。
- 可以使用GETSET(先写新值,返回旧值,原子性操作,可以用于分辨是不是首次操作)操作来实现重入锁。
1.3.2 redisson分布式锁框架实现


注意:使用lua脚本执行命令保证操作的原子性。


依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
配置类
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson(){
//此处设置为单机模式
Config config = new Config();
config.useSingleServer().setAddress("redis://116.62.148.11:6380").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
伪代码
//lockKey是hset时的key,每次getLock都会生成一个uuid+threadId作为field。
RLock redissonLock = redisson.getLock(lockKey);
try{
//设置redis锁并设置过期时间
redissonLock.lock(10,TimeUnit.SECONDS);
if(!result){
return "锁被其他线程占用,请稍后再试";
}
//TODO 业务代码
} finally {
// 同一个redissonLock有相同的field,可以解锁
redissonLock.unlock();
}
}
return result;
原理

Redisson的特性:
- 互斥性和自解锁:使用hset (key, UUID+threadId, 1) 保证;
- 不会发生死锁:设置定时事件;
- 锁不能自己失效:通过看门狗功能,使用定时器判断锁失效时间,并延长事件。
拓展
如何将系统的性能提高十倍。
只需要将加锁的数据拆成十份,然后每份可以有一个线程执行业务逻辑。将并发量提高了十倍。
二、基于Zookeeper的分布式锁(Menagerie框架)
zk节点分类(节点都是唯一的)
- 临时节点:创建后一直存在,直到主动删除此节点;
- 临时有序节点:创建后一直存在,直到主动删除此节点;每个父节点会为它的第一级子节点维护一份时序,记录每个子节点创建的先后顺序。
- 持久节点:临时节点在客户端会话失效后节点自动清除。临时节点下面不能创建子节点。
- 持久有序节点:临时节点在客户端会话失效后节点自动清除。临时节点下面不能创建子节点。父节点getChildren会获得顺序的节点列表。
zk事件监听watch
- 节点创建
- 节点删除
- 节点数据修改
- 子节点变更
zk实现分布式锁:

2.1 实现过程
2.1.1 会产生惊群效应的实现
- 使用zk的临时节点,因为临时节点是唯一的。多个线程争抢锁,即去zk上创建相同名字的临时节点。
- 第一个创建的临时节点的线程抢到了锁。开始执行业务。
- 其他线程阻塞,通过watch监听节点的变化。
- 直到执行业务的线程结束后结束与zk的会话,临时节点自动删除,锁被释放。
- 监听到节点删除事件后,所有的线程都会再次竞争这个锁。
缺点:每次释放锁,所有的线程都会重新开始竞争锁,如果等待线程过多,zk的压力会过大。
2.1.2 避免惊群效应的实现
2.1.2.1 概念
- 使用临时有序节点,每个线程创建有序的临时节点。
- 每个线程按顺序创建节点,创建节点之后会判断自己是否是当前最小的节点,如果最小的节点获取到锁,开始执行业务;如果不是则通过watch监听自己的前一个节点。
- 直到执行业务的线程结束后结束与zk的会话,临时节点自动删除。
- 线程监听到前一个节点被删除,则判断它创建的节点是否是最小的,是则获取到锁开始执行业务。
优点:线程释放锁后只会唤醒监听其创建的节点的线程,不会产生惊群效应。


2.1.2.2 实现
依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
代码
public class CuratorDistrLock implements Runnable {
private static final String ZK_ADDRESS = "192.168.32.242:2181";
private static final String ZK_LOCK_PATH = "/zkLock";
static CuratorFramework client;
static {
// 创建连接zk的客户端
client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,
new RetryNTimes(10, 5000));
client.start();
}
private void curatorLockTest() {
// 创建锁对象
InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
try {
// 设置获取锁的超时时间
if (lock.acquire(6 * 1000, TimeUnit.SECONDS)) {
System.out.println("===== " + Thread.currentThread().getName() + "抢到锁 =====");
}
// 执行业务
Thread.sleep(5000);
} catch (Exception e) {
System.out.println("业务异常");
} finally {
try {
lock.release();
System.out.println("===== " + Thread.currentThread().getName() + "释放锁 =====");
} catch (Exception e) {
System.out.println("锁释放异常");
}
}
}
public static void main(String[] args) {
new Thread(new CuratorDistrLock()).start();
new Thread(new CuratorDistrLock()).start();
}
public void run() {
curatorLockTest();
}
}
2.1.2.2 zk锁种类
- InterProcessMutex 分布式可重入排它锁
- InterProcessSemaphoreMutex 分布式排它锁
- InterProcessReadWriteLock 分布式读写锁
- InterProcessMultiLock 用多个锁去进行一组操作
- InterProcessSemaphoreV2 共享信号量
开源的基于ZK的Menagerie框架:
Menagerie中的lock首先实现了可重入锁,利用ThreadLocal存储进入的次数,每次加锁次数加1,每次解锁次数减1。如果判断出是当前线程持有锁,就不用走获取锁的流程。
通过tryAcquireDistributed方法尝试获取锁,循环判断前序节点是否存在,如果存在则监视该节点并且返回获取失败。如果前序节点不存在,则再判断更前一个节点。如果判断出自己是第一个节点,则返回获取成功。
为了在别的线程占有锁的时候阻塞,代码中使用JUC的condition来完成。如果获取尝试锁失败,则进入等待且放弃localLock,等待前序节点唤醒。而localLock是一个本地的公平锁,使得condition可以公平的进行唤醒,配合循环判断前序节点,实现了一个公平锁。关注Java技术栈微信公众号,在后台回复关键字:zookeeper,可以获取更多栈长整理的zk系列技术干货。
这种实现方式非常类似于ReentrantLock的CHL队列,而且zk的临时节点可以直接避免网络断开或主机宕机,锁状态无法清除的问题,顺序节点可以避免惊群效应。这些特性都使得利用ZK实现分布式锁成为了最普遍的方案之一。
三、通过AOP和注解优化代码
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
自定义注解代码
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RlockRepeatSubmit {
/**
* 通过反射来判断使用的场景
* @return
*/
LockConstant lockConstant() default LockConstant.COMMON_LOCK;
}
aop代码
@Configuration
@Aspect
public class RedissonAop {
@Resource
private Redisson redisson;
@Pointcut("@annotation(com.cj.redisson.annotation.RlockRepeatSubmit)")
public void pointCut() {
}
@Around("pointCut()")
public Object redissonLock(ProceedingJoinPoint pjp) {
// 获取方法的注解
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
RlockRepeatSubmit annotation = method.getAnnotation(RlockRepeatSubmit.class);
LockConstant lockConstant = annotation.lockConstant();
Assert.notNull(lockConstant, "服务器异常");
// 获取方法的参数
Object[] args = pjp.getArgs();
String keyPrefix = lockConstant.getKeyPrefix();
int leaseTime = lockConstant.getLeaseTime();
int waitTime = lockConstant.getWaitTime();
TimeUnit timeUnit = lockConstant.getTimeUnit();
String message = lockConstant.getMessage();
RLock lock = redisson.getLock(keyPrefix);
try {
boolean ifLock = lock.tryLock(waitTime, leaseTime, timeUnit);
if (ifLock) {
System.out.println("上锁");
return pjp.proceed();
} else {
throw new IllegalStateException(message);
}
} catch (Throwable throwable) {
throw new IllegalStateException("服务器aop代理异常");
} finally {
System.out.println("解锁");
lock.unlock();
}
}
}
枚举类
public enum LockConstant {
CASHIER("cashierLock:", 8, 10, TimeUnit.SECONDS, "请勿重复操作点击收银操作"),
SUBMIT_ORDER("submitOrderLock", 3, 30, TimeUnit.SECONDS, "请勿重复点击下单"),
COMMON_LOCK("commonLock:", 3, 120, TimeUnit.SECONDS, "请勿重复点击");
private String keyPrefix;
private int waitTime;
private int leaseTime;
private TimeUnit timeUnit;
private String message;
LockConstant(String keyPrefix, int waitTime, int leaseTime, TimeUnit timeUnit, String message) {
this.keyPrefix = keyPrefix;
this.waitTime = waitTime;
this.leaseTime = leaseTime;
this.timeUnit = timeUnit;
this.message = message;
}
public String getKeyPrefix() {
return keyPrefix;
}
public int getWaitTime() {
return waitTime;
}
public int getLeaseTime() {
return leaseTime;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public String getMessage() {
return message;
}
}
service代码
@Service
public class RedissonServiceImpl implements RedissonService {
@Override
@RlockRepeatSubmit(lockConstant = LockConstant.CASHIER)
public void cashier() {
System.out.println("enter cashier");
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finish cashier");
}
@Override
@RlockRepeatSubmit(lockConstant = LockConstant.SUBMIT_ORDER)
public void submitOrder() {
System.out.println("enter submitOrder");
}
}