基于zookeeper的分布式锁实现
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
准备工作
有几个帮助类,先把代码放上来
ZKClient 对zk的操作做了一个简单的封装
Java代码
packagezk.lock;
importorg.apache.zookeeper.*;
importorg.apache.zookeeper.data.Stat;
importzk.util.ZKUtil;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.TimeUnit;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午8:50
* 封装一个zookeeper实例.
*/
publicclassZKClientimplementsWatcher {
privateZooKeeper zookeeper;
privateCountDownLatch connectedSemaphore =newCountDownLatch(1);
publicZKClient(String connectString,intsessionTimeout)throwsException {
zookeeper =newZooKeeper(connectString, sessionTimeout,this);
System.out.println("connecting zk server");
if(connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
System.out.println("connect zk server success");
}else{
System.out.println("connect zk server error.");
thrownewException("connect zk server error.");
}
}
publicvoidclose()throwsInterruptedException {
if(zookeeper !=null) {
zookeeper.close();
}
}
publicvoidcreatePathIfAbsent(String path,booleanisPersistent)throwsException {
CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
path = ZKUtil.normalize(path);
if(!this.exists(path)) {
zookeeper.create(path,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
}
}
publicbooleanexists(String path)throwsException {
path = ZKUtil.normalize(path);
Stat stat = zookeeper.exists(path,null);
returnstat !=null;
}
publicString getData(String path)throwsException {
path = ZKUtil.normalize(path);
try{
byte[] data = zookeeper.getData(path,null,null);
returnnewString(data);
}catch(KeeperException e) {
if(einstanceofKeeperException.NoNodeException) {
thrownewException("Node does not exist,path is ["+ e.getPath() +"].", e);
}else{
thrownewException(e);
}
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
thrownewException(e);
}
}
@Override
publicvoidprocess(WatchedEvent event) {
if(event ==null)return;
// 连接状态
Watcher.Event.KeeperState keeperState = event.getState();
// 事件类型
Watcher.Event.EventType eventType = event.getType();
// 受影响的path
// String path = event.getPath();
if(Watcher.Event.KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if(Watcher.Event.EventType.None == eventType) {
System.out.println("zookeeper connect success");
connectedSemaphore.countDown();
}
}
//下面可以做一些重连的工作.
elseif(Watcher.Event.KeeperState.Disconnected == keeperState) {
System.out.println("zookeeper Disconnected");
}elseif(Watcher.Event.KeeperState.AuthFailed == keeperState) {
System.out.println("zookeeper AuthFailed");
}elseif(Watcher.Event.KeeperState.Expired == keeperState) {
System.out.println("zookeeper Expired");
}
}
}
ZKUtil 针对zk路径的一个工具类
Java代码
packagezk.util;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午9:56
*/
publicclassZKUtil {
publicstaticfinalString SEPARATOR ="/";
/**
* 转换path为zk的标准路径 以/开头,最后不带/
*/
publicstaticString normalize(String path) {
String temp = path;
if(!path.startsWith(SEPARATOR)) {
temp = SEPARATOR + path;
}
if(path.endsWith(SEPARATOR)) {
temp = temp.substring(0, temp.length()-1);
returnnormalize(temp);
}else{
returntemp;
}
}
/**
* 链接两个path,并转化为zk的标准路径
*/
publicstaticString contact(String path1,String path2){
if(path2.startsWith(SEPARATOR)) {
path2 = path2.substring(1);
}
if(path1.endsWith(SEPARATOR)) {
returnnormalize(path1 + path2);
}else{
returnnormalize(path1 + SEPARATOR + path2);
}
}
/**
* 字符串转化成byte类型
*/
publicstaticbyte[] toBytes(String data) {
if(data ==null|| data.trim().equals(""))returnnull;
returndata.getBytes();
}
}
NetworkUtil 获取本机IP的工具方法
Java代码
packagezk.util;
importjava.net.InetAddress;
importjava.net.NetworkInterface;
importjava.util.Enumeration;
/**
* User: zhenghui
* Date: 14-4-1
* Time: 下午4:47
*/
publicclassNetworkUtil {
staticprivatefinalcharCOLON =':';
/**
* 获取当前机器ip地址
* 据说多网卡的时候会有问题.
*/
publicstaticString getNetworkAddress() {
Enumeration netInterfaces;
try{
netInterfaces = NetworkInterface.getNetworkInterfaces();
InetAddress ip;
while(netInterfaces.hasMoreElements()) {
NetworkInterface ni = netInterfaces
.nextElement();
Enumeration addresses=ni.getInetAddresses();
while(addresses.hasMoreElements()){
ip = addresses.nextElement();
if(!ip.isLoopbackAddress()
&& ip.getHostAddress().indexOf(COLON) == -1) {
returnip.getHostAddress();
}
}
}
return"";
}catch(Exception e) {
return"";
}
}
}
--------------------------- 正文开始 -----------------------------------
这种实现非常简单,具体的流程如下
对应的实现如下
Java代码
packagezk.lock;
importzk.util.NetworkUtil;
importzk.util.ZKUtil;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午8:37
* 分布式锁实现.
*
* 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
* .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
*/
publicclassDistributedLock01 {
privateZKClient zkClient;
publicstaticfinalString LOCK_ROOT ="/lock";
privateString lockName;
publicDistributedLock01(String connectString,intsessionTimeout,String lockName)throwsException {
//先创建zk链接.
this.createConnection(connectString,sessionTimeout);
this.lockName = lockName;
}
publicbooleantryLock(){
String path = ZKUtil.contact(LOCK_ROOT,lockName);
String localIp = NetworkUtil.getNetworkAddress();
try{
if(zkClient.exists(path)){
String ownnerIp = zkClient.getData(path);
if(localIp.equals(ownnerIp)){
returntrue;
}
}else{
zkClient.createPathIfAbsent(path,false);
if(zkClient.exists(path)){
String ownnerIp = zkClient.getData(path);
if(localIp.equals(ownnerIp)){
returntrue;
}
}
}
}catch(Exception e) {
e.printStackTrace();
}
returnfalse;
}
/**
* 创建zk连接
*
*/
protectedvoidcreateConnection(String connectString,intsessionTimeout)throwsException {
if(zkClient !=null){
releaseConnection();
}
zkClient =newZKClient(connectString,sessionTimeout);
zkClient.createPathIfAbsent(LOCK_ROOT,true);
}
/**
* 关闭ZK连接
*/
protectedvoidreleaseConnection()throwsInterruptedException {
if(zkClient !=null) {
zkClient.close();
}
}
}
总结
网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更
核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx
1. 项目核心代码结构截图
项目模块依赖
特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化
2. 项目依赖介绍
2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:
2.2 Dubbo独立服务项目依赖如下图:
3. 项目功能部分截图:
zookeeper、dubbo服务启动
dubbo管控台
REST服务平台