zookeeper手把手教程(四)
2017-10-22 本文已影响0人
黑白蓝调
1. 分布式应用场景
前面讲过分布式应用场景有:
- 负载均衡
- 分布式ID生成
- 分布式锁
- 分布式队列
- 消息订阅发布
- 命名服务
- master选举
负载均衡
请求分摊
master选举
package com.frame.test.gp.zookeeperAPI.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import java.util.concurrent.TimeUnit;
/**
 * Leader-election demo built on Curator's {@link LeaderSelector} recipe.
 *
 * <p>Every process that runs {@link #main(String[])} competes for leadership on
 * {@code MASTER_PATH}. The winner prints a message, holds leadership for two seconds and
 * then gives it up; because {@code autoRequeue()} is enabled it re-enters the election afterwards.
 *
 * @author Administrator
 * @CREATE 2017/8/12 20:12
 */
public class MasterSelector {
    private final static String MASTER_PATH = "/curator_master_path";

    /**
     * Starts the election and keeps the process alive until the main thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // Leadership is held for as long as this method runs; returning releases it.
                System.out.println("获得leader成功");
                TimeUnit.SECONDS.sleep(2);
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                // NOTE(review): Curator's documentation recommends giving up leadership when the
                // connection becomes SUSPENDED or LOST (e.g. by throwing CancelLeadershipException
                // or extending LeaderSelectorListenerAdapter). Left as a no-op here, as in the
                // original demo -- confirm before reusing this outside a tutorial.
            }
        });
        // Re-enter the election automatically each time leadership is released.
        leaderSelector.autoRequeue();
        leaderSelector.start(); // start the election (returns immediately; work happens in the background)
        try {
            // The election runs on background threads. Presumably these are daemon threads
            // (TODO confirm for the Curator version in use), in which case returning from main
            // here would let the JVM exit before leadership is ever taken. Park the main thread.
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Leave the election cleanly so other participants are not kept waiting.
            leaderSelector.close();
        }
    }
}
消息订阅发布
实现配置信息的集中式管理和数据的动态更新,典型场景disconf
实现配置中心有两种模式:push 、pull。
长轮询
zookeeper采用的是推拉相结合的方式。 客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端
发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据
- 数据量比较小
- 数据内容在运行时会发生动态变更
- 集群中的各个机器共享配置
分布式ID生成
利用zookeeper中顺序节点的特性,可以制作分布式的序列号生成器(ID生成器)。在往数据库插入数据时,通常需要一个id:在单机环境下,可以利用数据库的自增id,但是这种方式在分布式环境下就无法使用了;也可以使用UUID,但是UUID有一个缺点,就是没有规律,很难理解。使用zookeeper的命名服务可以生成有顺序的,容易理解的,支持分布式的编号。
package com.frame.test.gp.zookeeperAPI.GenId;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Distributed ID generator. It creates a persistent sequential node under {@code root} and
 * returns the part of the created node name that follows {@code nodeName} as the ID.
 *
 * <p>Lifecycle: {@link #start()} must be called before {@link #generateId(RemoveMethod)}, and
 * {@link #stop()} releases the ZooKeeper client and the cleanup executor.
 *
 * @author Administrator
 * @CREATE 2017/8/13 0:16
 */
public class IdMark {
    private ZkClient zkClient;
    private final String server; // ZooKeeper connect string
    private final String root; // path of the parent node
    private final String nodeName; // node name prefix; the ID is whatever follows it
    private volatile boolean running = false;
    private ExecutorService cleanExector;

    /** What to do with the sequential node once its ID has been extracted. */
    public enum RemoveMethod {
        NONE, // keep the node
        IMMEDIATELY, // delete it before generateId returns
        DELAY // delete it asynchronously on the cleanup executor
    }

    /**
     * @param server   ZooKeeper connect string
     * @param root     path of the parent node under which sequential nodes are created
     * @param nodeName name prefix of the sequential nodes
     */
    public IdMark(String server, String root, String nodeName) {
        this.server = server;
        this.root = root;
        this.nodeName = nodeName;
    }

    /**
     * Connects to ZooKeeper and makes sure the root node exists.
     *
     * @throws Exception if the generator is already running, or if connecting / creating the
     *                   root node fails (in which case nothing is left half-initialised)
     */
    public void start() throws Exception {
        if (running) {
            throw new Exception("server has started ...");
        }
        try {
            init();
        } catch (RuntimeException e) {
            // Do not leave a half-initialised generator behind.
            freeResource();
            throw e;
        }
        // Only report "running" once initialisation has actually succeeded.
        running = true;
    }

    /**
     * Stops the generator and releases its resources.
     *
     * @throws Exception if the generator is not running
     */
    public void stop() throws Exception {
        if (!running) {
            throw new Exception("server has stopped ...");
        }
        running = false;
        freeResource();
    }

    /**
     * Initialises the ZooKeeper client and the cleanup executor, then creates the root node
     * (including missing parents). Failures propagate to the caller instead of being printed
     * and ignored, because the generator cannot work without its root node.
     */
    private void init() {
        zkClient = new ZkClient(server, 5000, 5000, new BytesPushThroughSerializer());
        cleanExector = Executors.newFixedThreadPool(10);
        zkClient.createPersistent(root, true);
    }

    /**
     * Releases the executor and the ZooKeeper client. Safe to call when either was never created.
     */
    private void freeResource() {
        boolean interrupted = false;
        ExecutorService executor = cleanExector;
        cleanExector = null;
        if (executor != null) {
            executor.shutdown();
            try {
                // Give pending delayed deletions a chance to finish before the client is closed.
                executor.awaitTermination(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        ZkClient client = zkClient;
        zkClient = null;
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            // Restore the interrupt flag only after the client has been closed.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Checks that the generator has been started.
     *
     * @throws Exception if {@link #start()} has not been called (or {@link #stop()} was called)
     */
    public void checkRunning() throws Exception {
        if (!running) {
            throw new Exception("请先调用start ");
        }
    }

    /**
     * Returns the part of {@code str} after the last occurrence of {@code nodeName},
     * or {@code str} itself when it does not contain {@code nodeName}.
     */
    private String extractId(String str) {
        int index = str.lastIndexOf(nodeName);
        return index >= 0 ? str.substring(index + nodeName.length()) : str;
    }

    /**
     * Generates a new ID.
     *
     * @param removeMethod what to do with the backing node after the ID has been obtained;
     *                     must not be {@code null}
     * @return the suffix ZooKeeper appended to the node name
     * @throws Exception if the generator is not running
     */
    public String generateId(RemoveMethod removeMethod) throws Exception {
        checkRunning();
        if (removeMethod == null) {
            // Fail before a node is created rather than after.
            throw new NullPointerException("removeMethod");
        }
        // Work on local references so a concurrent stop() cannot null them mid-call.
        final ZkClient client = zkClient;
        final ExecutorService executor = cleanExector;
        final String fullNodePath = root.concat("/").concat(nodeName);
        // Create a sequential node. Each parent node keeps an ordering for its first-level
        // children and records the order in which they were created. When a node is created
        // as sequential, ZooKeeper automatically appends a suffix to the given node name
        // and uses the result as the new node's name.
        final String outPath = client.createPersistentSequential(fullNodePath, null);
        if (removeMethod == RemoveMethod.IMMEDIATELY) { // delete right away
            client.deleteRecursive(outPath);
        } else if (removeMethod == RemoveMethod.DELAY) { // delete asynchronously
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    client.delete(outPath);
                }
            });
        }
        return extractId(outPath);
    }

    /** Demo: generates two IDs and prints them. */
    public static void main(String[] args) throws Exception {
        IdMark idMaker = new IdMark("192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181",
                "/NameService/IdGen", "ID-");
        // start() cleans up after itself on failure, so stop() is only needed once it succeeded.
        idMaker.start();
        try {
            for (int i = 0; i < 2; i++) {
                String id = idMaker.generateId(RemoveMethod.DELAY);
                System.out.println(id);
            }
        } finally {
            idMaker.stop();
        }
    }
}
分布式队列生成
package com.frame.test.gp.zookeeperAPI.queue;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
 * A simple distributed queue: every element is stored as a persistent sequential child of
 * {@code root}, and {@link #poll()} hands out children in ascending sequence order.
 *
 * @param <T> element type; it is written and read through the serializer configured on the
 *            supplied {@code ZkClient}
 * @author Administrator
 * @CREATE 2017/8/12 19:25
 */
public class DistributedSimpleQueue<T> {
    private final ZkClient zkClient;
    private final String root;
    private static final String node_name = "n_";

    /**
     * @param zkClient client used for all ZooKeeper operations
     * @param root     path of the node whose children make up the queue
     */
    public DistributedSimpleQueue(ZkClient zkClient, String root) {
        this.zkClient = zkClient;
        this.root = root;
    }

    /**
     * Returns the number of elements in the queue, i.e. the number of children of the root node.
     * A root node that does not exist yet counts as an empty queue, matching {@link #offer},
     * which creates the root lazily.
     */
    public int size() {
        try {
            return zkClient.getChildren(root).size();
        } catch (ZkNoNodeException e) {
            return 0;
        }
    }

    /** Returns {@code true} when the queue has no elements. */
    public boolean isEmpty() {
        return size() == 0;
    }

    /**
     * Appends an element to the queue.
     *
     * @param element element to store
     * @return {@code true} if the element was written, {@code false} if writing failed
     */
    public boolean offer(T element) {
        String nodeFullPath = root.concat("/").concat(node_name);
        try {
            zkClient.createPersistentSequential(nodeFullPath, element);
            return true;
        } catch (ZkNoNodeException e) {
            // The root does not exist yet: create it and retry once. Passing createParents=true
            // creates missing parents and does not fail when another client creates the root
            // at the same time.
            try {
                zkClient.createPersistent(root, true);
                zkClient.createPersistentSequential(nodeFullPath, element);
                return true;
            } catch (Exception retryFailure) {
                retryFailure.printStackTrace();
                return false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Removes and returns the head of the queue.
     *
     * @return the oldest element, or {@code null} if the queue is empty or could not be read
     */
    public T poll() {
        try {
            List<String> list = zkClient.getChildren(root);
            if (list.isEmpty()) {
                return null;
            }
            // Sort the children in ascending order of their sequence suffix.
            Collections.sort(list, new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return getNodeNumber(lhs, node_name).compareTo(getNodeNumber(rhs, node_name));
                }
            });
            // Walk the children in order, build each full path and try to claim that node.
            for (String nodeName : list) {
                String nodeFullPath = root.concat("/").concat(nodeName);
                try {
                    T node = zkClient.readData(nodeFullPath);
                    // Only the client whose delete succeeds owns the element; delete() reports
                    // false when another consumer removed the node first.
                    if (zkClient.delete(nodeFullPath)) {
                        return node;
                    }
                } catch (ZkNoNodeException e) {
                    // Another consumer already took this element; try the next one.
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Returns the part of {@code str} after the last occurrence of {@code nodeName},
     * or {@code str} itself when it does not contain {@code nodeName}.
     *
     * @param str      child node name
     * @param nodeName prefix to strip
     */
    public String getNodeNumber(String str, String nodeName) {
        int index = str.lastIndexOf(nodeName);
        if (index >= 0) {
            // Skip the prefix that was actually searched for, not the class-level constant.
            return str.substring(index + nodeName.length());
        }
        return str;
    }
}
package com.frame.test.gp.zookeeperAPI.queue;
import com.frame.entity.Admin;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
 * Demo for {@link DistributedSimpleQueue}: clears {@code /queue}, offers ten {@code Admin}
 * elements while printing the queue size, then polls one element and prints its name.
 *
 * @author Administrator
 * @CREATE 2017/8/12 22:33
 */
public class QueueTest {
    private final static String CONNECTSTRING = "192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181";

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000, 5000, new SerializableSerializer());
        try {
            // Start from an empty queue.
            zkClient.deleteRecursive("/queue");
            DistributedSimpleQueue<Admin> adminDistributedSimpleQueue = new DistributedSimpleQueue<>(zkClient, "/queue");
            for (int i = 0; i < 10; i++) {
                Admin admin = new Admin();
                admin.setAdminName("队列" + i);
                admin.setAdminId((long) i);
                adminDistributedSimpleQueue.offer(admin);
                System.out.println("size: " + adminDistributedSimpleQueue.size());
            }
            Admin admin = adminDistributedSimpleQueue.poll();
            // poll() returns null when nothing could be read; avoid dereferencing it.
            if (admin == null) {
                System.out.println("queue is empty");
            } else {
                System.out.println(admin.getAdminName());
            }
        } finally {
            // Release the ZooKeeper session even when the demo fails half-way.
            zkClient.close();
        }
    }
}
分布式锁
package com.frame.test.gp.zookeeperAPI.javaapilock;
import com.frame.test.gp.thread.thread15.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author Administrator
* @CREATE 2017/8/10 23:10
*/
public class DistributeLock {
private static final String ROOT_LOCKS = "/LOCKS"; //根节点
private ZooKeeper zooKeeper;
private int sessionTimeOut; //会话超时时间
private String lockID; //记录锁节点ID
public static final byte[] data = {1, 2}; //节点数据
public CountDownLatch countDownLatch = new CountDownLatch(1);
public DistributeLock() throws IOException, InterruptedException {
this.zooKeeper = ZKClientUtils.getInstance();
this.sessionTimeOut = ZKClientUtils.getSessionTimeOut();
}
//获取锁的方法,有序节点最小的获得锁
public boolean lock() {
try {
lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "->成功创建了lock节点[" + lockID + "],开始竞争锁;");
List<String> childreaNodes = zooKeeper.getChildren(ROOT_LOCKS, true); //获取根节点下的所有子节点
//排序 从小到大
SortedSet<String> sortedSet = new TreeSet<String>();
for (String children : childreaNodes) {
sortedSet.add(ROOT_LOCKS + "/" + children);
}
String first = sortedSet.first();
if (lockID.equalsIgnoreCase(first)) {
//表示当前就是最小的节点
System.out.println(Thread.currentThread().getName() + "->成功获得锁,lock节点为:[" + lockID + "]");
return true;
}
//获得锁的下一个节点 监控锁节点,当锁释放之后下一个节点获取锁
SortedSet<String> lessThanLockId = sortedSet.headSet(lockID); //handSet 返回从开始节点到指定元素的集合
if (!lessThanLockId.isEmpty()) {
String prevLockID = lessThanLockId.last(); //拿到比当前LOCKID这个节点更小的上一个节点
zooKeeper.exists(prevLockID, new LockWatcher(countDownLatch));
countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);
//上面这段代码意味着如果会话超时或者节点被删除(释放)
System.out.println(Thread.currentThread().getName() + "成功获取锁:[" + lockID + "];下一个监控节点:"+prevLockID);
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
public boolean unlock() {
System.out.println(Thread.currentThread().getName() + "->开始释放锁:[" + lockID + "]");
try {
zooKeeper.delete(lockID, -1);
System.out.println("节点[" + lockID + "]成功被删除");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Random random = new Random();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
DistributeLock lock = null;
try {
lock = new DistributeLock();
countDownLatch.countDown();
countDownLatch.await();
lock.lock();
Thread.sleep(random.nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null) {
lock.unlock();
}
}
}).start();
}
}
}
package com.frame.test.gp.zookeeperAPI.javaapilock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
 * Watcher that counts down the latch it was given when it receives a {@code NodeDeleted}
 * event; every other event type is ignored.
 *
 * @author Administrator
 * @CREATE 2017/8/10 23:41
 */
public class LockWatcher implements Watcher {
    private CountDownLatch latch;

    /**
     * @param countDownLatch latch to count down when the watched node is deleted
     */
    public LockWatcher(CountDownLatch countDownLatch) {
        latch = countDownLatch;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        boolean nodeDeleted = Event.EventType.NodeDeleted == watchedEvent.getType();
        if (!nodeDeleted) {
            return;
        }
        latch.countDown();
    }
}
package com.frame.test.gp.zookeeperAPI.javaapilock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Helper that opens ZooKeeper sessions against the configured ensemble.
 *
 * @author Administrator
 * @CREATE 2017/8/10 23:04
 */
public class ZKClientUtils {
    private final static String CONNECTSTRING = "192.168.202.133:2181,192.168.202.134:2181,192.168.202.135:2181";

    /** Returns the session timeout, in milliseconds, used for every session created here. */
    public static int getSessionTimeOut() {
        return sessionTimeOut;
    }

    public static final int sessionTimeOut = 5000;

    // Upper bound on how long getInstance() waits for the SyncConnected event.
    private static final long CONNECT_TIMEOUT_MS = 3L * sessionTimeOut;

    /**
     * Opens a new ZooKeeper session and waits until it is connected.
     *
     * @return a connected ZooKeeper handle; the caller is responsible for closing it
     * @throws IOException          if the handle cannot be created or the connection is not
     *                              established within the connect timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        boolean handedOut = false;
        try {
            // Bounded wait: an unreachable ensemble must not block the caller forever.
            if (!connected.await(CONNECT_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                throw new IOException("Timed out after " + CONNECT_TIMEOUT_MS
                        + " ms waiting for ZooKeeper connection to " + CONNECTSTRING);
            }
            handedOut = true;
            return zooKeeper;
        } finally {
            // On timeout or interrupt nobody else holds the handle, so close it here.
            if (!handedOut) {
                zooKeeper.close();
            }
        }
    }
}