Redis 分布式锁
2020-02-28 本文已影响0人
寒雨然
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
 * Redis-backed lock service that talks to Redis through the Lettuce native connection
 * exposed by a Spring {@link RedisTemplate}.
 *
 * <p>A lock is acquired with {@code SET key token NX EX seconds}, where {@code token} is a
 * random UUID generated per acquisition. The token is remembered per thread and per key, and
 * release runs a Lua script that deletes the key only when it still holds that token, so a
 * thread never deletes a lock that has expired and been re-acquired by someone else.
 *
 * <p>Locks are not reentrant: a second {@code tryLock} on a key that is still present in
 * Redis returns {@code false}, even from the thread that holds it.
 */
@Component
public class RedisLockServiceImpl implements RedisLockService {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockService.class);

    /** Reply the SET command must return for the lock to count as acquired. */
    private static final String RESULT_STRING = "OK";

    /** Default lock expiry in seconds, used when the caller supplies none. */
    private static final int LOCK_EXPIRE = 300;

    /** Compare-and-delete script: removes KEYS[1] only if its value equals ARGV[1]. */
    private static final String UNLOCK_LUA =
            "if redis.call(\"get\",KEYS[1]) == ARGV[1] "
                    + "then "
                    + " return redis.call(\"del\",KEYS[1]) "
                    + "else "
                    + " return 0 "
                    + "end ";

    /**
     * Tokens of the locks acquired by the current thread, keyed by lock key. Keeping one token
     * per key (rather than a single token per thread) lets a thread hold several locks at once
     * without one acquisition overwriting the token of another.
     */
    private final ThreadLocal<Map<String, String>> lockValues = ThreadLocal.withInitial(HashMap::new);

    // NOTE(review): the template is supplied both through the constructor and through
    // @Resource field injection; one of the two is presumably redundant - confirm before removing.
    @Resource
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Creates the service.
     *
     * @param redisTemplate template used to obtain Redis connections; must not be {@code null}
     * @throws IllegalArgumentException if {@code redisTemplate} is {@code null}
     */
    public RedisLockServiceImpl(RedisTemplate<String, String> redisTemplate) {
        Assert.notNull(redisTemplate, "redisTemplate should not be null.");
        this.redisTemplate = redisTemplate;
    }

    /**
     * Tries to acquire the lock with the default expiry ({@value #LOCK_EXPIRE} seconds).
     *
     * @param key lock key
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    @Override
    public boolean tryLock(String key) {
        return setRedisLock(key, LOCK_EXPIRE);
    }

    /**
     * Tries to acquire the lock with a caller-supplied expiry.
     *
     * @param key     lock key
     * @param timeOut expiry in seconds; {@code null} falls back to the default expiry
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    @Override
    public boolean tryLock(String key, Integer timeOut) {
        return setRedisLock(key, timeOut);
    }

    /**
     * Releases a lock previously acquired by the current thread.
     *
     * <p>The guarded section may have outlived the lock's expiry, in which case another client
     * may already own the key; the key is therefore deleted through a Lua script only when it
     * still holds this thread's token.
     *
     * @param key lock key
     * @return {@code true} if the key was deleted; {@code false} if the current thread has no
     *         token for this key, or the lock has expired or no longer belongs to it
     */
    @Override
    public boolean releaseLock(String key) {
        Map<String, String> held = lockValues.get();
        // Forget the token before talking to Redis so it is dropped whatever the outcome.
        String value = held.remove(key);
        if (held.isEmpty()) {
            // Clear the thread-local slot so pooled threads do not retain an empty map.
            lockValues.remove();
        }
        if (value == null) {
            return false;
        }
        Long result = redisTemplate.execute(
                (RedisCallback<Long>) connection -> releaseLockCall(connection, key, value));
        // The script returns 0 when the token no longer matches and 1 when the key was deleted.
        return result != null && result > 0;
    }

    /**
     * Acquires the lock in Redis and records the token for the current thread on success.
     *
     * @param key    lock key; must not be {@code null}
     * @param expire expiry in seconds; {@code null} falls back to the default expiry
     * @return {@code true} if Redis answered {@code OK} to the conditional SET
     * @throws IllegalArgumentException if {@code key} is {@code null}
     */
    private boolean setRedisLock(String key, Integer expire) {
        Assert.notNull(key, "key should not be null.");
        final long expireSeconds = expire != null ? expire : LOCK_EXPIRE;
        final String uuid = UUID.randomUUID().toString();
        String result = redisTemplate.execute(
                (RedisCallback<String>) connection -> lockCallBack(connection, key, uuid, expireSeconds));
        if (!RESULT_STRING.equals(result)) {
            return false;
        }
        // Remember the token; release compares it with the value stored in Redis to decide
        // whether the lock still belongs to this thread.
        lockValues.get().put(key, uuid);
        return true;
    }

    /**
     * Issues {@code SET key uuid NX EX expireSeconds} on the native Lettuce connection,
     * handling both the cluster and the standalone command interfaces.
     *
     * @param connection    Redis connection provided by the template
     * @param key           lock key
     * @param uuid          token stored as the key's value
     * @param expireSeconds expiry in seconds, so the lock frees itself if its holder dies
     * @return the reply of the SET command, or an empty string when the native connection is
     *         of neither supported type
     */
    private String lockCallBack(RedisConnection connection, String key, String uuid, long expireSeconds) {
        Object nativeConnection = connection.getNativeConnection();
        byte[] keyByte = key.getBytes(StandardCharsets.UTF_8);
        byte[] valueByte = uuid.getBytes(StandardCharsets.UTF_8);
        // NX makes the SET conditional and atomic; EX bounds how long the lock can be held.
        SetArgs args = SetArgs.Builder.nx().ex(expireSeconds);

        // Cluster mode
        if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
            @SuppressWarnings("unchecked") // assumes the connection uses byte[] keys and values - TODO confirm
            RedisAdvancedClusterAsyncCommands<byte[], byte[]> commands =
                    (RedisAdvancedClusterAsyncCommands<byte[], byte[]>) nativeConnection;
            return commands.getStatefulConnection().sync().set(keyByte, valueByte, args);
        }
        // Standalone mode
        if (nativeConnection instanceof RedisAsyncCommands) {
            @SuppressWarnings("unchecked") // assumes the connection uses byte[] keys and values - TODO confirm
            RedisAsyncCommands<byte[], byte[]> commands =
                    (RedisAsyncCommands<byte[], byte[]>) nativeConnection;
            return commands.getStatefulConnection().sync().set(keyByte, valueByte, args);
        }
        LOGGER.warn("[RedisLockService][lockCallBack] unsupported native connection type: {}",
                nativeConnection == null ? "null" : nativeConnection.getClass().getName());
        return "";
    }

    /**
     * Runs the compare-and-delete Lua script on the native Lettuce connection.
     *
     * @param connection Redis connection provided by the template
     * @param key        lock key
     * @param lockValue  token the current thread stored when it acquired the lock
     * @return the script result (1 when the key was deleted, 0 when the token did not match),
     *         or 0 when the script could not be run or the native connection is unsupported
     */
    private Long releaseLockCall(RedisConnection connection, String key, String lockValue) {
        Long result = 0L;
        try {
            Object nativeConnection = connection.getNativeConnection();
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            byte[] valueBytes = lockValue.getBytes(StandardCharsets.UTF_8);
            Object[] keyParam = new Object[]{keyBytes};
            if (nativeConnection instanceof RedisScriptingAsyncCommands) {
                @SuppressWarnings("unchecked") // keys are passed as Object[] wrapping the raw key bytes
                RedisScriptingAsyncCommands<Object, byte[]> command =
                        (RedisScriptingAsyncCommands<Object, byte[]>) nativeConnection;
                // The script runs atomically on the server, so the check and the delete cannot interleave.
                result = (Long) command.eval(UNLOCK_LUA, ScriptOutputType.INTEGER, keyParam, valueBytes).get();
            }
            return result;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.warn("[RedisLockService][releaseLockCall] eval command fail", e);
            return result;
        } catch (ExecutionException e) {
            LOGGER.warn("[RedisLockService][releaseLockCall] eval command fail", e);
            return result;
        }
    }
}