4、Zookeeper的客户端实例

2017-05-19  本文已影响91人  yannhuang

Zookeeper的常用开源客户端有ZkClient 和 Curator,网上可以找到很多关于这两个客户端的资料。本文将要讲述的不是这两个客户端,而是一个更加轻量级的Zookeeper客户端:de-zookeeper

完整代码已经提交到github上:

https://github.com/huangyanxiong/de-framework/tree/master/de-zookeeper

下面将详细讲述de-zookeeper的代码细节和使用方法。

1、ZookeeperProperty.java

该类用于封装 Zookeeper 的连接属性,包括IP和端口、路径、超时时间三个属性,并提供带参构造方法和对应的 get/set 方法:

package com.dataeye.zookeeper;

public class ZookeeperProperty {
  private String zkConnnectionStr;   // 连接的IP和端口,比如: localhost:2181
  private String zNodePath;   //  zk路径,比如:/data/work
  private int sessionTimeout;  //  超时时间,毫秒,比如  60000

  public ZookeeperProperty(String zkConnnectionStr, String zNodePath, int sessionTimeout) {
    this.zkConnnectionStr = zkConnnectionStr;
    this.zNodePath = zNodePath;
    this.sessionTimeout = sessionTimeout;
  }

  public String getZkConnnectionStr() {
    return zkConnnectionStr;
  }

  public void setZkConnnectionStr(String zkConnnectionStr) {
    this.zkConnnectionStr = zkConnnectionStr;
  }

  public String getzNodePath() {
    return zNodePath;
  }

  public void setzNodePath(String zNodePath) {
    this.zNodePath = zNodePath;
  }

  public int getSessionTimeout() {
    return sessionTimeout;
  }

  public void setSessionTimeout(int sessionTimeout) {
    this.sessionTimeout = sessionTimeout;
  }
}

2、ZooKeeperListener.java

这是描述客户端行为的接口。这里Zookeeper监听三种行为:

connected:建立连接
nodeValueChange:节点的值发生改变
nodeChildChange:子节点发生变化

package com.dataeye.zookeeper;

import java.util.Map;

public interface ZooKeeperListener {

  void nodeChildChange(Map<String, String> newChildrenValue);

  void nodeValueChange(String newValue);

  void connected();
}

3、NodeValueCodec.java

节点的编码和解码类,提供 decode 和 encode 接口

package com.dataeye.zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public interface NodeValueCodec<T> {

  NodeValueCodec DEFAULT = DefaultNodeValueCodec.INSTANCE;

  default Set<T> decodeAll(byte[] zNodeValue) {
    requireNonNull(zNodeValue, "zNodeValue");
    return decodeAll(new String(zNodeValue, StandardCharsets.UTF_8));
  }

  Set<T> decodeAll(String zNodeValue);

  default T decode(byte[] zNodeValue) {
    requireNonNull(zNodeValue, "zNodeValue");
    return decode(new String(zNodeValue, StandardCharsets.UTF_8));
  }

  T decode(String zNodeValue);

  byte[] encodeAll(Iterable<T> entries);

  byte[] encode(T entry);
}

4、DefaultNodeValueCodec.java

NodeValueCodec接口的实现类,具体实现了编码和解码的过程。

这里的编码过程比较简单,只是把Worker类的两个属性: id 和 biz使用冒号分割开来,返回类似于 id:biz 这样的字符串;解码过程则相反。

package com.dataeye.zookeeper;

import com.dataeye.crawler.thrift.Worker;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import static java.util.Objects.requireNonNull;

public final class DefaultNodeValueCodec implements NodeValueCodec<Worker> {
  public static final DefaultNodeValueCodec INSTANCE = new DefaultNodeValueCodec();
  private static final String segmentDelimiter = ",";
  private static final String fieldDelimiter = ":";
  private static final Pattern SEGMENT_DELIMITER = Pattern.compile("\\s*" + segmentDelimiter + "\\s*");

  @Override
  public Worker decode(String segment) {
    final String[] tokens = segment.split(fieldDelimiter);

    String workerId = tokens[0];
    String biz = tokens[1];
    final Worker worker = new Worker(workerId, biz);

    return worker;
  }

  @Override
  public Set<Worker> decodeAll(String valueString) {
    Set<Worker> workers = new HashSet<>();
    try {
      for (String segment : SEGMENT_DELIMITER.split(valueString)) {
        workers.add(decode(segment));
      }
    } catch (Exception e) {
      throw new RuntimeException("invalid worker list: " + valueString, e);
    }
    if (workers.isEmpty()) {
      throw new RuntimeException("ZNode does not contain any workers.");
    }
    return workers;
  }

  @Override
  public byte[] encodeAll(Iterable<Worker> workers) {
    requireNonNull(workers, "workers");
    StringBuilder nodeValue = new StringBuilder();
    workers.forEach(worker -> nodeValue.append(worker.getId()).append(fieldDelimiter).append(
            worker.getBiz()).append(segmentDelimiter));
    //delete the last unused segment delimiter
    if (nodeValue.length() > 0) {
      nodeValue.deleteCharAt(nodeValue.length() - 1);
    }
    return nodeValue.toString().getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public byte[] encode(Worker worker) {
    return (worker.getId() + fieldDelimiter + worker.getBiz()).getBytes(StandardCharsets.UTF_8);
  }
}

5、zk.thrift

该客户端使用thrift生成序列化对象:

namespace java com.dataeye.crawler.thrift

struct Worker {
    //worker的标识
    1: required string id,
    //worker所属的业务
    2: required string biz
}

6、ZooKeeperConnector.java

该类负责ZooKeeper的连接,节点操作,节点监听等功能,是该客户端的核心类

package com.dataeye.zookeeper;

import com.google.common.annotations.VisibleForTesting;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import static java.util.Objects.requireNonNull;
import static org.apache.zookeeper.KeeperException.Code.get;

public final class ZooKeeperConnector {
  private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConnector.class);
  private final String zkConnectionStr;
  private final String zNodePath;
  private final int sessionTimeout;
  private final ZooKeeperListener listener;
  private ZooKeeper zooKeeper;
  private BlockingQueue<KeeperState> stateQueue;
  private CountDownLatch latch;
  private boolean activeClose;
  private String prevNodeValue;
  private Map<String, String> prevChildValue;

  public ZooKeeperConnector(String zkConnectionStr, String zNodePath, int sessionTimeout,
                            ZooKeeperListener listener
  ) {
    this.zkConnectionStr = requireNonNull(zkConnectionStr, "zkConnectionStr");
    this.zNodePath = requireNonNull(zNodePath, "zNodePath");
    this.sessionTimeout = sessionTimeout;
    this.listener = requireNonNull(listener, "listener");
  }

  /**
   * Do connect.
   */
  public void connect() {
    try {
      activeClose = false;
      latch = new CountDownLatch(1);
      zooKeeper = new ZooKeeper(zkConnectionStr, sessionTimeout, new ZkWatcher());
      latch.await();
      notifyConnected();
      notifyChange();
      if (stateQueue != null) {
        //put a fake stat to ensure method postConnected finished completely
        //(so that it won't throw exception under ZooKeeper connection recovery test)
        stateQueue.put(KeeperState.Disconnected);
      }
    } catch (Exception e) {
      throw new ZooKeeperException(
              "failed to connect to ZooKeeper:  " + zkConnectionStr + " (" + e + ')', e);
    }
  }

  /**
   * Utility method to create a node.
   * @param nodePath node name
   * @param value    node value
   */
  public void createChild(String nodePath, byte[] value, CreateMode createMode) {
    // CreateMode.EPHEMERAL 临时节点
    // CreateMode.PERSISTENT 持久化节点
    try {
      //first check the parent node
      if (zooKeeper.exists(zNodePath, false) == null) {
        //parent node not exist, create it
        zooKeeper.create(zNodePath, zNodePath.getBytes(StandardCharsets.UTF_8),
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
      }
      if (zooKeeper.exists(zNodePath + '/' + nodePath, true) == null) {
        zooKeeper.create(zNodePath + '/' + nodePath, value, Ids.OPEN_ACL_UNSAFE, createMode);
      } else {
        throw new ZooKeeperException("failed to create ZooKeeper Node:" + zNodePath + '/' + nodePath +
                ",because the path exist already.");
      }
    } catch (Exception e) {
      throw new ZooKeeperException(
              "failed to create ZooKeeper Node:  " + zkConnectionStr + " (" + e + ')', e);
    }
  }

  /**
   * Closes the underlying Zookeeper connection.
   * @param active if it is closed by user actively ? or passively by program because of underlying
   *               connection expires
   */
  public void close(boolean active) {
    try {
      activeClose = active;
      zooKeeper.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Notify listener that ZooKeeper connection has been established.
   */
  private void notifyConnected() {
    listener.connected();
  }

  /**
   * Notify listener that a node value or node children has changed.
   */
  private void notifyChange() {
    //forget it if event was triggered by user's actively closing EndpointGroup
    if (activeClose) {
      return;
    }
    List<String> children;
    byte[] newValueBytes;
    try {
      if (zooKeeper.exists(zNodePath, true) == null) {
        return;
      }
      children = zooKeeper.getChildren(zNodePath, true);
      newValueBytes = zooKeeper.getData(zNodePath, false, null);
      if (newValueBytes != null) {
        String newValue = new String(newValueBytes, StandardCharsets.UTF_8);
        if (prevNodeValue == null || !prevNodeValue.equals(newValue)) {
          listener.nodeValueChange(newValue);
          prevNodeValue = newValue;
        }
      }
      //check children status
      if (children != null) {
        Map<String, String> newChildValue = new HashMap<>();
        children.forEach(child -> {
          try {
            newChildValue.put(child,
                    new String(zooKeeper.getData(zNodePath + '/' + child,
                            false, null), StandardCharsets.UTF_8));
          } catch (Exception e) {
            throw new ZooKeeperException(e);
          }
        });
        if (prevChildValue == null || !prevChildValue.equals(newChildValue)) {
          listener.nodeChildChange(newChildValue);
          prevChildValue = newChildValue;
        }
      }
    } catch (Exception ex) {
      throw new ZooKeeperException("Failed to notify ZooKeeper listener", ex);
    }
  }

  /**
   * A ZooKeeper watch.
   */
  final class ZkWatcher implements Watcher, StatCallback {
    @Override
    public void process(WatchedEvent event) {
      if (stateQueue != null) {
        enqueueState(event.getState());
      }
      String path = event.getPath();
      if (event.getType() == Event.EventType.None) {
        // Connection state has been changed. Keep retrying until the connection is recovered.
        switch (event.getState()) {
          case Disconnected:
            break;
          case SyncConnected:
            // We are here because of one of the following:
            // - initial connection,
            // - reconnection due to session timeout or
            // - reconnection due to session expiration
            // Once connected, reset the retry delay.
            latch.countDown();
            break;
          case Expired:
            // Session expired usually happens when a client reconnected to the server after
            // long time network partition, exceeding the configured
            // session timeout. We need to reconstruct the ZooKeeper client.
            // First, clean the original handle.
            close(false);
            zooKeeper = null;
            try {
              if (!activeClose) {
                connect();
              }
            } catch (ZooKeeperException e) {
              logger.warn("Failed to attempt to recover a ZooKeeper connection", e);
            }
            break;
        }
      } else {
        if (path != null && path.startsWith(zNodePath)) {
          // Something has changed on the node, let's find out.
          try {
            zooKeeper.exists(path, true, this, null);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    }

    @Override
    public void processResult(int responseCodeInt, String path, Object ctx, Stat stat) {
      Code responseCode = get(responseCodeInt);
      switch (responseCode) {
        case OK:
          break;
        case NONODE:
          break;
        case SESSIONEXPIRED:
          // Ignore this and let the zNode Watcher process it first.
        case NOAUTH:
          // It's possible that this happens during runtime. We ignore this and wait for the ACL
          // configuration returns to normal. If it happens when the ZooKeeper client is initially
          // constructed, the constructor will throw an exception.
          return;
        default:
          // Retry on recoverable errors. Fatal errors go to the process() method above.
          try {
            zooKeeper.exists(path, true, this, null);
          } catch (Exception ex) {
            throw new ZooKeeperException("Failed to process ZooKeeper callback event", ex);
          }
          return;
      }
      if (!activeClose) {
        notifyChange();
        //enqueue an end flag to force the main thread to wait until this callback finished  before exit
        if (stateQueue != null) {
          enqueueState(KeeperState.Disconnected);
        }
      }
    }

    /**
     * Enqueue the state.
     * @param state ZooKeeper state
     */
    private void enqueueState(KeeperState state) {
      if (stateQueue == null) {
        return;
      }
      try {
        stateQueue.put(state);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

7、ZKClient.java

Zookeeper客户端

package com.dataeye.zookeeper.client;

import com.dataeye.crawler.thrift.Worker;
import com.dataeye.zookeeper.DefaultNodeValueCodec;
import com.dataeye.zookeeper.ZooKeeperConnector;
import com.dataeye.zookeeper.ZooKeeperListener;
import com.dataeye.zookeeper.ZookeeperProperty;

import java.util.*;
import java.util.stream.Collectors;


public class ZKClient {

    public static void main(String[] args) {
        String zkConn = "localhost:2181";

        DefaultNodeValueCodec nodeValueCodec = DefaultNodeValueCodec.INSTANCE;

        ZookeeperProperty zookeeperProperty = new ZookeeperProperty(zkConn, "/de-spider/works", 60000);

        ZooKeeperConnector zooKeeperConnector = new ZooKeeperConnector(
                zookeeperProperty.getZkConnnectionStr(),
                zookeeperProperty.getzNodePath(),
                zookeeperProperty.getSessionTimeout(),
                new ZooKeeperListener() {
                    @Override
                    public void nodeChildChange(Map<String, String> newChildrenValue) {
                        List<Worker> newWorkers = newChildrenValue.values().stream().map(nodeValueCodec::decode).filter(Objects::nonNull).collect(Collectors.toList());
                        Map<String, Set<Worker>> newBizWorkers = new HashMap<>();

                        if (newWorkers != null) {
                            for (Worker worker : newWorkers) {
                                String biz = worker.biz;

                                if (!newBizWorkers.containsKey(biz)) {
                                    newBizWorkers.put(biz, new HashSet<>());
                                }
                                newBizWorkers.get(biz).add(worker);
                            }
                        }
                    }

                    @Override
                    public void nodeValueChange(String newValue) {

                    }

                    @Override
                    public void connected() {

                    }
                });
        zooKeeperConnector.connect();
    }
}

上一篇下一篇

猜你喜欢

热点阅读