Zookeeper 客户端 Curator
Zookeeper 客户端 Curator
概述
Curator
是Netflix
公司开源的一套Zookeeper
客户端框架,解决了很多Zookeeper
客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher
和NodeExistsException
异常等等。Patrixck Hunt(Zookeeper)
以一句“Guava is to Java that Curator to Zookeeper
”给Curator予高度评价。
Curator包含了几个包:
-
curator-framework:对
Zookeeper
的底层API的一些封装。 - curator-client:提供一些客户端的操作,例如重试策略等。
- curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
客户端
创建
静态方法创建
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
connectionInfo,
5000,
3000,
retryPolicy);
Fluent API 创建
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
创建包含命名空间的客户端
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base") // 命名空间,即该客户端可操作目录的根目录
.build();
启动
client.start();
节点
Zookeeper
节点类型
- PERSISTENT: 持久化
- PERSISTENT_SEQUENTIAL: 持久化并且带序列号
- EPHEMERAL: 临时
- EPHEMERAL_SEQUENTIAL: 临时并且带序列号
创建
// 创建节点,内容为空
client.create().forPath("path");
// 创建节点,内容为“init”
client.create().forPath("path","init".getBytes());
// 创建临时节点,内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
// 创建临时节点,内容为“init”
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
// 创建临时节点,内容为“init”,递归创建父节点
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
删除
// 删除节点
client.delete().forPath("path");
// 删除节点,递归删除子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
// 删除制定版本节点
client.delete().withVersion(10086).forPath("path");
// 强制删除节点
client.delete().guaranteed().forPath("path");
读取
// 读取节点内容
client.getData().forPath("path");
// 读取节点内容及状态
client.getData().storingStatIn(stat).forPath("path");
更新
// 更新节点
client.setData().forPath("path","data".getBytes());
// 指定版本更新节点
client.setData().withVersion(10086).forPath("path","data".getBytes());
节点是否存在
client.checkExists().forPath("path");
获取所有子节点
client.getChildren().forPath("path");
事务
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
异步接口
Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型 | 对应CuratorFramework实例的方法 |
---|---|
CREATE | create() |
DELETE | delete() |
EXISTS | checkExists() |
GET_DATA | getData() |
SET_DATA | setData() |
CHILDREN | getChildren() |
SYNC | sync(String,Object) |
GET_ACL | getACL() |
SET_ACL | setACL() |
WATCHED | Watcher(Watcher) |
CLOSING | close() |
响应码
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor) //如果inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理
.forPath("path");
缓存
强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
Path Cache
Path Cache用来监控一个ZNode的子节点。当一个子节点增加,更新,删除时,Path Cache会改变它的状态, 会包含最新的子节点,子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
实际使用时会涉及到四个类:
- PathChildrenCache
- PathChildrenCacheEvent
- PathChildrenCacheListener
- ChildData
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
想使用cache,必须调用它的start
方法,使用完后调用close
方法。 可以设置StartMode来实现启动的模式。
StartMode有下面几种:
- NORMAL:正常初始化。
- BUILD_INITIAL_CACHE:在调用
start()
之前会调用rebuild()
。 - POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
public void addListener(PathChildrenCacheListener listener) 可以增加listener监听缓存的变化。
getCurrentData()方法返回一个List<ChildData>
对象,可以遍历所有的子节点。
设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:
public class PathCacheDemo {
private static final String PATH = "/example/pathCache";
public static void main(String[] args) throws Exception {
// 创建客户端
String connectInfo = "localhost:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
// 启动
client.start();
// cache
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
// cache start
cache.start();
// cache listener
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件类型:" + event.getType());
if (null != event.getData()) {
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
// test
client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());
ThreadUtil.sleep(10);
client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());
ThreadUtil.sleep(10);
client.setData().forPath(PATH + "/test01", "01_V2".getBytes());
ThreadUtil.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath(PATH + "/test01");
ThreadUtil.sleep(10);
client.delete().forPath(PATH + "/test02");
ThreadUtil.sleep(10);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
注意:示例中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁的触发事件!
Node Cache
Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:
-
NodeCache
- Node Cache实现类 -
NodeCacheListener
- 节点监听器 -
ChildData
- 节点数据
注意:使用cache,依然要调用它的start()
方法,使用完后调用close()
方法。
getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。
public class NodeCacheDemo {
private static final String PATH = "/example/nodeCache";
public static void main(String[] args) throws Exception {
// 创建客户端
String connectInfo = "localhost:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
// 启动
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
// cache
NodeCache cache = new NodeCache(client, PATH);
// cache start
cache.start();
// cache listener
NodeCacheListener cacheListener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
} else {
System.out.println("节点被删除!");
}
};
cache.getListenable().addListener(cacheListener);
// test
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(10);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(10);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(10);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:示例中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁的触发事件!
注意:NodeCache只能监听一个节点的状态变化。
Tree Cache
Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:
- TreeCache - Tree Cache实现类
- TreeCacheListener - 监听器类
- TreeCacheEvent - 触发的事件类
- ChildData - 节点数据
public class TreeCacheDemo {
private static final String PATH = "/example/cache";
public static void main(String[] args) throws Exception {
// 创建客户端
String connectInfo = "localhost:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
// 启动
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
// cache
TreeCache cache = new TreeCache(client, PATH);
// cache start
cache.start();
// cache listener
TreeCacheListener cacheListener = (client1, event) -> System.out.println("事件类型:" + event.getType() +
" | 路径:" + (null != event.getData() ? event.getData().getPath() : null) +
" | 数据:" + (null != event.getData() ? new String(event.getData().getData()) : null));
cache.getListenable().addListener(cacheListener);
// test
client.setData().forPath(PATH, "01".getBytes());
// Thread.sleep(10);
client.setData().forPath(PATH, "02".getBytes());
// Thread.sleep(10);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
// Thread.sleep(10);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:在此示例中没有使用Thread.sleep(10),但是事件触发次数也是正常的。
注意:TreeCache在初始化(调用start()
方法)的时候会回调TreeCacheListener
实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()
很有可能导致空指针异常,这里应该主动处理并避免这种情况。