高级专题

分布式锁续约问题-续

2020-12-25  本文已影响0人  Wind哥

萌芽

对于分布式锁,有一个不错的客户端Redisson,是会自动锁续约的,详情请自行搜索
但,对于这个客户端的使用方式,个人不是很喜欢,还是更倾向Lettuce

对于锁续约原理 不复杂(参考Redisson),无非就是用个WatchDog线程,时不时对比一下锁的超时时间还剩余多少,如果小于某个值就续约(预设时间30s,已经过去20s了,就刷新超时时间)

分析源码

这里使用的是sping integration包里的分布式锁(理解成官方),详情请查阅
我们的目标是获取存在的分布式锁,通过这些锁的key进行续约
代码片段如下(精简了部分内容)

public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
    private static final String OBTAIN_LOCK_SCRIPT =
            "local lockClientId = redis.call('GET', KEYS[1])\n" +
                    "if lockClientId == ARGV[1] then\n" +
                    "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                    "  return true\n" +
                    "elseif not lockClientId then\n" +
                    "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
                    "  return true\n" +
                    "end\n" +
                    "return false";
    private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();

这里我截取了两个关键部分

那么目标就很明确了

第一个小目标

1.如果当前发指令的clientId 就是锁的持有者,则续约
2.如果当前clientId不存在,则锁上
3.否则锁失败

这也是为啥在分布式锁续约问题,我可以使用重入锁的的方式续约的原因
那么问题来了,为啥这里不能直接重入锁去刷新呢?
因为重入锁是需要业务代码上调用的,这样对业务代码侵入性太强了,而WatchDog调用,本身在不同线程上,就连本地锁都没办法重入(为了性能,分布式锁也维持了一把本地锁)

清楚指令意思之后就好办了,编写续约代码

private static final String OBTAIN_LOCK_SCRIPT_VERSION = "aa1fc9ae99657e86372b45452e5d6f71";

private static final String RENEW_LOCK_SCRIPT =
            "local lockClientId = redis.call('GET', KEYS[1])\n" +
                    "if lockClientId == ARGV[1] then\n" +
                    "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                    "  return true\n" +
                    "end\n" +
                    "return false";

代码意思也是显然而已的
1.锁是自己的,续约
2.否则,失败(业务代码完成了,主动释放锁了)

当然,为了保险起见,这边把申请锁的代码md5了,作为分布式锁的版本,用来判断当前的插件代码是否还合适

完整代码

上面的是废话,看代码就好了

/**
 * (why) 提供【自动续约】功能
 * (what)本类以【暴力】锁进行续约
 * (how)自动初始化,并以插件方式运行
 *
 * @Todo 若 RedisLockRegistry 提供续约功能,应使用官方功能
 * @author Wind
 */
@Slf4j
public class LockWatchdog {

    /**
     * 这个是RedisLockRegistry的script, 用于确认版本是否正确
     */
    private static final String OBTAIN_LOCK_SCRIPT_VERSION = "aa1fc9ae99657e86372b45452e5d6f71";

    /**
     * renew锁使用
     * 和 OBTAIN_LOCK_SCRIPT 最大的区别就是如果lockClientId不存在,不会创建一条
     */
    private static final String RENEW_LOCK_SCRIPT =
            "local lockClientId = redis.call('GET', KEYS[1])\n" +
                    "if lockClientId == ARGV[1] then\n" +
                    "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                    "  return true\n" +
                    "end\n" +
                    "return false";

    /**
     * RedisLockRegistry(static).FIELDS
     */
    private static final String FIELD_OBTAIN_LOCK_SCRIPT = "OBTAIN_LOCK_SCRIPT";

    /**
     * redisLockRegistry(object).FIELDS
     */
    private static final String FIELD_CLIENT_ID = "clientId";
    private static final String FIELD_EXPIRE_AFTER = "expireAfter";
    private static final String FIELD_LOCKS = "locks";

    /**
     * redisLock(object).FIELDS
     */
    private static final String FIELD_LOCK_KEY = "lockKey";
    private static final String FIELD_LOCKED_AT = "lockedAt";

    private final RedisLockRegistry lockRegistry;
    private final StringRedisTemplate redisTemplate;
    private final RedisScript<Boolean> renewLockScript;
    private final String clientId;
    private final long expireAfter;

    public LockWatchdog(RedisLockRegistry lockRegistry, StringRedisTemplate redisTemplate) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
        //依赖
        this.lockRegistry = lockRegistry;
        this.redisTemplate = redisTemplate;

        //刷新脚本
        this.renewLockScript = new DefaultRedisScript<>(RENEW_LOCK_SCRIPT, Boolean.class);

        //代理类的参数
        this.clientId = (String) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_CLIENT_ID);
        Assert.hasText(this.clientId, "client id is required!");
        this.expireAfter = (Long) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_EXPIRE_AFTER);
        Assert.notNull(this.expireAfter, "expire after is required!");
        Assert.isTrue(this.expireAfter > 0, "expire after <= 0");

        //check version
        String script = (String) UnsafeBeanUtils.getProperty(RedisLockRegistry.class, FIELD_OBTAIN_LOCK_SCRIPT);
        Assert.isTrue(CipherUtils.md5(script).equalsIgnoreCase(OBTAIN_LOCK_SCRIPT_VERSION),"verion error");
        log.info("init success clientId {}, expireAfter {}", clientId, expireAfter);
    }

    /**
     * 续约锁
     * 续约成功后会修改lockedAt字段,避免锁被超时回收了
     *
     * @param redisLock
     * @return
     * @throws IllegalAccessException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     */
    private boolean renewLock(Object redisLock) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
        // lock key 和 map key不同的,需要重新获取
        String lockKey = (String) UnsafeBeanUtils.getProperty(redisLock, FIELD_LOCK_KEY);
        if(log.isDebugEnabled()) {
            log.debug("LockWatchdog:try to renew {}", lockKey);
        }
        Boolean success =
                redisTemplate.execute(renewLockScript,
                        Collections.singletonList(lockKey), clientId,
                        String.valueOf(expireAfter));

        boolean result = Boolean.TRUE.equals(success);
        if (result) {
            UnsafeBeanUtils.setProperty(redisLock, FIELD_LOCKED_AT, System.currentTimeMillis());
            if(log.isDebugEnabled()) {
                log.debug("LockWatchdog:{} renew success!", lockKey);
            }
        } else {
            if(log.isDebugEnabled()) {
                log.debug("LockWatchdog:{} renew fail!", lockKey);
            }
        }

        return result;
    }

    /**
     * 定时器(10s执行一次续约)
     * 这里直接获取到Map里的内容尝试续约
     * Map里的锁是会自动删除的(ExpirableLockRegistry)
     * 而且使用分布式锁的场景不算多,所以已经解锁的,也去尝试续约也是没问题的
     *
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws IllegalAccessException
     */
    @Scheduled(cron = "*/10 * * * * ?")
    private void scheduled() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        if(log.isDebugEnabled()){
            log.debug("LockWatchdog:renew lock");
        }
        Map<String, Lock> locks = (Map<String, Lock>) UnsafeBeanUtils.getProperty(lockRegistry, FIELD_LOCKS);
        if(log.isDebugEnabled()) {
            log.debug("LockWatchdog:locks {}", locks.size());
        }
        if (MapUtils.isNotEmpty(locks)) {
            Iterator<Map.Entry<String, Lock>> iter = locks.entrySet().iterator();
            Map.Entry<String, Lock> entry = null;
            while (iter.hasNext()) {
                entry = iter.next();
                renewLock(entry.getValue());
            }
        } else {
            if(log.isDebugEnabled()) {
                log.debug("LockWatchdog:not need to renew!");
            }
        }
        if(log.isDebugEnabled()) {
            log.debug("LockWatchdog:renew lock finish!");
        }
    }

    /**
     * 不安全的类操作
     */
    private final static class UnsafeBeanUtils {
        public static Object getProperty(final Class clazz, final String name) throws IllegalAccessException {
            Field field = FieldUtils.getDeclaredField(clazz, name, true);
            return field.get(clazz);
        }

        public static Object getProperty(final Object bean, final String name) throws IllegalAccessException {
            Field field = FieldUtils.getDeclaredField(bean.getClass(), name, true);
            return field.get(bean);
        }

        public static void setProperty(final Object bean, final String name, final Object value) throws IllegalAccessException {
            Field field = FieldUtils.getDeclaredField(bean.getClass(), name, true);
            field.set(bean, value);
        }

    }
}

代码说明

上一篇下一篇

猜你喜欢

热点阅读