基于zookeeper的分布锁解决方案。
2019-04-23 本文已影响0人
冰三尺
分布式情况下, 比较难的一个问题是如何保证数据的最终唯一. 在单进程情况下, 多线程操作数据, 我们只需要保证代码同步即可, 同一时间只允许一个线程来操作数据, 通过 Java 提供的并发 API 我们可以解决. 但是在分布式情况下, 无法使用此方法来解决, 因为在分布式情况下, 我们面对的不是多线程, 而是多进程, 也许多个服务都不在同一个物理机器上运行, 进程与进程之间无法互相同步数据.
同一个进程中使用锁之所以可以解决多线程数据同步的问题, 是因为我们可以在进程中设置一个标记, 而这个标记所有的线程都可以访问的到.
但是在多进程中, 无法在A进程中设置一个标记让其他的不同进程里的线程都能访问的到. 所以也就无法实现数据唯一.
那么问题来了, 只要有一个可以让所有的进程都能访问到的数据, 就可以实现数据同步, 数据唯一.
zk分布式锁原理
通过临时节点, 临时节点特征:会话连接结束后, 节点会自动删除
1.多个服务器在zk上创建同一个临时节点, 临时节点不允许重复
2.谁能成功创建临时节点, 谁就拿到锁, 其他服务没有在zk上创建临时节点成功, 处于等待状态.
3.通过事件通知, 监听到节点被删除之后, 其他服务又开始抢着去创建临时节点, 创建成功, 便获取锁.
创建连接获取锁, 关闭连接释放锁.
zookeeper就是那个所有的进程线程都可以访问到的数据, 由zookeeper来统一调度管理分布式锁.
引入pom依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
定义锁服务
//##lock 锁 定义分布式锁
public interface Lock {
//获取锁
void getLock();
//釋放鎖
void unLock();
}
定义一个抽象类ZookeeperAbstractLock实现Lock接口, 该类主要是进行获取锁服务和解锁
//抽象类是类的一种, 抽象类不能实例化对象
//重构重复代码,将重复代码交给子类执行
public abstract class ZookeeperAbstractLock implements Lock {
// zk连接地址
private static final String CONNECTSTRING = "127.0.0.1:2181";
// 创建zk连接
protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
protected static final String PATH = "/lock";
protected CountDownLatch countDownLatch = null;
public void getLock() {
if (tryLock()) {
System.out.println("###获取锁成功#####");
} else {
// 等待
waitLock();
// 重新获取锁
getLock();
}
}
// 是否获取锁成功,成功返回true 失败返回fasle
abstract Boolean tryLock();
// 等待
abstract void waitLock();
public void unLock() {
if (zkClient != null) {
zkClient.close();
System.out.println("释放锁资源, 关闭连接");
System.out.println();
}
}
}
ZookeeperDistrbuteLock主要是进项节点创建, 节点监听, 节点删除
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
@Override
Boolean tryLock() {
try {
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
return false;
}
}
@Override
void waitLock() {
// 使用事件监听,获取到节点被删除,
IZkDataListener iZkDataListener = new IZkDataListener() {
// 当节点被删除
public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null) {
// 唤醒
countDownLatch.countDown();
}
}
// 当节点发生改变
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 注册节点信息
zkClient.subscribeDataChanges(PATH, iZkDataListener);
if (zkClient.exists(PATH)) {
// 创建信号量
countDownLatch = new CountDownLatch(1);
try {
// 等待
countDownLatch.await();
} catch (Exception e) {
}
}
// 删除事件通知, 当await时, 代码不会继续往下走, 当countDown时, 代码才会继续往下走
zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
}
}
由于条件限制, 只进行多线程测试
public class Service implements Runnable {
private Lock lock = new ZookeeperDistrbuteLock();
public void run() {
// 这里上锁, 假如
try {
// 上锁
lock.getLock();
// 模拟用户生成订单号
getNumber();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放鎖資源
lock.unLock();
}
}
public String getNumber() {
try {
Thread.sleep(200);
} catch (Exception e) {
// TODO: handle exception
}
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return simpt.format(new Date());
}
public static void main(String[] args) {
System.out.println("##模拟生成订单号开始...");
/**
* 此处不能使用创建单个service, 这样的话所有的service对象使用同一个zk客户端连接zk,
* 当某一个对象关闭了连接之后, zk客户端与zk直间的连接就被关闭了, 就不能被连接了.
* zk是只有一个, 连接zk的客户端却有好几个
* java.lang.IllegalStateException: ZkClient already closed!
* Service service = new Service();
* new Thread(service).start();
*/
for (int i = 0; i < 100; i++) {
new Thread(new Service()).start();
}
}
}