分布式相关redis&mongo&zookeeper

zookeeper 分布式锁实现(附源码及方案测试结果)

2019-11-04  本文已影响0人  小白的世界你不懂

一、 Zookeeper

项目地址:https://github.com/bao17634/zookeeper_lock.git

1.1 zookeeper简介:

1.2 zookeeper功能:

提供文件系统及通知机制。

1.2.1 文件系统:
zookeeper维护一个类似文件系统的数结构,如下图:

结构图

每子目录如NameService都被作为zknode,和文件一样,可以自由增加及删除,唯一不同的是他可以用来存储数据。zknode分为四种类型:

1.2.2 通知机制: 客户端注册监听他关心的目录节点,当目录节点发生变化(数据改变、被删除、目录节点增加删除)时,zookeeper会通知客户端。
集群架构如图:

二、 zookeeper集群

2.1 zookeeper集群搭建教程: https://www.jianshu.com/p/a5fda39f20d0

2.2 集群简介:

为了保证高可用,zookeeper需要以集群形态来部署,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么zookeeper本身仍然是可用的。客户端在使用zookeeper时,需要知道集群机器列表,通过与集群中的某一台机器建立TCP连接来使用服务,客户端使用这个TCP链接来发送请求、获取结果、获取监听事件以及发送心跳包。如果这个连接异常断开了,客户端可以连接到另外的机器上。这样客户端的读请求就可以被任意一台机器处理,如果请求在节点上注册了监听器,这个监听器也是由所连接的zookeeper机器来处理。

集群架构如下图:

集群架构图
说明: 对于写请求,这些请求会同时发给其他zookeeper机器并且达成一致后,请求才会返回成功,因此随着zookeeper的集群增多,读请求的吞吐量提高但是写请求的吞吐量会下降。

2.3 zookeeper分布式锁实现

2.3.1 实现流程:

zookeeper组成图 zookeeper结构图

2.3.2 分布式锁代码实现(利用curator实现):

1)引入依赖:

    <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>4.0.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>4.0.0</version>
   </dependency>

2)编写配置类:

@Configuration
public class CuratorConfiguration {
    @Autowired
    ConfigProperties configProperties;


    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(configProperties.getElapsedTimeMs(), configProperties.getRetryCount());
        return CuratorFrameworkFactory.newClient(
                configProperties.getConnectString(),
                configProperties.getSessionTimeoutMs(),
                configProperties.getConnectionTimeoutMs(),
                retryPolicy);
    }
}
  public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

创建zk连接,该方法要求传入的三个参数分别是:ip:端口、会话超时时间、连接超时时间、重试策略(由public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)方法提供,其要求传入的参数粉分别是: 重试间隔时间、重试次数)

4)curator的分布式锁:

curator提供了四种分布式锁,分别是:

image

1> 分布式可重入排它锁实现:
代码:

    public ApiResult processMutex() throws Exception {
        log.info("第{}个线程请求锁", ++count);
        try {
            // 创建分布式可重入排他锁,根节点为ROOT_LOCK_NODE
            InterProcessMutex mutex = new InterProcessMutex(curatorFramework, ROOT_LOCK_NODE );
            //加锁
            if (mutex.acquire(TIME_OUT, TimeUnit.SECONDS)) {
                try {
                    log.info("获得锁线程为:{}", ++number);
                    return ApiResult.ok("加锁成功");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    log.info("解锁");
                    //释放锁
                    mutex.release();
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        throw new RuntimeException("拿到锁失败");
    }

2> 分布式排它锁实现:
代码:

public ApiResult semaphoreMutex() throws Exception {
        log.info("第{}个线程请求锁", ++count);
        try {
             // 创建分布式排他锁,根节点为ROOT_LOCK_NODE
            InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, ROOT_LOCK_NODE);
            //加锁
            if (mutex.acquire(TIME_OUT, TimeUnit.SECONDS)) {
                try {
                    log.info("获得锁线程为:{}", ++number);
                    return ApiResult.ok("加锁成功");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                   //解锁
                    mutex.release();
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        throw new RuntimeException("拿到锁失败");
    }

2> 分布式读写锁实现:
代码:

public ApiResult readWriteLock() throws Exception {
        log.info("第{}个线程请求锁", ++count);
        try {
             // 创建分布式读写锁,根节点为ROOT_LOCK_NODE
            InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_LOCK_NODE);
            //创建读锁
            InterProcessMutex readLock = readWriteLock.readLock();
            //创建写锁
            InterProcessMutex writeLock = readWriteLock.writeLock();
            try {
                //注意只有先得到写锁在得到读锁,不能反过来
                if (!writeLock.acquire(TIME_OUT, TimeUnit.SECONDS)) {
                    throw new RuntimeException("得到写锁失败");
                }
                log.info("已经得到写锁");
                if (!readLock.acquire(TIME_OUT, TimeUnit.SECONDS)) {
                    throw new RuntimeException("得到读锁失败");
                }
                log.info("已经得到读锁");
                log.info("获得锁线程数为:{}", ++number);
                return ApiResult.ok("加锁成功");
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
               //解锁
                writeLock.release();
                readLock.release();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

三、分布式锁测试情况:

3.1 测试环境:

3.2 可重入排它锁:

1 x 10000压力测试 100 x 100压力测试 200 x 100压力测试 500 x 100压力测试

3.3 排他锁:

1 x 10000压力测试 100 x 100压力测试 200 x 100压力测试 500 x 100压力测试

3.4 读写锁:

1 x 10000 压力测试 100 x 100 测试压力 200 x 100 测试压力 500 x 100 测试压力

四、总结:

本文主要介绍了使用curator来实现zookeeper分布式锁,在研究zookeeper过程中我也写了利用zookeeper原生API来实现分布式锁,但是实现过程比较复杂,在测试的时候单线程虽然性能与上面三种性能差不多,但是在多线程测试中性能就非常差,所以这里就没有重点去介绍,具体实现请移步我的github

测试结果分析(主要考虑多线程):

上一篇下一篇

猜你喜欢

热点阅读