基于LUA脚本的Redis分布式锁(SpringBoot实现)
2018-11-16 本文已影响0人
姜蒜儿
1.前言
Redis实现分布式锁,本身比较简单,就是Redis中一个简单的KEY。一般都利用setnx(set if not exists)指令可以非常简单的实现加锁,锁用完后,再调用del指令释放锁。要确保锁可用,一般需要解决几个问题:
- 不能出现死锁情况,一个获得锁的客户端宕机或者异常后,要保障其他客户端也能获得锁。
- 应用程序通过网络与Redis交互,为避免网络延迟以及获取锁线程与其他线程不冲突,需要保障锁操作的原子性,既同一时间只有一个客户端可用获取到锁。
- 加锁和解锁的客户端必须是同一个,不能把其他客户端加的锁给解了。
- 考虑容错性,如果一个客户端加锁成功后,Redis集群Master宕掉并没有及时同步,另外一个客户端加锁会立即成功,避免同一把锁被两个客户端持有。
2.解决思路
- 死锁问题,通常是在拿到锁后给锁设置一个过期时间(expire指令),即使出现异常,在过期时间后,锁也会自动释放
- 原子性问题通常的两个解决方式:
- 通过redis2.8版本后加入的set扩展参数,将加锁和设置过期时间作为一个原子操作,目前发现不是所有Java的Redis客户端都支持这样的set指令
set lock:test true ex 5 nx
- LUA脚本,Redis Lua脚本可以保证多条指令的原子性执行
- 释放其他客户端锁,通过在加锁的时候指定随机值,在解锁的时候用这个随机值去匹配,匹配成功则解锁,匹配失败就不能解锁,因为锁可能已经过期或者已经被其他客户端占用
- Redis集群宕掉的极端情况下,可以考虑redlock算法,但是算法本身复杂,而且带来一些性能损耗,可以根据实际场景判断,是否非常在乎这样的高可用
3.SpringBoot实现
3.1 LUA脚本
本实现基于SpringBoot2x,考虑SpringBoot2x中Redis的默认连接是由lettuce提供,不是常用的Jedis,同时考虑不同版本的Redis,加锁和解锁都采用LUA脚本。
-- 加锁脚本,其中KEYS[]为外部传入参数
-- KEYS[1]表示key
-- KEYS[2]表示value
-- KEYS[3]表示过期时间
if redis.call("setnx", KEYS[1], KEYS[2]) == 1 then
return redis.call("pexpire", KEYS[1], KEYS[3])
else
return 0
-- 解锁脚本
-- KEYS[1]表示key
-- KEYS[2]表示value
-- return -1 表示未能获取到key或者key的值与传入的值不相等
if redis.call("get",KEYS[1]) == KEYS[2] then
return redis.call("del",KEYS[1])
else
return -1
3.2 加锁代码
依赖SpringBoot的RedisTemplate执行LUA脚本,同时考虑重试机制
/**
* 加锁
* @param key Key
* @param timeout 过期时间
* @param retryTimes 重试次数
* @return
*/
public boolean lock(String key, long timeout, int retryTimes) {
try {
final String redisKey = this.getRedisKey(key);
final String requestId = this.getRequestId();
logger.debug("lock :::: redisKey = " + redisKey + " requestid = " + requestId);
//组装lua脚本参数
List<String> keys = Arrays.asList(redisKey, requestId, String.valueOf(timeout));
//执行脚本
Long result = redisTemplate.execute(LOCK_LUA_SCRIPT, keys);
//存储本地变量
if(!StringUtils.isEmpty(result) && result == LOCK_SUCCESS) {
localRequestIds.set(requestId);
localKeys.set(redisKey);
logger.info("success to acquire lock:" + Thread.currentThread().getName() + ", Status code reply:" + result);
return true;
} else if (retryTimes == 0) {
//重试次数为0直接返回失败
return false;
} else {
//重试获取锁
logger.info("retry to acquire lock:" + Thread.currentThread().getName() + ", Status code reply:" + result);
int count = 0;
while(true) {
try {
//休眠一定时间后再获取锁,这里时间可以通过外部设置
Thread.sleep(100);
result = redisTemplate.execute(LOCK_LUA_SCRIPT, keys);
if(!StringUtils.isEmpty(result) && result == LOCK_SUCCESS) {
localRequestIds.set(requestId);
localKeys.set(redisKey);
logger.info("success to acquire lock:" + Thread.currentThread().getName() + ", Status code reply:" + result);
return true;
} else {
count++;
if (retryTimes == count) {
logger.info("fail to acquire lock for " + Thread.currentThread().getName() + ", Status code reply:" + result);
return false;
} else {
logger.warn(count + " times try to acquire lock for " + Thread.currentThread().getName() + ", Status code reply:" + result);
continue;
}
}
} catch (Exception e) {
logger.error("acquire redis occured an exception:" + Thread.currentThread().getName(), e);
break;
}
}
}
} catch (Exception e1) {
logger.error("acquire redis occured an exception:" + Thread.currentThread().getName(), e1);
}
return false;
}
- getRedisKey根据传入的key加上一个前缀生成锁的key
/** * 获取RedisKey * @param key 原始KEY,如果为空,自动生成随机KEY * @return */ private String getRedisKey(String key) { //如果Key为空且线程已经保存,直接用,异常保护 if (StringUtils.isEmpty(key) && !StringUtils.isEmpty(localKeys.get())) { return localKeys.get(); } //如果都是空那就抛出异常 if (StringUtils.isEmpty(key) && StringUtils.isEmpty(localKeys.get())) { throw new RuntimeException("key is null"); } return LOCK_PREFIX + key; }
- getRequestId用于为每一个加锁请求生成请求ID,内部方法
/** * 获取随机请求ID * @return */ private String getRequestId() { return UUID.randomUUID().toString(); }
- redisTemplate.execute(LOCK_LUA_SCRIPT, keys),execute最终调用的RedisConnection的eval方法将LUA脚本交给Redis服务端执行,可兼容springboot中不同redis客户端实现(Jedis、Lettuce等)。这个操作通过setnx设置锁key,成功后设置锁的有效期,成功返回1,失败返回0,其中LOCK_LUA_SCRIPT为常量定义
//定义获取锁的lua脚本 private final static DefaultRedisScript<Long> LOCK_LUA_SCRIPT = new DefaultRedisScript<>( "if redis.call(\"setnx\", KEYS[1], KEYS[2]) == 1 then return redis.call(\"pexpire\", KEYS[1], KEYS[3]) else return 0 end" , Long.class );
- 根据脚本执行情况,将锁的key和requestId分别存入线程本地变量localKeys和localRequestIds中,两个都是ThreadLocal变量,通过两个变量在释放锁的时候避免释放其他客户端占用的锁。
- 根据重试次数retryTimes值进行重试判断,如果为0则不重试,否则进入重试逻辑。
3.3 解锁代码
/**
* 释放KEY
* @param key
* @return
*/
public boolean unlock(String key) {
try {
String localKey = localKeys.get();
//如果本地线程没有KEY,说明还没加锁,不能释放
if(StringUtils.isEmpty(localKey)) {
logger.error("release lock occured an error: lock key not found");
return false;
}
String redisKey = getRedisKey(key);
//判断KEY是否正确,不能释放其他线程的KEY
if(!StringUtils.isEmpty(localKey) && !localKey.equals(redisKey)) {
logger.error("release lock occured an error: illegal key:" + key);
return false;
}
//组装lua脚本参数
List<String> keys = Arrays.asList(redisKey, localRequestIds.get());
logger.debug("unlock :::: redisKey = " + redisKey + " requestid = " + localRequestIds.get());
// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
Long result = redisTemplate.execute(UNLOCK_LUA_SCRIPT, keys);
//如果这里抛异常,后续锁无法释放
if (!StringUtils.isEmpty(result) && result == RELEASE_SUCCESS) {
logger.info("release lock success:" + Thread.currentThread().getName() + ", Status code reply=" + result);
return true;
} else if (!StringUtils.isEmpty(result) && result == LOCK_EXPIRED) {
//返回-1说明获取到的KEY值与requestId不一致或者KEY不存在,可能已经过期或被其他线程加锁
// 一般发生在key的过期时间短于业务处理时间,属于正常可接受情况
logger.warn("release lock exception:" + Thread.currentThread().getName() + ", key has expired or released. Status code reply=" + result);
} else {
//其他情况,一般是删除KEY失败,返回0
logger.error("release lock failed:" + Thread.currentThread().getName() + ", del key failed. Status code reply=" + result);
}
} catch (Exception e) {
logger.error("release lock occured an exception", e);
} finally {
//清除本地变量
this.clean();
}
return false;
}
- 如果本地线程localKeys中无法获取到key,或者获取到的key与传入的不一致,解锁失败
- redisTemplate.execute(UNLOCK_LUA_SCRIPT, keys) 将LUA脚本交给Redis服务端执行。UNLOCK_LUA_SCRIPT常量定义,先判断key值是否与传入的requestId一致,如果一致则删除key,如果不一致返回-1表示key可能已经过期或被其他客户端占用,避免误删。
//定义释放锁的lua脚本 private final static DefaultRedisScript<Long> UNLOCK_LUA_SCRIPT = new DefaultRedisScript<>( "if redis.call(\"get\",KEYS[1]) == KEYS[2] then return redis.call(\"del\",KEYS[1]) else return -1 end" , Long.class );
- 最后通过clean做清理工作
/** * 清除本地线程变量,防止内存泄露 */ private void clean() { localRequestIds.remove(); localKeys.remove(); }
4.后记
- 可将锁改成注解方式,通过AOP降低锁使用的复杂度
- 重试机制可以根据业务情况进行优化
- 可以更进一步借助ThreadLocal保存锁计数器可实现类似ReentrantLock可重入锁机制
- 释放锁失败后可以加入回调方法进行一些业务处理
- 如果业务挂起或者执行时间过长,超过了锁的超时时间,另外的客户端可能提前获取到锁,导致临界区代码不能严格的串行执行。除了合理设置锁超时时间外,尽量不要把分布式锁用于执行时间长的任务
5.补充
5.1 RedisTemplate加载
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.Serializable;
/**
* @Description Redis配置类,替代SpringBoot自动配置的RedisTemplate,参加RedisAutoConfiguration
* 这个类没有设置序列化方式等
* @Author Gazza Jiang
* @Date 2018/11/12 9:30
* @Version 1.0
*/
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Serializable> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//Jackson序列化器
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//字符串序列化器
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//普通Key设置为字符串序列化器
template.setKeySerializer(stringRedisSerializer);
//Hash结构的key设置为字符串序列化器
template.setHashKeySerializer(stringRedisSerializer);
//普通值和hash的值都设置为jackson序列化器
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
5.2 简单测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import xyz.gazza.demo.redis.Application;
import xyz.gazza.demo.redis.lock.RedisLock;
import java.util.ArrayList;
/**
* @Description 测试类
* @Author Gazza Jiang
* @Date 2018/11/12 13:29
* @Version 1.0
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Application.class})
public class ApplicationTest {
private final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
@Autowired
RedisLock redisLock;
@Test
public void testRedisLock() throws InterruptedException {
ArrayList<Thread> list = new ArrayList<>();
for(int i =0; i<10; i++) {
//logger.info("线程开始");
Thread t = new Thread() {
@Override
public void run() {
if (redisLock.lock("suaner")) {
try {
//成功获取锁
logger.info("获取锁成功,继续执行任务" + Thread.currentThread().getName());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}catch (Exception e) {
logger.error("excepiotn ", e);
} finally {
redisLock.unlock("suaner");
}
}
}
};
list.add(t);
t.start();
}
for(Thread t : list) {
t.join();
}
Thread.sleep(10000);
}
}
5.3 pom依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>