Redis首页投稿(暂停使用,暂停投稿)程序员

Spring-data-redis + redis 分布式锁(一

2017-11-15  本文已影响1149人  xiaolyuh

分布式锁的解决方式

  1. 基于数据库表做乐观锁,用于分布式锁。(适用于小并发)
  2. 使用memcached的add()方法,用于分布式锁。
  3. 使用memcached的cas()方法,用于分布式锁。(不常用)
  4. 使用redis的setnx()、expire()方法,用于分布式锁。
  5. 使用redis的setnx()、get()、getset()方法,用于分布式锁。
  6. 使用redis的watch、multi、exec命令,用于分布式锁。(不常用)
  7. 使用zookeeper,用于分布式锁。(不常用)

这里主要介绍第四种和第五种:

使用redis的setnx()、expire()方法,用于分布式锁

原理

对于使用redis的setnx()、expire()来实现分布式锁,这个方案相对于memcached()的add()方案,redis占优势的是,其支持的数据类型更多,而memcached只支持String一种数据类型。除此之外,无论是从性能上来说,还是操作方便性来说,其实都没有太多的差异,完全看你的选择,比如公司中用哪个比较多,你就可以用哪个。

首先说明一下setnx()命令,setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果key不存在,则设置当前key成功,返回1;如果当前key已经存在,则设置当前key失败,返回0。但是要注意的是setnx命令不能设置key的超时时间,只能通过expire()来对key设置。

具体的使用步骤如下:

  1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
  2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
  3. 执行完业务代码后,可以通过delete命令删除key。

为了保证在某个Redis节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(timeOut),它要远小于锁的有效时间(几十毫秒量级)。

可能存在的问题

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用redis的setnx()、get()和getset()方法来实现分布式锁。

具体实现

锁具体实现RedisLock:

package com.xiaolyuh.lock;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

public class RedisLock {
    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    //////////////////// 静态常量定义开始///////////////////////
    /**
     * 存储到redis中的锁标志
     */
    private static final String LOCKED = "LOCKED";

    /**
     * 默认请求锁的超时时间(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默认锁的有效时间(s)
     */
    public static final int EXPIRE = 60;
    //////////////////// 静态常量定义结束///////////////////////

    /**
     * 锁标志对应的key
     */
    private String key;

    /**
     * 锁的有效时间(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 请求锁的超时时间(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 锁flag
     */
    private volatile boolean isLocked = false;
    /**
     * Redis管理模板
     */
    private StringRedisTemplate redisTemplate;

    /**
     * 构造方法
     *
     * @param redisTemplate Redis管理模板
     * @param key           锁定key
     * @param expireTime    锁过期时间 (秒)
     * @param timeOut       请求锁超时时间 (毫秒)
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime, long timeOut) {
        this.key = key;
        this.expireTime = expireTime;
        this.timeOut = timeOut;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 构造方法
     *
     * @param redisTemplate Redis管理模板
     * @param key           锁定key
     * @param expireTime    锁过期时间
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime) {
        this.key = key;
        this.expireTime = expireTime;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 构造方法(默认请求锁超时时间30秒,锁过期时间60秒)
     *
     * @param redisTemplate Redis管理模板
     * @param key           锁定key
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key) {
        this.key = key;
        this.redisTemplate = redisTemplate;
    }

    public boolean lock() {
        // 系统当前时间,纳秒
        long nowTime = System.nanoTime();
        // 请求锁超时时间,纳秒
        long timeout = timeOut * 1000000;
        final Random random = new Random();

        // 不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
        // 这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
        // 如果一个master节点不可用了,应该尽快尝试下一个master节点
        while ((System.nanoTime() - nowTime) < timeout) {
            // 将锁作为key存储到redis缓存中,存储成功则获得锁
            if (redisTemplate.opsForValue().setIfAbsent(key, LOCKED)) {
                isLocked = true;
                // 设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
                // 可以防止因异常情况无法释放锁而造成死锁情况的发生
                redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);

                // 上锁成功结束请求
                break;
            }
            // 获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
            // 睡眠10毫秒后继续请求锁
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("获取分布式锁休眠被中断:", e);
            }
        }
        return isLocked;

    }

    public boolean isLock() {
        redisTemplate.getConnectionFactory().getConnection().time();
        return redisTemplate.hasKey(key);
    }

    public void unlock() {
        // 释放锁
        // 不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
        if (isLocked) {
            redisTemplate.delete(key);
        }
    }

}

调用锁:

public void redisLock(int i) {
        RedisLock redisLock = new RedisLock(redisTemplate, "redisLockKey:"+i % 10, 5*60 , 500);
        try {
            long now = System.currentTimeMillis();
            if (redisLock.lock()) {
                logger.info("=" + (System.currentTimeMillis() - now));
                // TODO 获取到锁要执行的代码块
                logger.info("j:" + j ++);
            } else {
                logger.info("k:" + k ++);
            }
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        } finally {
            // 一定要释放锁
            redisLock.unlock();
        }
    }

使用redis的setnx()、get()、getset()方法,用于分布式锁

原理

这个方案的背景主要是在setnx()和expire()的方案上针对可能存在的死锁问题,做了一版优化。

那么先说明一下这三个命令,对于setnx()和get()这两个命令,相信不用再多说什么。那么getset()命令?这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对key设置newValue这个值,并且返回key原来的旧值。假设key原来是不存在的,那么多次执行这个命令,会出现下边的效果:

  1. getset(key, "value1") 返回nil 此时key的值会被设置为value1
  2. getset(key, "value2") 返回value1 此时key的值会被设置为value2
  3. 依次类推!

介绍完要使用的命令后,具体的使用步骤如下:

  1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。

  2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。

  3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。

  4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。

  5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

可能存在的问题

问题1: 在“get(lockkey)获取值oldExpireTime ”这个操作与“getset(lockkey, newExpireTime) ”这个操作之间,如果有N个线程在get操作获取到相同的oldExpireTime后,然后都去getset,会不会返回的newExpireTime都是一样的,都会是成功,进而都获取到锁???

我认为这套方案是不存在这个问题的。依据有两条: 第一,redis是单进程单线程模式,串行执行命令。 第二,在串行执行的前提条件下,getset之后会比较返回的currentExpireTime与oldExpireTime 是否相等。

问题2: 在“get(lockkey)获取值oldExpireTime ”这个操作与“getset(lockkey, newExpireTime) ”这个操作之间,如果有N个线程在get操作获取到相同的oldExpireTime后,然后都去getset,假设第1个线程获取锁成功,其他锁获取失败,但是获取锁失败的线程它发起的getset命令确实执行了,这样会不会造成第一个获取锁的线程设置的锁超时时间一直在延长???

我认为这套方案确实存在这个问题的可能。但我个人认为这个微笑的误差是可以忽略的,不过技术方案上存在缺陷,大家可以自行抉择哈。

问题3: 这个方案必须要保证分布式服务器的时间一定要同步,否则这个锁就会出问题。

具体实现

锁具体实现RedisLock:

package com.xiaolyuh.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Redis分布式锁(这种方式服务器时间一定要同步,否则会出问题)
 * 
 * @author yuhao.wangwang
 * @version 1.0
 * @date 2017年11月3日 上午10:21:27
 */
public class RedisLock2 {

    /**
     * 默认请求锁的超时时间(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默认锁的有效时间(s)
     */
    public static final int EXPIRE = 60;

    private static Logger logger = LoggerFactory.getLogger(RedisLock2.class);

    private StringRedisTemplate redisTemplate;

    /**
     * 锁标志对应的key
     */
    private String lockKey;
    /**
     * 锁的有效时间(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 请求锁的超时时间(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 锁的有效时间
     */
    private long expires = 0;

    /**
     * 锁标记
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用默认的锁过期时间和请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用默认的请求锁的超时时间,指定锁的过期时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param expireTime    锁的过期时间(单位:秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用默认的锁的过期时间,指定请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 锁的过期时间和请求锁的超时时间都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param expireTime    锁的过期时间(单位:秒)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * @return 获取锁的key
     */
    public String getLockKey() {
        return lockKey;
    }

    /**
     * 获得 lock.
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public boolean lock() {
        // 请求锁超时时间,纳秒
        long timeout = timeOut * 1000000;
        // 系统当前时间,纳秒
        long nowTime = System.nanoTime();

        while ((System.nanoTime() - nowTime) < timeout) {
            // 分布式服务器有时差,这里给1秒的误差值
            expires = System.currentTimeMillis() + expireTime + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间

            if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
                locked = true;
                // 设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
                // 可以防止因异常情况无法释放锁而造成死锁情况的发生
                redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);

                // 上锁成功结束请求
                return true;
            }

            String currentValueStr = redisTemplate.opsForValue().get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }

            /*
                延迟10 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("获取分布式锁休眠被中断:", e);
            }

        }
        return locked;
    }


    /**
     * 解锁
     */
    public synchronized void unlock() {
        // 只有加锁成功并且锁还有效才去释放锁
        if (locked && expires > System.currentTimeMillis()) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

}

调用方式:

public void redisLock2(int i) {
    RedisLock2 redisLock2 = new RedisLock2(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
    try {
        long now = System.currentTimeMillis();
        if (redisLock2.lock()) {
            logger.info("=" + (System.currentTimeMillis() - now));
            // TODO 获取到锁要执行的代码块
            logger.info("j:" + j++);
        } else {
            logger.info("k:" + k++);
        }
    } catch (Exception e) {
        logger.info(e.getMessage(), e);
    } finally {
        redisLock2.unlock();
    }
}

对于上面两种redis实现分布式锁的方案都有一个问题:

使用redis的SET resource-name anystring NX EX max-lock-time方式来实现分布式锁

下一篇文章介绍
Spring-data-redis + redis 分布式锁(二)

源码: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-data-redis-distributed-lock 工程

参考:

上一篇下一篇

猜你喜欢

热点阅读