zookeeper实现分布式锁
2020-05-22 本文已影响0人
sunpy
1. 什么时候使用分布式锁
当多个系统分布在不同机器上时,系统之间需要以同步方式访问共享资源,而为了防止出现脏读,脏写,以及产生重复数据的情况等,使用互斥手段来保证数据的一致性,而使用分布式锁。
2. 实现分布式排他锁
思路:
(1)在zookeeper上创建指定/zk-spy持久化根节点
(2)在根节点下面创建临时顺序节点/zk-spy/lock_xxxxxx,从根节点获取下面所有的子节点,判断临时顺序节点是否为子节点中最小节点。是那么就获取了锁,不是就为前一个节点添加删除监听事件。并且让其当前线程进行等待。
(3)删除监听事件生效,那么解除等待,返回第二步重新进行操作,直到获取锁为止。
实现:
接口定义:
public interface IZkLock {
// 建立连接
public void connect();
// 创建持久根节点
public String createRootNode() throws UnsupportedEncodingException, KeeperException, InterruptedException;
// 尝试获取锁
public boolean tryAcquire();
// 获取锁
public void acquire() throws Exception;
// 阻塞等待
public void acquireWait() throws Exception;
// 删除锁
public void release() throws Exception;
}
实现类:
public class ZkLock implements IZkLock{
// 控制线程执行顺序
private CountDownLatch cdl = new CountDownLatch(1);;
// zookeeper连接地址 "XX.XX.XXX.XXX:2181"
private String zkAddr;
// 重连时间
private int timeout;
// 持久化根节点路径 "/zk-spy"
private String rootPath;
// 当前节点路径
private String curPath;
// 前驱节点路径
private String prevPath;
private ZooKeeper zookeeper;
public ZkLock(String zkAddr, int timeout, String rootPath) {
this.zkAddr = zkAddr;
this.timeout = timeout;
this.rootPath = rootPath;
}
/**
* 建立连接zookeeper
*/
@Override
public void connect() {
try {
zookeeper = new ZooKeeper(zkAddr, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 创建持久根节点
*/
@Override
public String createRootNode() throws UnsupportedEncodingException, KeeperException, InterruptedException {
Stat stat = zookeeper.exists(rootPath, false);
if (stat != null) {
return rootPath;
} else {
return zookeeper.create(rootPath,
rootPath.getBytes("UTF-8"),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
/**
* 尝试获取锁
*/
@Override
public boolean tryAcquire() {
try {
if (StringUtils.isBlank(curPath)) {
String lockPath = rootPath + "/lock_";
curPath = zookeeper.create(lockPath,
lockPath.getBytes("UTF-8"),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> list = zookeeper.getChildren(rootPath, false);
Collections.sort(list);
// 当前节点就是最小节点
if (curPath.equals(rootPath + "/" + list.get(0))) {
return true;
} else { // 如果当前节点不是最小节点,获取其前一个节点
String partLockPath = curPath.substring(
curPath.lastIndexOf("/")+1,
curPath.length());
String prevNode = list.get(list.indexOf(partLockPath)-1);
prevPath = rootPath + "/" + prevNode;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 获取锁
*/
@Override
public void acquire() throws Exception {
if (tryAcquire()) {
System.out.println(Thread.currentThread().getName() + " ---> 获取锁成功了 curPath : " + curPath);
} else {
System.out.println(Thread.currentThread().getName() + " ---> 获取锁失败了,进入等待" );
acquireWait();
acquire();
}
}
/**
* 阻塞等待
*/
@Override
public void acquireWait() throws Exception {
Stat stat = zookeeper.exists(prevPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (EventType.NodeDeleted == event.getType()) {
cdl.countDown();
}
}
});
if (stat != null) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 删除锁
* @throws Exception
*/
@Override
public void release() throws Exception {
zookeeper.delete(curPath, -1);
zookeeper.close();
}
}
测试:
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
ZkLock zel = new ZkLock("XX.XX.XXX.XXX:2181", 4000, "/zk-spy");
zel.connect();
try {
zel.createRootNode();
zel.acquire();
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
zel.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
t.start();
}
}
Thread-9 ---> 获取锁失败了,进入等待
Thread-10 ---> 获取锁失败了,进入等待
Thread-13 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000302
Thread-11 ---> 获取锁失败了,进入等待
Thread-6 ---> 获取锁失败了,进入等待
Thread-16 ---> 获取锁失败了,进入等待
Thread-19 ---> 获取锁失败了,进入等待
Thread-8 ---> 获取锁失败了,进入等待
Thread-0 ---> 获取锁失败了,进入等待
Thread-1 ---> 获取锁失败了,进入等待
Thread-17 ---> 获取锁失败了,进入等待
Thread-14 ---> 获取锁失败了,进入等待
Thread-18 ---> 获取锁失败了,进入等待
Thread-3 ---> 获取锁失败了,进入等待
Thread-7 ---> 获取锁失败了,进入等待
Thread-4 ---> 获取锁失败了,进入等待
Thread-12 ---> 获取锁失败了,进入等待
Thread-15 ---> 获取锁失败了,进入等待
Thread-2 ---> 获取锁失败了,进入等待
Thread-5 ---> 获取锁失败了,进入等待
Thread-9 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000303
Thread-10 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000304
Thread-16 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000305
Thread-19 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000306
Thread-3 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000307
Thread-6 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000308
Thread-18 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000309
Thread-2 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000310
Thread-11 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000311
Thread-8 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000312
Thread-0 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000313
Thread-1 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000314
Thread-15 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000315
Thread-12 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000316
Thread-17 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000317
Thread-4 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000318
Thread-7 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000319
Thread-5 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000320
Thread-14 ---> 获取锁成功了 curPath : /zk-spy/lock_0000000321
问题:
-
死循环的出现:
发现第二个获取锁的线程,老是不停的获取锁失败,进入等待。
程序没有判断当前的顺序临时节点创建的路径只有在空的情况下,才能创建,如果没有判断,总是创建新临时顺序节点来与根节点下面的最小孩子节点,总是不满足进入等待状态。加上我的递归调用,将出现死循环。 -
出现了空指针异常,在创建zookeeper的时候:
java.lang.NullPointerException: null
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:532)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:507)
由于我们默认传入的Watcher为null,而在执行监听事件时,会遍历Watcher,却不判断是否为null,来执行事件。
ClientCnxn.EventThread.processEvent方法:
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
所以在zookeeper建立连接的时候,需要创建一个空的Watcher。
- 多线程执行createRootNode()方法出现NodeExistsException异常。当第一个线程进入方法,发现该节点不存在,创建持久节点时,第二个线程进入方法也方法该节点不存在,创建持久节点,抛出该异常。
解决:捕获这个异常,发现该节点存在,就直接返回即可。
public void createRootNode() throws UnsupportedEncodingException, KeeperException, InterruptedException {
Stat stat = zookeeper.exists(rootPath, false);
if (stat != null) {
return;
} else {
try {
zookeeper.create(rootPath,
rootPath.getBytes("UTF-8"),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
return;
}
}
}