2022-07-26 springboot redisTemplate 实现分布式锁
2022-07-25 本文已影响0人
江江江123
非必要不用锁。
这里只实现了 Redis 单机版的锁;集群环境请参考基于 Redlock 的实现。
上代码:
1 基于 RedisTemplate 的锁
/**
 * Contract for a distributed lock addressed by a string key.
 *
 * <p>The {@code lock} overloads differ only in which of expiry, retry count and
 * retry interval the caller supplies. The constants below are the values
 * {@code AbstractDistributedLock} substitutes for the arguments a caller omits.
 */
public interface DistributedLock {

    /** Default lock expiry, in milliseconds. */
    long TIMEOUT_MILLIS = 30_000L;

    /** Default number of retries after a failed first attempt. */
    int RETRY_TIMES = Integer.MAX_VALUE;

    /** Default pause between two attempts, in milliseconds. */
    long SLEEP_MILLIS = 500L;

    // ---- acquisition with the default expiry ----

    /**
     * @param key lock key
     * @return {@code true} if the lock was acquired
     */
    boolean lock(String key);

    /**
     * @param key        lock key
     * @param retryTimes number of retries after the first failed attempt
     * @return {@code true} if the lock was acquired
     */
    boolean lock(String key, int retryTimes);

    /**
     * @param key         lock key
     * @param retryTimes  number of retries after the first failed attempt
     * @param sleepMillis pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired
     */
    boolean lock(String key, int retryTimes, long sleepMillis);

    // ---- acquisition with an explicit expiry ----

    /**
     * @param key    lock key
     * @param expire lock expiry, in milliseconds
     * @return {@code true} if the lock was acquired
     */
    boolean lock(String key, long expire);

    /**
     * @param key        lock key
     * @param expire     lock expiry, in milliseconds
     * @param retryTimes number of retries after the first failed attempt
     * @return {@code true} if the lock was acquired
     */
    boolean lock(String key, long expire, int retryTimes);

    /**
     * @param key         lock key
     * @param expire      lock expiry, in milliseconds
     * @param retryTimes  number of retries after the first failed attempt
     * @param sleepMillis pause between attempts, in milliseconds
     * @return {@code true} if the lock was acquired
     */
    boolean lock(String key, long expire, int retryTimes, long sleepMillis);

    // ---- release ----

    /**
     * @param key lock key
     * @return {@code true} if the lock was released
     */
    boolean releaseLock(String key);
}
/**
 * Skeleton implementation of {@link DistributedLock}.
 *
 * <p>Every convenience overload forwards to
 * {@link #lock(String, long, int, long)}, filling the missing arguments with the
 * defaults declared on {@link DistributedLock}. Subclasses supply that
 * four-argument method and {@link #releaseLock(String)}.
 */
public abstract class AbstractDistributedLock implements DistributedLock {

    /** Acquires {@code key} with the default expiry, retry count and retry interval. */
    @Override
    public boolean lock(String key) {
        return lock(
                key,
                DistributedLock.TIMEOUT_MILLIS,
                DistributedLock.RETRY_TIMES,
                DistributedLock.SLEEP_MILLIS);
    }

    /** Acquires {@code key} with the default expiry and retry interval. */
    @Override
    public boolean lock(String key, int retryTimes) {
        return lock(
                key,
                DistributedLock.TIMEOUT_MILLIS,
                retryTimes,
                DistributedLock.SLEEP_MILLIS);
    }

    /** Acquires {@code key} with the default expiry. */
    @Override
    public boolean lock(String key, int retryTimes, long sleepMillis) {
        return lock(key, DistributedLock.TIMEOUT_MILLIS, retryTimes, sleepMillis);
    }

    /** Acquires {@code key} with the default retry count and retry interval. */
    @Override
    public boolean lock(String key, long expire) {
        return lock(
                key,
                expire,
                DistributedLock.RETRY_TIMES,
                DistributedLock.SLEEP_MILLIS);
    }

    /** Acquires {@code key} with the default retry interval. */
    @Override
    public boolean lock(String key, long expire, int retryTimes) {
        return lock(key, expire, retryTimes, DistributedLock.SLEEP_MILLIS);
    }
}
/**
 * {@link DistributedLock} implemented on top of a {@link RedisTemplate}.
 *
 * <p>Acquisition is a {@code setIfAbsent} with an expiry; the stored value is a
 * random UUID remembered in a thread-local so that only the acquiring thread can
 * release the lock. Release runs {@link #UNLOCK_LUA}, which deletes the key only
 * when its value still equals that UUID.
 *
 * <p>Limitation: one token is remembered per thread, so a thread can track only
 * one held lock at a time; acquiring a second lock replaces the token of the
 * first.
 */
@Slf4j
public class RedisDistributedLock extends AbstractDistributedLock {

    /** Lua script: delete KEYS[1] if its value equals ARGV[1], otherwise return 0. */
    public static final String UNLOCK_LUA =
            "if redis.call(\"get\",KEYS[1]) == ARGV[1] "
                    + "then "
                    + " return redis.call(\"del\",KEYS[1]) "
                    + "else "
                    + " return 0 "
                    + "end ";

    /** Built once and reused instead of creating a new script object per release. */
    private static final RedisScript<Long> UNLOCK_SCRIPT =
            new DefaultRedisScript<>(UNLOCK_LUA, Long.class);

    private final RedisTemplate<Object, Object> redisTemplate;

    /** Token of the lock most recently acquired by the current thread. */
    private final ThreadLocal<String> lockFlag = new ThreadLocal<String>();

    /**
     * @param redisTemplate template used for both the set and the unlock script
     */
    public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
        super();
        this.redisTemplate = redisTemplate;
    }

    /**
     * Tries to acquire the lock, retrying up to {@code retryTimes} times.
     *
     * @param key         lock key
     * @param expire      lock expiry in milliseconds
     * @param retryTimes  number of retries after the first failed attempt
     * @param sleepMillis pause between attempts in milliseconds
     * @return {@code true} if the lock was acquired; {@code false} if all attempts
     *         failed or the thread was interrupted while waiting
     */
    @Override
    public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
        boolean acquired = setRedis(key, expire);
        // If the first attempt failed, retry as many times as the caller asked for.
        while (!acquired && retryTimes-- > 0) {
            try {
                log.info("lock failed, retrying...{}", retryTimes);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers up the stack can see it.
                Thread.currentThread().interrupt();
                return false;
            }
            acquired = setRedis(key, expire);
        }
        return acquired;
    }

    /**
     * Performs one acquisition attempt.
     *
     * <p>The token is stored in the thread-local only after a successful set, so
     * a failed attempt does not overwrite the token of a lock this thread
     * already holds.
     */
    private boolean setRedis(String key, long expire) {
        try {
            String token = UUID.randomUUID().toString();
            // setIfAbsent returns a nullable Boolean; compare instead of unboxing.
            Boolean stored = redisTemplate.opsForValue().setIfAbsent(key, token, expire,
                    TimeUnit.MILLISECONDS);
            if (Boolean.TRUE.equals(stored)) {
                lockFlag.set(token);
                return true;
            }
        } catch (Exception e) {
            log.error("set redis occured an exception", e);
        }
        return false;
    }

    /**
     * Releases the lock if it is still owned by the current thread.
     *
     * <p>The guarded method may run longer than the expiry, in which case another
     * caller may already hold the key. The key is therefore deleted only when
     * its value still matches this thread's token, never unconditionally.
     *
     * @param key lock key
     * @return {@code true} if the key was deleted; {@code false} if this thread has
     *         no token, the value no longer matches, or Redis raised an error
     */
    @Override
    public boolean releaseLock(String key) {
        String token = lockFlag.get();
        if (token == null) {
            // This thread never acquired a lock (or already released it).
            return false;
        }
        try {
            List<Object> keys = new ArrayList<>();
            keys.add(key);
            Long deleted = redisTemplate.execute(UNLOCK_SCRIPT, keys, token);
            return deleted != null && deleted > 0;
        } catch (Exception e) {
            log.error("release lock occured an exception", e);
        } finally {
            // Drop the token so it does not linger on pooled threads.
            lockFlag.remove();
        }
        return false;
    }
}
2 基于aop实现的分布式锁
2.1 接口定义
/**
 * Marks a method that should run under a distributed lock.
 *
 * <p>The attributes are read by the lock aspect: {@link #value()} and
 * {@link #across()} shape the lock key, the remaining attributes control
 * expiry and retry behaviour.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {

    /** The resource to lock; used to build the Redis key. Empty by default. */
    String value() default "";

    /** If {@code true}, the lock spans methods (the method name is left out of the key). */
    boolean across() default false;

    /** How long the lock is kept, in milliseconds. */
    long keepMills() default 30_000L;

    /** What to do when the lock cannot be acquired. */
    LockFailAction action() default LockFailAction.CONTINUE;

    /** Number of retries. */
    int retryTimes() default 5;

    /** Interval between retries, in milliseconds. */
    long sleepMills() default 200L;

    /** Possible reactions to a failed lock attempt. */
    enum LockFailAction {
        GIVEUP,
        CONTINUE
    }
}
2.2 RedisConfig:使用 fastjson 序列化,也可以换用其他 JSON 序列化方式
/**
 * Redis wiring: a fastjson-backed value serializer, the {@code redisTemplate}
 * bean that uses it, and the {@link DistributedLock} bean built on that template.
 */
@Configuration
@EnableCaching
@AutoConfigureAfter({BroadcastMsgListen.class})
public class RedisConfig {

    /**
     * Creates the serializer used for values and hash values.
     *
     * <p>NOTE(review): this switches on fastjson autoType on the global
     * {@code ParserConfig}, which affects every fastjson user in the process.
     * Unrestricted autoType is a known deserialization risk; confirm that only
     * trusted data is stored in Redis, or restrict the accepted types.
     */
    @Bean
    public RedisSerializer fastJson2JsonRedisSerializer() {
        ParserConfig globalParserConfig = ParserConfig.getGlobalInstance();
        globalParserConfig.setAutoTypeSupport(true);
        return new FastJson2JsonRedisSerializer(Object.class);
    }

    /**
     * Builds the template: string serialization for keys and hash keys, the
     * fastjson serializer for values and hash values.
     *
     * <p>NOTE(review): {@code getLettuceConnectionFactory} is not shown in this
     * listing; the cast assumes the injected factory is a Lettuce one.
     */
    @Bean("redisTemplate")
    public RedisTemplate redisTemplate(RedisConnectionFactory factory, RedisSerializer fastJson2JsonRedisSerializer) {
        LettuceConnectionFactory connectionFactory =
                getLettuceConnectionFactory((LettuceConnectionFactory) factory);

        RedisTemplate template = new RedisTemplate();
        template.setConnectionFactory(connectionFactory);

        // Keys are stored as plain strings.
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());

        // Values go through the fastjson serializer.
        template.setValueSerializer(fastJson2JsonRedisSerializer);
        template.setHashValueSerializer(fastJson2JsonRedisSerializer);

        template.afterPropertiesSet();
        return template;
    }

    /** Exposes the Redis-backed lock, provided a {@link RedisTemplate} bean exists. */
    @Bean
    @ConditionalOnBean(RedisTemplate.class)
    public DistributedLock distributedLock(RedisTemplate redisTemplate) {
        DistributedLock redisBackedLock = new RedisDistributedLock(redisTemplate);
        return redisBackedLock;
    }
}
2.3 aop逻辑处理
/**
 * Aspect that runs methods annotated with {@link RedisLock} under a distributed lock.
 *
 * <p>The lock key is {@code appName:value} when {@link RedisLock#across()} is
 * set, otherwise {@code appName:methodName:value}. {@code value} is the result
 * of evaluating {@link RedisLock#value()} as a SpEL expression against the
 * argument array, or {@code "All"} when the expression is empty.
 */
@Aspect
@Configuration
@AutoConfigureAfter(RedisConfig.class)
@Slf4j
public class DistributedLockAspectConfiguration {

    // The original placeholder "${spring.application:name}" read the property
    // "spring.application" with default "name". The intended property is
    // spring.application.name; "name" is kept as the fallback so contexts
    // without that property still start.
    @Value("${spring.application.name:name}")
    String appName;

    @Autowired
    private DistributedLock distributedLock;

    ExpressionParser parser = new SpelExpressionParser();

    // NOTE(review): "<pageUrl>" looks like a placeholder left where the package of
    // RedisLock belongs; replace it with the annotation's fully qualified name.
    @Pointcut("@annotation(<pageUrl>RedisLock)")
    private void lockPoint() {
    }

    /**
     * Acquires the lock, invokes the target method, then releases the lock.
     *
     * @param pjp join point of the annotated method
     * @return the target method's result, or {@code null} when the lock could
     *         not be acquired (the target method is then not invoked)
     * @throws Throwable whatever the target method throws; the lock is released
     *                   before the exception propagates
     */
    @Around("lockPoint()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String key = buildLockKey(method, redisLock, pjp.getArgs());

        // GIVEUP means a single attempt; CONTINUE retries as configured.
        int retryTimes = redisLock.action() == RedisLock.LockFailAction.CONTINUE
                ? redisLock.retryTimes()
                : 0;

        boolean locked = distributedLock.lock(key, redisLock.keepMills(), retryTimes, redisLock.sleepMills());
        if (!locked) {
            log.debug("get lock failed : {}", key);
            return null;
        }

        log.debug("get lock success : {}", key);
        try {
            // Exceptions are deliberately not caught here: swallowing them would
            // hide failures from the caller and turn them into a null result.
            return pjp.proceed();
        } finally {
            boolean released = distributedLock.releaseLock(key);
            log.debug("release lock : {} {}", key, released ? "success" : "failed");
        }
    }

    /** Builds the Redis key for one invocation from the annotation and the arguments. */
    private String buildLockKey(Method method, RedisLock redisLock, Object[] args) {
        String keyExpression = redisLock.value();
        String value;
        if (StringUtils.isNotEmpty(keyExpression)) {
            // The argument array is the SpEL root, so "[0].id" reads args[0].id.
            Expression expression = parser.parseExpression(keyExpression);
            value = expression.getValue(args, String.class);
        } else {
            value = "All";
        }
        if (redisLock.across()) {
            // Shared across methods: the method name is not part of the key.
            return appName + ":" + value;
        }
        return appName + ":" + method.getName() + ":" + value;
    }
}
3 使用案例
3.1 定时推送
// Scheduled job: the cron expression fires at 09:00, 12:00, 18:00 and 21:00 every day.
// Without a distributed lock, every running node executes this job.
@Scheduled(cron = "0 0 9,12,18,21 * * ?")
private void pushViewCount() {
// Loads the unread view counts; the push step itself is not shown in this snippet.
List<NoReadViewCount> noReadViewCounts = viewHistoryService.noReadCount();
}
这段代码单节点没有问题,如果app起N个节点,每天用户就会收到N次推送
// GIVEUP: the aspect makes a single lock attempt (no retries). If the lock is not
// obtained the method body is skipped and the caller receives null.
// No value is given, so the key uses the fixed "All" segment.
@RedisLock(action = RedisLock.LockFailAction.GIVEUP)
List<NoReadViewCount> noReadCount();
添加分布式锁,如果获取锁失败,则直接放弃,这样N个节点只会有一个推送
3.2 细粒度锁
加入房间时上锁:如果不控制粒度,整个服务的 N 个房间在同一时间总共只能加入一个用户;按房间 id 上锁后,N 个房间可以同时各进入一个用户
// Fine-grained lock: "[1].id" is evaluated against the argument array, so the
// lock key contains the id of the second argument (room).
@RedisLock("[1].id")
public RoomUserVo joinRoom(long uid, Room room) {
// TODO: implementation omitted in this example
}
3.3 跨方法锁
针对同一个 deviceId,这 2 个方法同一时刻只有一个会执行
// Cross-method lock: with across = true the method name is left out of the key,
// so any other method locking on the same deviceId contends for the same lock.
@RedisLock(value = "[0].deviceId", across = true)
public Result<UserVo> save(SaveUserDto saveUserDto) {
// TODO: implementation omitted in this example
}
// Cross-method lock keyed on the first argument's deviceId; across = true leaves
// the method name out of the key, so the lock is shared with other methods that
// lock on the same deviceId.
@RedisLock(value = "[0].deviceId", across = true)
public Result updatePushToken(PushTokenDto pushTokenDto) {
// TODO: implementation omitted in this example
}