分布式锁

2017-09-20  本文已影响75人  米刀灵

有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。单机的可以使用ReentrantLock或者synchronized代码块来实现,但是这些API在分布式场景中就无能为力了。
针对分布式锁的实现,目前比较常用的有以下几种方案:基于数据库, 基于缓存(redis,memcached),基于Zookeeper。

应用的场景例子:
管理后台的部署架构(多台tomcat服务器+redis【多台tomcat服务器访问一台redis】+mysql【多台tomcat服务器访问一台服务器上的mysql】)就满足使用分布式锁的条件。多台服务器要访问redis全局缓存的资源,如果不使用分布式锁就会出现问题。 看如下伪代码:

    long N=0L;
    //N从redis获取值
    if(N<5){
    N++;
    //N写回redis
    }

从redis获取值N,对数值N进行边界检查,自加1,然后N写回redis中。 这种应用场景很常见,像秒杀,全局递增ID、IP访问限制等。

基于redis:

使用 SETNX key value 命令。
设置成功,返回1,加锁。该客户端最后可以通过DEL key来释放该锁。
设置失败,返回0,获取锁失败。这时我们可以先返回或进行重试等对方完成或等待锁超时。

存在的问题:

解决方式:

仍然存在的问题:

具体实现:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
    private RedisTemplate redisTemplate;
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    private String lockKey;

    //锁超时时间,防止线程在入锁以后,无限的执行等待    
    private int expireMsecs = 60 * 1000;
    //获取锁失败后,锁等待时间。防止无限期等待
    private int timeoutMsecs = 10 * 1000;
    //是否加锁标志位
    private volatile boolean locked = false;

    //重载构造方法
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
        this(redisTemplate, lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
        this(redisTemplate, lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }


    public String getLockKey() {
        return lockKey;
    }

    //redis在存储数据时,都把数据转化成了byte[]数组的形式,那么在存取数据时,需要用到Serializer将数据格式进行转化。
    //对redis进行get操作
    private String get(final String key) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] data = connection.get(serializer.serialize(key));
                    connection.close();
                    if (data == null) {
                        return null;
                    }
                    return serializer.deserialize(data);
                }
            });
        } catch (Exception e) {
            logger.error("get redis error, key : {}", key);
        }
        return obj != null ? obj.toString() : null;
    }

    //对redis进行setNX操作
    private boolean setNX(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return success;
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (Boolean) obj : false;
    }

    //对redis进行getset操作
    private String getSet(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return serializer.deserialize(ret);
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (String) obj : null;
    }

    /**
     * 使用setnx命令,缓存了锁。, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁。
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值,成功获得锁。
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            
            //加锁。如果不存在加锁成功,返回true。
            if (this.setNX(lockKey, expiresStr)) {
                //得到锁
                locked = true;
                return true;
            }

            //lockKey已存在,获取lockKey里的时间。
            String currentValueStr = this.get(lockKey);
            //判断是否为空,是否过期。
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //过期,设置现在的锁到期时间。只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的。
                String oldValueStr = this.getSet(lockKey, expiresStr);

                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁

                    //得到锁
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
        }
        //已存在lockKey,且未过期,则获取锁失败
        return false;
    }


    //解锁。直接删除lockKey即可
    public synchronized void unlock() {
        if (locked) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

    public static void main(String[] args) throws Exception {
        RedisLock lock = new RedisLock(redisTemplate, key, 10000, 20000);
        try{
            if(lock.lock()) {
                //需要加锁的代码
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而超时,此时锁已经被别人获得,这时就不必解锁了。
            lock.unlock();
        }
    }

}

基于zookeeper:

左边的整个区域表示一个Zookeeper集群。locker是Zookeeper的一个持久节点。node_x是locker这个持久节点下面的临时顺序节点。client_x表示多个客户端。Service表示需要互斥访问的共享资源。

Zookeeper如何解决前面提到的问题:

仍然存在的问题:

curator实现zookeeper的分布式锁:

    package bjsxt.curator.lock;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;


    public class Lock {

        /** zookeeper地址 */
        static final String CONNECT_ADDR = "192.168.0.4:2181,192.168.0.9:2181,192.168.0.6:2181";
        /** session超时时间 */
        static final int SESSION_OUTTIME = 5000;
        
        static int count = 10;
        public static void genarNo(){
            try {
                count--;
                System.out.println(Thread.currentThread().getName()+" : "+count);
            } finally {
            
            }
        }
        
        public static void main(String[] args) throws Exception {
            
            //1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                                                         .connectString(CONNECT_ADDR)
                                                         .sessionTimeoutMs(SESSION_OUTTIME)
                                                         .retryPolicy(retryPolicy)
                                                         //.namespace("super")
                                                         .build();
            //3 开启连接
            cf.start();
            
            //4 分布式锁
            final InterProcessMutex lock = new InterProcessMutex(cf, "/lockpath");
            final CountDownLatch countdown = new CountDownLatch(1);
            
            for(int i = 0; i < 10; i++){

                //循环开始10个线程,模仿多个客户端同时操作
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            countdown.await();
                            //加锁
                            lock.acquire();
                            //-------------业务处理开始
                            genarNo();
                            //-------------业务处理结束
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                //释放
                                lock.release();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, "t"+i ).start();
            }
            Thread.sleep(100);
            countdown.countDown();
        }
    }

运行结果为依次打印出9876543210。
不加锁的话,一种可能的结果为:7564271703。


参考:
http://www.jianshu.com/p/c77a5257303a
http://blog.csdn.net/pengshuai128/article/details/70593995
http://www.cnblogs.com/520playboy/p/6441651.html

上一篇下一篇

猜你喜欢

热点阅读