分布式锁的实现

2020-08-15  本文已影响0人  笔记本一号

zookeeper分布式锁

方法一:

线程在zookeeper创建相同的临时znode,但是只有一个线程才能创建,因为zookeeper不予许创建相同的znode,当znode已经被其他线程抢先创建,而其他线程就会设置watch监听这个临时znode。当占有锁的线程执行完成断开连接,临时znode消失,其他线程再一起尝试创建临时znode抢占锁,重复这个过程直到所有线程执行完成从而实现了分布式锁,但是这个方法会引起羊群效应消耗系统的性能,所以使用下面的方法二会更好

方法二:

线程在zookeeper创建有序临时节点,只有节点序号最小的那个节点才能获取到锁执行业务,后面的节点等待着前面的节点执行完业务直到节点被删除才能轮到他们,然后在按照节点的序号从小到达大获取锁

public interface Lock {
    //获取锁
    void getLock();
    //释放锁
    void unLock();
}
public abstract class AbstractZkLock implements Lock{
    private static  final Logger log=LoggerFactory.getLogger(AbstractZkLock.class);
    public static  final String ZKSERVERPATH = "192.168.200.130:2181,192.168.200.131:2181,192.168.200.132:2181";
    public static final Integer TIMEOUT = 50000;
    protected static volatile  ZkClient zkClient=null;
    protected static final String PATH = "/lock";
    protected String WAIT_PATH;
    protected String CURRENTLOCK;

    //双重检测机制
    public static ZkClient getZkClient(){
        if (zkClient==null){
            synchronized (AbstractZkLock.class){
                if (zkClient==null){
                    zkClient=new ZkClient(ZKSERVERPATH,TIMEOUT);

                    return zkClient;
                }
            }
        }
        return zkClient;
    }
    @Override
    public void getLock() {
        if (tryLock()){
            log.info("开始");
            System.out.println(Thread.currentThread().getName()
                    +"->"+CURRENTLOCK+"获得锁成功");
            return;
        }else {
            waitLock();
            System.out.println(Thread.currentThread().getName()
                    +"->"+CURRENTLOCK+"进入等待锁阶段");

        }
    }

    abstract void waitLock();

    abstract boolean tryLock();

    @Override
    public void unLock() {
        zkClient.delete(CURRENTLOCK);
    }
}

public class ExcutorZkLock extends AbstractZkLock {
    private static final Logger log= LoggerFactory.getLogger(ExcutorZkLock.class);
    @Override
    void waitLock() {
        log.info("等待锁");
        if(zkClient.exists(WAIT_PATH)){
            try {
            System.out.println(Thread.currentThread().getName()
                    +"等待节点:"+WAIT_PATH+"解锁");
            CountDownLatch latch = new CountDownLatch(1);
            zkClient.subscribeDataChanges(WAIT_PATH, new IZkDataListener(){
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println(WAIT_PATH+"锁被删除了");
                    latch.countDown();
                }
            });
            latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    boolean tryLock() {
        log.info("尝试获取锁");
        zkClient = getZkClient();
       // lisnerZk();
        CURRENTLOCK = zkClient.createEphemeralSequential(PATH + "/", "test");
        List<String> children = zkClient.getChildren(PATH);
        SortedSet<String> sortSet = new TreeSet<>();
        for (String str : children) {
            sortSet.add(str);
        }
        String first = sortSet.first();
        if (first.equals(CURRENTLOCK.replace(PATH + "/", ""))) {
            return true;
        }
        SortedSet<String> headSet = sortSet.headSet(CURRENTLOCK.replace(PATH + "/", ""));
        if (CollectionUtils.isNotEmpty(headSet)) {
            WAIT_PATH = PATH+"/"+headSet.last();
        }
        return false;
    }

    private void lisnerZk(){
        zkClient.subscribeChildChanges(PATH,
                (String parentPath, List<String> currentChilds)->
                System.out.println(Thread.currentThread().getName()+"在"+ parentPath+"下创建了子节点"));
    }
}

public class Test {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for(int i=0;i<10;i++){
            new Thread(()->{
                try {
                    Lock lock=new ExcutorZkLock();
                    lock.getLock();
                    TimeUnit.SECONDS.sleep(5);
                    lock.unLock();
                    countDownLatch.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            },"thread--"+i).start();
            countDownLatch.countDown();
            System.out.println("线程结束");
        }
    }
}

image.png

redis分布式锁

方法一:

线程在redis中使用setnx创建一个有时效的key,但是setnx会判断key是否存在,存在就创建失败,如果这个key被其他线程抢先创建了,其他线程只能等待这个key被删除或者过期。这里有问题需要注意的是,设置过期时间是为了避免死锁问题的发送,但是如果业务执行时间比过期时间还要长(也就是说在某个线程执行业务代码时,redis的锁就到期了,锁就被失效了),就会造成锁的长久失效,决绝的方案就是在给redis设置过期时间后,执行业务代码时启动一个定时器,在业务代码执行期间定时检测线程的锁是否还存在,存在就给锁的过期时间初始化,定期检测间隔一般为过期时间的1/3

方法二:

使用redisson实现方法一会非常简单。

使用redis演示分布式锁
@Service
public class RedisLock {
    private static final Logger logger = LoggerFactory.getLogger(ReentrantReadWriteLock.ReadLock.class);
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private static final long TIMEOUT = 10000;
    private static final String LOCK = "LOCK";

    public void RedisLockTest() {
        logger.info("开始获取锁");
        String tid = UUID.randomUUID().toString();
        Boolean setnx= false;
        try{
               setnx= stringRedisTemplate.opsForValue()
                .setIfAbsent(LOCK, tid ,TIMEOUT,TimeUnit.SECONDS); 
                 if(!setnx){
                    return "error";
                  }
          //此处应当开启一个定时器任务,每隔锁的过期时间的1/3检查线程的锁是否还还持有锁,
//有则初始化锁的过期时间这样可以避免锁因过期时间比业务执行时间短而造成失效,
//我这里只是提供了这样的一个思路,由于代码比较复杂我这里省略了service的代码
           service(tid);
         }finally{
            if(tid.equals(stringRedisTemplate.opsForValue().get(LOCK))){
                stringRedisTemplate.delete(LOCK);
              }
            }
    }

    public void service(String tid){
     //定时器任务  
     .......
    }
}

使用redisson演示分布式锁
public class RedissonLock {

    private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);
    private static final String LOCK = "LOCK";
    @Autowired
    private RedissonClient redissonClient;

    public void RedisLockTest() {
        log.info("开始获取锁");
        //设置要加锁的key
        RLock lock = redissonClient.getLock(LOCK);
        boolean getLock = false;
        try {
            //trylock表示尝试去加锁
            //第一个参数表示等待时间,
            // 第二个参数表示锁的失效时间,
            // 加锁成功,返回true,继续执行true下面代码;
            // 如果加锁失败,它会等待第一个参数设置的时间,如果加锁成功返回true,失败返回false
            if(getLock = lock.tryLock(0,5000, TimeUnit.MILLISECONDS)){
                log.info("Redisson获取到分布式锁:{},ThreadName:{}",LOCK,Thread.currentThread().getName());
                //执行业务
                service();
            }else{
                log.info("Redisson没有获取到分布式锁:{},ThreadName:{}",LOCK,Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            log.error("Redisson分布式锁获取异常",e);
        } finally {
            if(!getLock){
                return;
            }
            lock.unlock();
            log.info("Redisson分布式锁释放锁");
        }
    }
    public void service(){
        try {
            System.out.println("正在执行业务");
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读