分布式锁的实现

2018-12-30  本文已影响0人  firefly_

引言,来自阿里巴巴Java开发手册上的一句话:“【强制】并发修改同一记录时,避免更新丢失,需要加锁。要么在应用层加锁,要么在缓存加锁,要么在数据库层使用乐观锁,使用 version 作为更新依据。说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于 3 次。”

为了保证一个方法或属性在高并发情况下的同一时间只能被一个线程访问,在单机部署的情况下,可以使用ReentrantLock或Synchronized进行互斥控制。随着发展,单机部署的系统已经不能满足业务的需要,越来越多的系统进化成分布式集群系统,原本在单机运行的锁控制已经不能实现“一个方法或属性在高并发情况下的同一时间只能被一个线程访问”的控制,要在分布式系统中实现在单机中的控制效果就必须使用分布式锁。

分布式锁应该具备的特性:

1、在分布式系统环境下,同一个方法在同一时间只能被一个机器的同一个线程执行;
2、保证获取锁与释放锁的性能和可用性;
3、具备可重入特性;
4、具备锁失效机制,防止死锁;
5、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

分布式锁的实现,业界主流认为有三种实现方式:

1、基于数据库实现的分布式锁(轻量级,实现简单);
2、基于缓存的分布式锁(轻量级,实现简单);
3、基于Zookeeper的分布式锁(重量级,实现较繁琐,性能还一般?)

本文只对前两种实现方式展开讨论。
下面的代码中很多使用了lombok(强烈推荐使用)的注解,如果要使用,请安装lombok插件,然后在项目中添加lombok依赖:

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

一、基于数据库实现的分布式锁(乐观锁(+悲观锁?))
首先是扫盲,简单介绍一下乐观锁和悲观锁:
乐观锁:每次去取数据的时候总认为不会有其他线程对数据进行修改,因此不会上锁,但是在更新时会判断其他线程在这之前有没有对数据进行修改,一般会使用版本号机制或CAS操作实现;
悲观锁:每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等),当其他线程想要访问数据时,都需要阻塞挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁,在 Java中,synchronized的思想也是悲观锁。
乐观锁的实现较简单的实现是通过在表中增加一个version版本号字段或者更新时间updatetm字段(Timestamp时间戳类型,勾选“随当前时间戳更新”)(建议使用updatetm字段,考虑在建表时就添加一个createtm字段(不用更新)和updatetm字段),当线程A要更新数据值时,首先读取数据的同时也会读取version值(或者updatetm值,下同),在提交更新时,判断刚才读取到的version值与当前数据库中的version值是否相等,相等时才更新,否则重试更新操作(使用递归的方式比较简单),直到更新成功。

import lombok.Synchronized;

@Service
@Slf4j
public class DemoServiceSql{

    @Transactional(rollbackFor = Exception.class)
    // 这里使用Synchronized注解是保证在单台机器上同一时间只有一个线程能够访问(是否有必要?)
    @Synchronized
    public T methodSql(Integer id){
        // Object obj= select version from tb_xx where  id=#{id} ;
        if(obj != null && obj.getVersion() != null){
            dataHandle(id, obj.getVersion());        
         }
    }

     public T dataHandle(Integer id, Integer version){
        // where 条件里还可以根据业务需要添加其他限制条件,保证更精准的控制
        int count  = update tb_xx set x=x+1, version=version+1 where id=#{id} and version=#{version};
        // int count = update tb_xx set x=x+1 where id=#{id} and updatetm=#{updatetm};  
        if(count  < 1){
            // 更新失败,重试
            dataHandle(id, version);
        }
        return T:
    }
}

二、基于Redis的分布式锁

@Service
@Slf4j
public class DemoServiceCache{

    @Autowired
    RedisTemplate<String, String> redisTemplate;

    @Transactional(rollbackFor = Exception.class)
    public T methodCache(){
        RedisLock redisLock =new RedisLock(redisTemplate, key, 60000, 3000, 2);
        // 请求锁
        boolean locked = redisLock.getLock();
        if (!locked) {
           throw new Exception("获取redis锁失败!");
        }
        try {
            // doSomething
            // insertInto tb_xx
            // update tb_xx
        } catch (Exception e) {
            log.error("methodCache...error:{}", e);
            // 异常被捕获,如果需要回滚事物,一定要抛出异常
            throw e;
        } finally {
            // 释放锁
            redisLock.release();
        }
    }
}

工具类RedisLock的实现

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;

@Data
@Slf4j
public class RedisLock{

    private RedisTemplate<String, String> redisTemplate;

    /**
     * redis.key
    */
    private String key;

    /**
    * redis.value
    */
    private String value;

    /**
    * redis过期时间
    */
    private int expire = 60000;

    /**
    * 等待时间
    */
    private int waitMillis = 500;

    /**
    * 重试次数
    */
    private int tryCount = 3;

    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;

    public RedisLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public RedisLock(RedisTemplate<String, String> redisTemplate, String key, int expire) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expire = expire;
    }

    public RedisLock(RedisTemplate<String, String> redisTemplate, String key, int expire, int waitMillis, int tryCount) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expire = expire;
        this.waitMillis = waitMillis;
        this.tryCount = tryCount;
    }

    public RedisLock(RedisTemplate<String, String> redisTemplate, String key, 
                      int expire, TimeUnit timeUnit, int waitMillis, int tryCount) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.expire = expire;
        this.waitMillis = waitMillis;
        this.tryCount = tryCount;
        this.timeUnit = timeUnit;
    }

    public boolean getLock() {
        try {
            return getLock(tryCount);
        } catch (Exception e) {
            log.info(e.getMessage(), e);
        }
        return false;
    }

    /**
     * 获取锁
     *
     * @param remainTryCount 重试次数
     * @return true if success
     * @throws Exception
     */
    private boolean getLock(int remainTryCount) throws Exception {
        this.value = System.currentTimeMillis() + "";
        // 如果成功 设置这个key的过期时间
        // setIfAbsent(K var1, V var2, long var3, TimeUnit var5)方法在spring-data-redis 2.x中支持
        boolean success = redisTemplate.opsForValue().setIfAbsent(key, value, expire, timeUnit);
        if (success) {
            return true;
        } else {
            // 失败  获取值 判断key是否超时未移除 
            String value = redisTemplate.opsForValue().get(key);
            if (StringUtils.isNotEmpty(value)) {
                if (System.currentTimeMillis() - Long.parseLong(value) > timeUnit.toMillis(expire)) {
                    // 超时移除
                    redisTemplate.delete(key);
                }
            }
            // 重试、等待
            if (remainTryCount > 0 && waitMillis > 0) {
                Thread.sleep(waitMillis);
                return getLock(remainTryCount - 1);
            } else {
                return false;
            }
        }
    }

    /**
     * 获取等待时间
     *
     * @return 等待时间
     */
    public long getWaitSecond() {
        long currentTime = System.currentTimeMillis();
        long preTime = Long.parseLong(redisTemplate.opsForValue().get(key));
        return (preTime + timeUnit.toMillis(expire) - currentTime) / 1000;
    }

    /**
     * 释放锁
     */
    public void release() {
        if (value == null || key == null) {
            return;
        }
        if (value.equals(redisTemplate.opsForValue().get(key))) {
            redisTemplate.delete(key);
        }
    }
}

如果使用setIfAbsent(key, value)方法,需要考虑到一种极端情况可能会导致死锁发成:boolean success = redisTemplate.opsForValue().setIfAbsent(key, value);执行返回成功,但是还没来得及设置过期时间,这个时候Redis挂掉了,那么其他请求肯定是拿不到锁的(这里也另外存在一个问题:如果过期时间设置不合理,导致任务还没执行完成,锁就被其他请求抢走了。这个时候可以考虑使用功能更强大的Redisson来实现分布式锁~~)。

@Transactional(rollbackFor = Exception.class)
@DistributedLock(prefix = "testLock")
public String reTrade(@LockKey String orderCode) {
  try {
    // doSomething
    // insertInto tb_xx
    // update tb_xx
  } catch (Exception e) {
      log.error("methodCache...error:{}", e);
      // 异常被捕获,如果需要回滚事物,一定要抛出异常
      throw e;
  }
}
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface DistributedLock {
    String prefix() default "";

    int expireTime() default 60;

    int tryCount() default 3;

    int waitMillis() default 1;

    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Documented
public @interface LockKey {
}
@Aspect
@AllArgsConstructor
@Component
public class DistributedLockAspectAdvice {
    public static final String PREFIX = "com:goon:middle:locked:";

    final RedisTemplate<String, String> redisTemplate;

    @Pointcut("@annotation(com.goon.common.lock.DistributedLock)")
    public void lockAspect() {
    }

    @Around("lockAspect()")
    public Object lockAroundAction(ProceedingJoinPoint joinPoint) throws Throwable {
        Class<?> type = joinPoint.getSignature().getDeclaringType();
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        DistributedLock dl = method.getAnnotation(DistributedLock.class);

        String prefix = PREFIX + StringUtils.defaultIfBlank(dl.prefix(), type.getName() + ":"
                               + method.getName()) + getKey(method, joinPoint.getArgs());
        RedisLock redisLock = new RedisLock(redisTemplate, prefix, dl.expireTime(), dl.timeUnit(), dl.waitMillis(), dl.tryCount());
        // 获取锁
        boolean lock = redisLock.getLock();
        if (!lock) {
            throw new RuntimeException("get lock fail, please wait " + redisLock.getWaitSecond() + "seconds..");
        }
        Object result;
        try {
            result = joinPoint.proceed();
        } finally {
            redisLock.release();
        }
        return result;
    }

    private String getKey(Method method, Object[] args) {
        Annotation[][] annotations = method.getParameterAnnotations();
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < annotations.length; i++) {
            if (Stream.of(annotations[i]).anyMatch(o -> o.annotationType().isAssignableFrom(LockKey.class))) {
                key.append(args[i]);
            }
        }
        return key.toString();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读