互联网科技Java成长之路Java

zookeeper入门到实战

2019-06-12  本文已影响2人  码上搞定

一.zookeeper介绍

ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅负载均衡命名服务分布式协调/通知集群管理Master 选举配置维护名字服务分布式同步分布式锁和分布式队列等功能。

数据模型ZooKeeper 允许分布式进程通过共享的层次结构命名空间进行相互协调,这与标准文件系统类似。名称空间由 ZooKeeper 中的数据寄存器组成,称为 Znode,这些类似于文件和目录。与典型文件系统不同,ZooKeeper 数据保存在内存中,这意味着 ZooKeeper 可以实现高吞吐量和低延迟

顺序访问:对于来自客户端的每个更新请求,ZooKeeper 都会分配一个全局唯一的递增编号。这个编号反应了所有事务操作的先后顺序,应用程序可以使用 ZooKeeper 这个特性来实现更高层次的同步原语。这个编号也叫做时间戳zxidZooKeeper Transaction Id)。

可构建集群:为了保证高可用,最好是以集群形态来部署 ZooKeeper,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么 ZooKeeper 本身仍然是可用的。客户端在使用 ZooKeeper 时,需要知道集群机器列表,通过与集群中的某一台机器建立 TCP 连接来使用服务。客户端使用这个 TCP 链接来发送请求、获取结果、获取监听事件以及发送心跳包。如果这个连接异常断开了,客户端可以连接到另外的机器上。

image

工作原理

  1. Zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫做Zab协议
  2. Zab协议有两种模式,它们分别是恢复模式广播模式。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和leader状态同步以后,恢复模式就结束了。状态同步保证了leaderserver具有相同的系统状态。一旦leader已经和多数的follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态。这时候当一个server加入zookeeper服务中,它会在恢复模式下启动,发现leader,并和leader进行状态同步。待到同步结束,它也参与消息广播。
  3. Zookeeper服务一直维持在Broadcast状态,直到leader崩溃了或者leader失去了大部分的followers支持

Leader选举

  1. 广播模式需要保证proposal提议)被按顺序处理leader来执行写操作),因此zk采用了递增的事务id号(zxid)来保证。所有的提议都在被提出的时候加上了zxid。实现中zxid是一个64为的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch。低32位是个递增计数。
  2. leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的server恢复到一个正确的状态。每个Server启动以后都询问其它的Server它要投票给谁。对于其他server的询问,server每次根据自己的状态都回复自己推荐的leaderid和上一次处理事务的zxid(系统启动时每个server都会推荐自己),收到所有Server回复以后,就计算出zxid最大的哪个Server,并将这个Server相关信息设置成下一次要投票的Server。计算这过程中获得票数最多的的sever为获胜者,如果获胜者的票数超过半数,则改server被选为leader。否则,继续这个过程,直到leader被选举出来

二.使用Zookeeper

 1        //客户端连接zookeeper服务器
 2         ZooKeeper zkClient = new ZooKeeper(CONNECT_STR, 50000, new Watcher() {
 3             @Override
 4             public void process(WatchedEvent watchedEvent) {
 5                 //监控服务节点变化
 6                 System.out.println("sssss");
 7             }
 8         });
 9         
10         //获取根节点下的所有节点
11         List<String> nodeList= zkClient.getChildren("/",null);
12 
13         System.out.println(nodeList.toString());
14 
15         //Stat isExists= zkClient.exists(LOCK_ROOT_PATH,null);
16         //在test父节点下创建子节点
17         String lockPath = zkClient.create("/test/why","why".getBytes(),            
18         ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

代码中需要注意的是如果父节点不存在,会报异常,同时父节点不能是临时节点。

Znode

  1. ZooKeeper中,“节点"分为两类,第一类同样是指构成集群的机器,我们称之为机器节点,第二类则是指数据模型中的数据单元,我们称之为数据节点一ZNode。
  2. ZooKeeper 将所有数据存储在内存中,数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个
    Znode,例如/foo/path1。每个上都会保存自己的数据内容,同时还会保存一系列属性信息。
  3. zookeeper有四类节点:PERSISTENT(持久的)EPHEMERAL(暂时的)PERSISTENT_SEQUENTIAL(持久化顺序编号目录节点)EPHEMERAL_SEQUENTIAL(暂时化顺序编号目录节点)

Session

  1. Session 指的是 ZooKeeper 服务器与客户端会话。在 ZooKeeper 中,一个客户端连接是指客户端和服务器之间的一个 TCP 长连接
  2. 客户端启动的时候,首先会与服务器建立一个 TCP连接,从第一次连接建立开始客户端会话的生命周期也开始了。通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能够向Zookeeper 服务器发送请求并接受响应,同时还能够通过该连接接收来自服务器的 Watch 事件通知
  3. SessionsessionTimeout值用来设置一个客户端会话的超时时间。当由于服务器压力太大、网络故障或是客户端主动断开连接等各种原因导致客户端连接断开时,只要在sessionTimeout规定的时间内能够重新连接上集群中任意一台服务器,那么之前创建的会话仍然有效。在为客户端创建会话之前,服务端首先会为每个客户端都分配一个sessionID。由于 sessionIDZookeeper 会话的一个重要标识,许多与会话相关的运行机制都是基于这个sessionID 的。因此,无论是哪台服务器为客户端分配的 sessionID,都务必保证全局唯一

Watcher:是 ZooKeeper 中的一个很重要的特性。ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

Version: Zookeeper 的每个 ZNode 上都会存储数据,对应于每个 ZNodeZookeeper 都会为其维护一个叫作 Stat 的数据结构。Stat 中记录了这个 ZNode 的三个数据版本,分别是:version(当前节点版本)、cversion当前节点的子节点版本)、aversion当前节点的ACL版本

ACLZooKeeper 采用 ACLAccessControlLists)策略来进行权限控制,类似于 UNIX 文件系统的权限控制。ZooKeeper 定义了 5 种权限:CREATE/READ/WRITE/DELETE/ADMIN


三.通过zookeeper实现分布式锁

   package com.why;
   
   import org.apache.zookeeper.*;
   import org.apache.zookeeper.data.Stat;
   
   import java.io.IOException;
   import java.util.Collections;
   import java.util.List;
   
  /*
  *  分布式锁
  * */
  public class DistributeLock {
  
      private static final String LOCK_ROOT_PATH = "/test";
      //private static final String LOCK_NODE_NAME = "Lock";
  
      private static ZooKeeper _zkClient;
  
      static {
          try {
              _zkClient = new ZooKeeper("192.168.6.132:2181", 500000, null);
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  
  
      public static String getLock() {
          try {
  
              //System.out.println(_zkClient.getChildren("/",false));
  
              String lockPath = _zkClient.create( "/test/why", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
              //System.out.println(lockPath);
              //System.out.println(_zkClient.getChildren(LOCK_ROOT_PATH,false));
              if (tryLock(lockPath))
                  return lockPath;
              else
                  return null;
          } catch (Exception ex) {
              ex.printStackTrace();
              return null;
          }
      }
  
      private static boolean tryLock(String lockPath) throws KeeperException, InterruptedException {
          List<String> lockPaths = _zkClient.getChildren(LOCK_ROOT_PATH, false);
          Collections.sort(lockPaths);
          int index=lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length()+1));
          if(index==0){
              //获得锁
              return true;
          }
          else{
              String preLockPath="/"+lockPaths.get(index-1);
  
              Watcher watcher=new Watcher() {
                  @Override
                  public void process(WatchedEvent watchedEvent) {
                      synchronized (this){
                          //唤醒线程
                          notifyAll();
                      }
                  }
              };
  
              Stat stat=_zkClient.exists(preLockPath,watcher);
  
              if(stat==null){
                  return tryLock(lockPath);
              }else{
                  synchronized (watcher){
                      watcher.wait();
                  }
                  return tryLock(lockPath);
              }
  
          }
  
      }
  
      public static void closeZkClient() throws InterruptedException {
          _zkClient.close();
      }
  
      public static void releaseLock(String lockPath) throws KeeperException, InterruptedException {
          _zkClient.delete(lockPath,-1);
      }
  }

测试:

    package com.why;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class MultiThreadDemo {
    
        private static  int counter = 0;
    
        public  static  void plus() throws InterruptedException {
            Thread.sleep(500);
            counter++;
            //System.out.println(counter);
        }
    
        public static int Count(){
            return counter;
        }
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    
            ExecutorService executor= Executors.newCachedThreadPool();
            final int num=10;
            for(int i=0;i<num;i++){
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            String path = DistributeLock.getLock();
                            System.out.println(path);
                            plus();
                            DistributeLock.releaseLock(path);
                            System.out.println(Count());
                        } catch (InterruptedException | KeeperException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            executor.shutdown();
    
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读