ZooKeeper应用场景:集群管理

2019-09-28  本文已影响0人  RacyFu

文章转载至:https://www.cnblogs.com/luangeng/p/7589132.html

Zookeeper--集群管理

在多台服务器组成的集群中,需要监控每台服务器的状态,一旦某台服务器挂掉了或有新的机器加入集群,集群都要感知到,从而采取相应的措施。一个主动的集群可以自动感知节点的死亡和新节点的加入,它才对更高效的提供服务。通常的做法是有台主机器定时的去获取其他机器的心跳,或其他机器定时主动汇报自己的状态,这种方式存在一定的延时,并且主机器成为单点,一旦挂掉便影响整个集群。

使用Zookeeper可以方便的实现集群管理的功能。思路如下,每个服务器启动时都向zk服务器提出创建临时节点的请求,并且使用getChildren设置父节点的观察,当该服务器挂掉之后,它创建的临时节点也被Zookeeper服务器删除,然后会触发监视器,其他服务器便得到通知。创建新节点也是同理。

并且利用Zookeeper的Leader选举功能可以选出服务中的一台作为Leader,在比如任务调度类似的场景中有用。

下面是一个简单的模拟:

ServerUnit模拟在不同机器上启动的服务,启动时向Zookeeper服务器注册自己,并保存自己的IP和端口;实现CallBack接口:在其他节点发生变化时执行的逻辑

publicclass ServerUnit {

    publicstaticfinalString SER_NAME = "ServerUnit";

    publicstaticvoidmain(String[] args)throws InterruptedException, IOException, KeeperException {

        System.out.println("begin register to Zookeeper..");

        String address = IPUtil.getLoaclIP() + ":" +newRandom().nextInt(255);

        ServiceMng mng =new ServiceMng(SER_NAME);

        String serverId = mng.register(address,newCallBack() {

            @Override

            publicvoidcallback(ServiceMng.ChildrenChangedResult cn)throws KeeperException, InterruptedException {

                for (String str : cn.getUp()) {

                    System.out.println("检测到服务加入: " + mng.queryAddress(str));

                }

                for (String str : cn.getDown()) {

                    System.out.println("检测到服务退出: " + mng.queryAddress(str));

                }

            }

        });

        System.out.println("ServerUnit started at: " + address);

        TimeUnit.HOURS.sleep(1);

    }

}

CallBack接口:

publicinterfaceCallBack {

    voidcallback(T t)throws Exception;

}

---

ServerMng 提供向Zookeeper注册服务和获取服务等方法,被服务单元依赖

publicclass ServiceMng {

    privatestaticfinalString APPS_PATH = "/__apps__";

    private String serviceName;

    private ZooKeeper zk;

    privateCountDownLatch latch =newCountDownLatch(1);

    privateList serList;

    privateMap serMap =newHashMap<>();

    ServiceMng(String serviceName) {

        this.serviceName = serviceName;

    }

    publicString register(String address, CallBack callback)throws KeeperException, InterruptedException, IOException {

        if(zk !=null) {

            thrownewIllegalArgumentException("method should not invoke twice.");

        }

        zk =newZooKeeper("localhost", 30000,new Watcher() {

            @Override

            publicvoid process(WatchedEvent watchedEvent) {

                if(watchedEvent.getState() == Event.KeeperState.SyncConnected) {

                    latch.countDown();

                }

                if(watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {

                    try {

                        List list = zk.getChildren(APPS_PATH + "/" + serviceName,true);

                        refresh(list);

                        callback.callback(new ChildrenChangedResult(list, serList));

                        serList = list;

                    } catch (KeeperException e) {

                        e.printStackTrace();

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    } catch (Exception e) {

                        e.printStackTrace();

                    }

                }

            }

        });

        latch.await();

        if(zk.exists(APPS_PATH,false) ==null) {

            zk.create(APPS_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        }

        if(zk.exists(APPS_PATH + "/" + serviceName,false) ==null) {

            zk.create(APPS_PATH + "/" + serviceName,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        }

        String path = zk.create(APPS_PATH + "/" + serviceName + "/", address.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        List list = zk.getChildren(APPS_PATH + "/" + serviceName,true);

        refresh(list);

        serList = list;

        return path;

    }

    privatevoidrefresh(List paths)throws KeeperException, InterruptedException {

        for (String path : paths) {

            byte[] b = zk.getData(APPS_PATH + "/" + serviceName + "/" + path,false,null);

            serMap.put(path, new String(b));

        }

    }

    publicString queryLeaderIp(String serviceName)throws KeeperException, InterruptedException {

        List apps = zk.getChildren(APPS_PATH + "/" + serviceName,false);

        if (apps.isEmpty()) {

            returnnull;

        }

        Collections.sort(apps);

        byte[] data = zk.getData(apps.get(0),false,null);

        returnnew String(data);

    }

    publicString queryRandomServerIp(String serviceName)throws KeeperException, InterruptedException {

        List apps = zk.getChildren(APPS_PATH + "/" + serviceName,false);

        if (apps.isEmpty()) {

            returnnull;

        }

        Random r =new Random();

        byte[] data = zk.getData(apps.get(r.nextInt(apps.size())),false,null);

        returnnew String(data);

    }

    public String queryAddress(String path) {

        return serMap.get(path);

    }

    publicstaticclass ChildrenChangedResult {

        List up =null;

        List down =null;

        ChildrenChangedResult(List now, List last) {

            up =new LinkedList(now);

            up.removeAll(last);

            down =new LinkedList(last);

            down.removeAll(now);

        }

        publicList getUp() {

            return up;

        }

        publicList getDown() {

            return down;

        }

    }

}

依次启动3个ServerUnit,查看控制台:

依次关闭2个ServerUnit,查看控制台:

从关闭ServerUnit到控制台打印退出大概延迟8s左右,配置zk的tickTime=1000,比预期的要慢一些。

上一篇 下一篇

猜你喜欢

热点阅读