ZooKeeper技术文大数据

2分钟了解[Apache]Curator

2016-07-13  本文已影响1346人  chesser

一句话告诉我Curator是什么?

ph-quote.png

(Zookeeper的背景就不在这里介绍了.)
Curator是对ZK的高阶封装. 与操作原生的Zookeeper相比, 它提供了对ZK的完美封装, 简化了对集群的连接, 错误的处理; 实现了一系列经典"模式", 比如分布式锁, Leader选举等.

既然是为了简单

首先, 获取一个连接实例(CuratorFramework). 对于每个集群, 只需要创建一个实例.

CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)

这里我们需要指定一个retryPolicy, 这里用一个指数补偿策略:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

然后我们就可以调用Zookeeper的一系列操作了, 比如创建:

client.create().forPath("/my/path", myData)

这里如果出现了连接问题,curator也会帮你自动重连. 所以, 很简单咯?

我们继续, C/R/U/D:


// this will create the given ZNode with the given data
client.create().forPath(path, payload);

// set data for the given node
client.setData().forPath(path, payload);

// set data for the given node asynchronously. The completion notification
// is done via the CuratorListener.
client.setData().inBackground().forPath(path, payload);

// delete the given node
client.delete().forPath(path);


public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception
{
  /**
   * Get children and set a watcher on the node. The watcher notification will come through the
   * CuratorListener (see setDataAsync() above).
   */
  return client.getChildren().watched().forPath(path);
}
        

模式

分布式锁

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

Leader选举

LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
}

LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();  // not required, but this is behavior that you will probably expect
selector.start();

事务

        Collection<CuratorTransactionResult>    results = client.inTransaction()
                .create().forPath("/a/path", "some data".getBytes())
            .and()
                .setData().forPath("/another/path", "other data".getBytes())
            .and()
                .delete().forPath("/yet/another/path")
            .and()
                .commit();  // IMPORTANT! The transaction is not submitted until commit() is called


附录

模块 简介
Framework 简化Zookeeper的使用,提供了一系列高阶API, 在Zookeeper集群连接的管理, 以及重连操作上化繁为简.
Receipts 实现了一些ZooKeeper的基本"模式", 该实现基于Curator Framework.
Utilities 一系列工具
Client ZooKeeper类的替代
Errors 定义了curator如何处理错误,连接问题,可恢复的异常等.
Extensions 实现了一些Zookeeper文档中提及的通用模式.

版本

上一篇 下一篇

猜你喜欢

热点阅读