ZooKeeper use cases: cluster management
Reposted from: https://www.cnblogs.com/luangeng/p/7589132.html
ZooKeeper: cluster management
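In a cluster made up of multiple servers, the state of every server has to be monitored: as soon as a server goes down or a new machine joins, the cluster must notice and react accordingly. Only a cluster that actively detects node failures and newly joined nodes can serve requests efficiently. The usual approach is to have a master machine poll the other machines for heartbeats at regular intervals, or to have the other machines report their own status periodically. This introduces some delay, and the master becomes a single point of failure: if it goes down, the whole cluster is affected.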
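As a rough illustration of the heartbeat approach described above, here is a minimal sketch of the bookkeeping a master would do. The class name `HeartbeatMonitor`, its methods, the addresses, and the 5-second timeout are all hypothetical and chosen only for this example; timestamps are passed in explicitly so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical master-side heartbeat tracker (not part of the original post).
class HeartbeatMonitor {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record that a server reported in at the given time.
    public void heartbeat(String server, long nowMillis) {
        lastSeen.put(server, nowMillis);
    }

    // A server counts as dead once its last heartbeat is older than the timeout.
    public List<String> deadServers(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                dead.add(e.getKey());
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(5_000);
        monitor.heartbeat("10.0.0.1:8080", 0);
        monitor.heartbeat("10.0.0.2:8080", 0);
        monitor.heartbeat("10.0.0.1:8080", 4_000); // only the first server keeps reporting
        System.out.println(monitor.deadServers(6_000));
    }
}
```

The sketch makes both drawbacks visible: a failure is only noticed after the timeout has passed, and all of the state lives in a single process.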
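ZooKeeper makes cluster management easy to implement. The idea is as follows: on startup, each server asks the ZooKeeper server to create an ephemeral node and uses getChildren to set a watch on the parent node. When a server goes down, the ephemeral node it created is deleted by the ZooKeeper server, which triggers the watch, so the other servers are notified. The creation of a new node is handled the same way.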
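In addition, ZooKeeper's leader election capability can be used to pick one of the services as the leader, which is useful in scenarios such as task scheduling.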
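One simple way to pick a leader, and the one the queryLeaderIp method later in this post relies on, is to take the child with the lowest sequence number. The sketch below works on plain strings and assumes the child names are the zero-padded 10-digit suffixes that ZooKeeper appends to sequential nodes, so lexicographic order matches creation order. `LeaderPicker` and `pickLeader` are hypothetical names used only here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: choose the leader among sequential child names.
class LeaderPicker {
    // Assumes fixed-width, zero-padded names, so the smallest string is the oldest node.
    public static Optional<String> pickLeader(List<String> children) {
        if (children.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Collections.min(children));
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("0000000012", "0000000003", "0000000007");
        System.out.println(pickLeader(children));
    }
}
```

When the current leader's ephemeral node disappears, running the same selection on the new child list yields the next leader.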
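A simple simulation follows:
ServerUnit simulates a service started on different machines. On startup it registers itself with the ZooKeeper server and stores its own IP and port. It implements the CallBack interface, which holds the logic to run when other nodes change.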
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;

public class ServerUnit {
    public static final String SER_NAME = "ServerUnit";

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        System.out.println("begin register to Zookeeper..");
        // IPUtil is the author's own helper (not shown); the random number stands in for a port.
        String address = IPUtil.getLoaclIP() + ":" + new Random().nextInt(255);
        ServiceMng mng = new ServiceMng(SER_NAME);
        String serverId = mng.register(address, new CallBack<ServiceMng.ChildrenChangedResult>() {
            @Override
            public void callback(ServiceMng.ChildrenChangedResult cn) throws KeeperException, InterruptedException {
                for (String str : cn.getUp()) {
                    System.out.println("Detected service joining: " + mng.queryAddress(str));
                }
                for (String str : cn.getDown()) {
                    System.out.println("Detected service leaving: " + mng.queryAddress(str));
                }
            }
        });
        System.out.println("ServerUnit started at: " + address);
        // Keep the process, and with it the ZooKeeper session, alive.
        TimeUnit.HOURS.sleep(1);
    }
}
The CallBack interface:
public interface CallBack<T> {
    void callback(T t) throws Exception;
}
---
ServiceMng provides methods for registering a service with ZooKeeper and for looking up services; the service units depend on it.
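import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ServiceMng {
    private static final String APPS_PATH = "/__apps__";
    private String serviceName;
    private ZooKeeper zk;
    private CountDownLatch latch = new CountDownLatch(1);
    // Children seen on the previous refresh; starts empty so the first diff cannot hit null.
    private List<String> serList = Collections.emptyList();
    // Child node name -> registered address.
    private Map<String, String> serMap = new HashMap<>();

    ServiceMng(String serviceName) {
        this.serviceName = serviceName;
    }

    public String register(String address, CallBack<ChildrenChangedResult> callback) throws KeeperException, InterruptedException, IOException {
        if (zk != null) {
            throw new IllegalArgumentException("method should not invoke twice.");
        }
        zk = new ZooKeeper("localhost", 30000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    latch.countDown();
                }
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    try {
                        // Watches fire only once, so reading with watch=true re-registers it.
                        List<String> list = zk.getChildren(APPS_PATH + "/" + serviceName, true);
                        refresh(list);
                        callback.callback(new ChildrenChangedResult(list, serList));
                        serList = list;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // Wait until the session is connected.
        latch.await();
        if (zk.exists(APPS_PATH, false) == null) {
            zk.create(APPS_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (zk.exists(APPS_PATH + "/" + serviceName, false) == null) {
            zk.create(APPS_PATH + "/" + serviceName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // The ephemeral sequential node disappears when this session ends.
        String path = zk.create(APPS_PATH + "/" + serviceName + "/", address.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        List<String> list = zk.getChildren(APPS_PATH + "/" + serviceName, true);
        refresh(list);
        serList = list;
        return path;
    }

    // Cache the address stored in each child node.
    private void refresh(List<String> paths) throws KeeperException, InterruptedException {
        for (String path : paths) {
            byte[] b = zk.getData(APPS_PATH + "/" + serviceName + "/" + path, false, null);
            serMap.put(path, new String(b, StandardCharsets.UTF_8));
        }
    }

    public String queryLeaderIp(String serviceName) throws KeeperException, InterruptedException {
        String base = APPS_PATH + "/" + serviceName;
        List<String> apps = zk.getChildren(base, false);
        if (apps.isEmpty()) {
            return null;
        }
        // The node with the smallest sequence number is treated as the leader.
        Collections.sort(apps);
        // getChildren returns names relative to the parent, so build the full path.
        byte[] data = zk.getData(base + "/" + apps.get(0), false, null);
        return new String(data, StandardCharsets.UTF_8);
    }

    public String queryRandomServerIp(String serviceName) throws KeeperException, InterruptedException {
        String base = APPS_PATH + "/" + serviceName;
        List<String> apps = zk.getChildren(base, false);
        if (apps.isEmpty()) {
            return null;
        }
        Random r = new Random();
        byte[] data = zk.getData(base + "/" + apps.get(r.nextInt(apps.size())), false, null);
        return new String(data, StandardCharsets.UTF_8);
    }

    public String queryAddress(String path) {
        return serMap.get(path);
    }

    public static class ChildrenChangedResult {
        List<String> up = null;
        List<String> down = null;

        ChildrenChangedResult(List<String> now, List<String> last) {
            // up: present now but not before; down: present before but gone now.
            up = new LinkedList<>(now);
            up.removeAll(last);
            down = new LinkedList<>(last);
            down.removeAll(now);
        }

        public List<String> getUp() {
            return up;
        }

        public List<String> getDown() {
            return down;
        }
    }
}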
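To make the up/down computation in ChildrenChangedResult concrete, here is a small standalone sketch of the same list difference applied to two made-up child lists. `MembershipDiff`, `joined`, and `left` are hypothetical names, and the node names are invented sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone version of the up/down diff, for illustration only.
class MembershipDiff {
    // Nodes present now that were absent in the previous snapshot.
    public static List<String> joined(List<String> now, List<String> last) {
        List<String> up = new ArrayList<>(now);
        up.removeAll(last);
        return up;
    }

    // Nodes present in the previous snapshot that are gone now.
    public static List<String> left(List<String> now, List<String> last) {
        List<String> down = new ArrayList<>(last);
        down.removeAll(now);
        return down;
    }

    public static void main(String[] args) {
        List<String> last = Arrays.asList("0000000001", "0000000002");
        List<String> now = Arrays.asList("0000000002", "0000000003");
        System.out.println("joined: " + joined(now, last));
        System.out.println("left:   " + left(now, last));
    }
}
```

Because a children watch only reports that something changed, not what changed, comparing the new child list against the previous snapshot is what recovers the actual joins and departures.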
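Start three ServerUnit instances one after another and check the console:
Shut down two ServerUnit instances one after another and check the console: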
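There is a delay of roughly 8 s between shutting down a ServerUnit and the exit message appearing on the console. With ZooKeeper configured with tickTime=1000, this is somewhat slower than expected. An ephemeral node is removed only when its session is closed or expires, so the delay depends on the session timeout rather than on tickTime alone.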
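For context on the timing, the session timeout a client requests is not necessarily the one it gets: the server clamps it between minSessionTimeout and maxSessionTimeout, which default to 2 and 20 times tickTime. The sketch below reproduces only that arithmetic, assuming the server uses those defaults. `SessionTimeout` and `negotiate` are hypothetical names, and the result is an upper bound on session expiry, not an explanation of the 8 s measured here.

```java
// Hypothetical helper showing the default session-timeout clamping arithmetic.
class SessionTimeout {
    // Assumes the server keeps the defaults: min = 2 * tickTime, max = 20 * tickTime.
    public static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        // The code in this post requests 30000 ms against tickTime=1000.
        System.out.println(negotiate(30_000, 1_000));
    }
}
```

Requesting a shorter timeout in the ZooKeeper constructor, within the allowed range, is the usual lever for detecting crashed servers sooner, at the cost of more spurious expirations on slow networks.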