3、zk客户端curator使用(转)

2019-05-02  本文已影响0人  小manong

一、curator简介

curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

二、Curator-framework基本api使用

1、创建会话

(1)静态工厂方法创建会话

String connectionInfo = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
                .newClient(connectionInfo, 5000, 3000, retryPolicy);

newClient静态工厂方法包含四个主要参数:


静态工厂参数

(2)使用fluent流式创建

  RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

(3)创建包含命名空间的会话

 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                #创建命名空间为namespace的会话(该会话内的操作都是基于该目录进行的)
                .namespace("namespace")
                .build();
2、启动客户端
client.start();
3、创建数据节点

PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化并且带序列号
EPHEMERAL:临时
EPHEMERAL_SEQUENTIAL:临时并且带序列号

//创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());
4、删除节点
//删除一个节点,并且递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
//删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
//删除一个节点,强制保证删除,guaranteed()接口是一个保障措施,只要客户端会话有效,
//那么Curator会在后台持续进行删除操作,直到删除节点成功。
client.delete().guaranteed().forPath("path");

5、读取数据
//读取一个节点的数据内容,同时获取到该节点的stat
读取一个节点的数据内容,同时获取到该节点的stat
6、更新节点数据
//更新一个节点的数据内容,该接口会返回一个Stat实例
 Stat path = client.setData().forPath("path", "data".getBytes());
//更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
7、检测节点是否存在
 Stat stat = client.checkExists().forPath("path");
8、获取某个节点的所有子节点路径
//该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处
//理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
client.getChildren().forPath("path");
9、事物
client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();
10、异步接口
//异步的创建节点
   Executor executor = Executors.newFixedThreadPool(2);
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground((curatorFramework, curatorEvent) -> {
                    System.out.println(String.format("eventType:%s,resultCode:%s", 
                            curatorEvent.getType(), curatorEvent.getResultCode()));
                }, executor)
                .forPath("path");

二、Curator-recipes实现高级特征

1、缓存cache

(1)Path Cache

PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData

//1、构造函数
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
//2、想使用cache,必须调用它的start方法,使用完后调用close方法。可以设置StartMode来实现启动的模式,NORMAL:正常初始化。BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()。POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
//3、可以增加listener监听缓存的变化。
public void addListener(PathChildrenCacheListener listener)
//4、遍历所有的子节点
getCurrentData()

 private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        //1.创建client并启动
        String connectionInfo = "127.0.0.1:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(connectionInfo, 5000, 3000, retryPolicy);
        client.start();
        //2.创建path cache并启动
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        //3、进行事件监听
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            //如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的
            // event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        //4、进行响应的curd操作,看看事件回调结果
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }

(2)Node Cache

NodeCache - Node Cache实现类
NodeCacheListener - 节点监听器
ChildData - 节点数据

 client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");

(3)Tree Cache

TreeCache - Tree Cache实现类
TreeCacheListener - 监听器类
TreeCacheEvent - 触发的事件类
ChildData - 节点数据

 client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
2、leader选举
public class LeaderLatchTest {
    /**
     * 1、一旦启动,LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后其中一个最终会被选举为leader,
     * 2、可以通过hasLeadership方法查看LeaderLatch实例是否leader:
     * 3、 LeaderLatch在请求成为leadership会block(阻塞),一旦不使用LeaderLatch了,必须调用close方法。
     * 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。
     * 4、 错误处理:LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时,
     * leader不再认为自己还是leader。当LOST后连接重连后RECONNECTED,LeaderLatch会删除先前的ZNode然后重新
     * 创建一个。LeaderLatch用户必须考虑导致leadership丢失的连接问题。
     * 强烈推荐你使用ConnectionStateListener
     */

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        String connectionInfo = "127.0.0.1:2181";
        try {
            //1、先创建10个leaderLatch
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                //2、启动后,选中一个作为leader
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            //3、通过hasLeadership查看自己是否是leader, 如果是的话返回true。
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            //4、close释放当前的领导权。
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState() && !latch.getState().equals(LeaderLatch.State.CLOSED))
                    latch.close();
            }
            for (CuratorFramework client : clients) {
                client.close();
            }
        }
    }
}

(2)LeaderSelector

LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    //使用AtomicInteger来记录此client获得领导权的次数, 它是”fair”, 每个client有平等的机会获得领导权。
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        //调用 leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    /**
     * leaderSelector.start(); 一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。
     * 而takeLeadership()方法只有领导权被释放时才返回。
     * 你可以在takeLeadership进行任务的分配等等,并且不要返回,
     * 如果你想要要此实例一直是leader的话可以加一个死循环。
     * @param client
     * @throws Exception
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}
public class LeaderSelectorDemo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        String connectionInfo = "127.0.0.1:2181";
        try {
            //1、构建10个LeaderSelectorAdapter
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                //2、启动,并开始选举
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

(3)小结
对比可知,LeaderLatch必须调用close()方法才会释放领导权,而对于LeaderSelector,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。从而,LeaderSelector具有更好的灵活性和可控性,建议有LeaderElection应用场景下优先使用LeaderSelector。

3、分布式锁

1.推荐使用ConnectionStateListener监控连接的状态,因为当连接LOST时你不再拥有锁
2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
(1)可重入共享锁Shared Reentrant Lock

public InterProcessMutex(CuratorFramework client, String path)
public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真实环境中我们会在这里访问/维护一个共享的资源
        //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
        //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}
public class InterProcessMutexDemo {
    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        //加锁、其他被阻塞
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            //解锁
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        String connectionInfo = "127.0.0.1:2181";
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {

        }
    }
}

(2)不可重入共享锁—Shared Lock

(3)可重入读写锁—Shared Reentrant Read Write Lock

4、分布式计数器

(1)分布式int计数器—SharedCount

SharedCount
SharedCountReader
SharedCountListener

get(): 获取当前值
increment(): 加一
decrement(): 减一
add(): 增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值
forceSet(): 强制设置计数值

5、分布式队列
6、分布式屏障—Barrier

转载:https://my.oschina.net/woter/blog/1933298

上一篇下一篇

猜你喜欢

热点阅读