5分钟学习zookeeper:Curator Recipes
2021-02-15 本文已影响0人
JerrysCode
Curator针对zookeeper的常见应用场景,提供了方便的Recipes(菜谱),帮助用户快速高效地完成代码开发。如:Leader选举,分布式锁,Barriers,分布式队列等。
zookeeper 数据模型是一个类似文件系统的树形结构,每一个节点叫做znode。各种使用场景本质都是对znode的增删改查。
Leader选举
分布式系统有多个节点,刚开始所有节点通过leader选举选定其中一个节点为leader。如果leader节点宕机则重新选举新的leader。
leader选举原理
多个客户端创建同名的临时znode,只有一个客户端可以创建成功。创建znode成功的客户端获得锁。释放锁时删除znode,其他客户端监听到变化就可以重新竞争锁。
Curator框架提供了LeaderLatch和LeaderSelector两种leader选举方式
LeaderLatch
接下来的代码模拟10个节点的leader选举情况。核心业务逻辑是:
- 创建LeaderLatch
new LeaderLatch(client, "/master", "node" + i, LeaderLatch.CloseMode.NOTIFY_LEADER)
- 给每个LeaderLatch添加监听器LeaderLatchListener,当被选举为leader时触发isLeader函数回调,当失去leader角色后会触发notLeader函数回调
for (int i = 0; i < 10; i++) {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.build();
client.start();
LeaderLatch latch = new LeaderLatch(client, "/master", "node" + i, LeaderLatch.CloseMode.NOTIFY_LEADER);
latch.addListener(new LeaderLatchListener() {
@Override //被选举为leader时,回调调用此方法
public void isLeader() {
System.out.println(latch.getId() + " follower -> leader, hasLeadership: " + latch.hasLeadership());
}
/*如果失去Leader则回调此方法,必须设置LeaderLatch.CloseMode.NOTIFY_LEADER才会触发,否则不触发
失去Leader的场景:自身close退出,和zk server的连接断开
*/
@Override
public void notLeader() {
System.out.println(latch.getId() + " lost leader: " + latch.hasLeadership());
}
});
latches.add(latch);
}
- 通过latch.start()开始竞选
for (LeaderLatch latch :latches){
new Thread(() -> {
try {
latch.start(); //开始竞选
System.out.println("latch start.." + latch.getId());
latch.await(); //等待直到被选举为Leader
System.out.println("latch await.." + latch.getId());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
LeaderSelector
接下来的代码模拟10个节点的leader选举。
- 创建LeaderSelector,创建时指定LeaderSelectorListener监听器。当被选举为leader或者leader节点发生变化时回调相应的方法。
public LeaderSelector(CuratorFramework client,String mutexPath,LeaderSelectorListener listener)
- 通过 selector.start()开始leader竞选。
List<LeaderSelector> selectors = new ArrayList<>();
List<CuratorFramework> clients = new ArrayList<>();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
for (int i = 0; i < 10; i++) {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.build();
client.start();
clients.add(client);
}
for (CuratorFramework client : clients){
LeaderSelector selector = new LeaderSelector(client, "/master", new LeaderSelectorListenerAdapter() {
@Override //获取Leader权限后的处理逻辑,处理完成后自动释放Leader权限
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(client.toString() + " hasLeadership" );
Thread.sleep(2000);
}
@Override //节点状态变化
public void stateChanged(CuratorFramework client, ConnectionState state) {
System.out.println(client.toString() + " " + state.name());
}
});
selector.autoRequeue(); //释放Leader权限后,继续竞选
selector.start();
selectors.add(selector);
}
分布式锁
分布式锁保证多个节点之间的操作可以同步,即任意时间点只有一个节点持有锁。
分布式锁实现原理
- zookeeper首先创建一个/lock节点
- 当有节点获取锁是,先为这个节点创建临时节点,例如lock-702564158761685-000001,序列号按创建顺序递增。
- zookeeper会检查 lock-702564158761685-000001 是否/lock下的最小节点,如果是该节点得到锁,否则监听 lock-702564158761685-000001 前一个节点状态
-
当前一个节点的状态发生变化时回到步骤1,继续竞争锁。
image.png
分布式锁代码实现
- 创建分布式锁
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT)
.sessionTimeoutMs(SESSION_TIMEOUT)
.build();
client.start();
InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(client, "/lock");
mutex = new InterProcessSemaphoreMutex(clnt, path)
- 通过 mutex.acquire()获取锁
mutex.acquire();
- 通过mutex.release()释放锁
mutex.release();
Curator Recipes封装了底层细节,让用户可以更快捷地实现各种分布式应用场景。除了本文列出的leader选举和分布式锁,还有分布式队列,节点缓存,Barrier等。可以参考官方文档http://curator.apache.org/curator-recipes/index.html上的api。