Zookeeper开发(03)获取管理权
现在我们有了会话,我们的Master程序需要获得管理权,虽然现在我们只有⼀个主节点,但我们还是要⼩⼼仔细。我们需要运⾏多个进程,以便在活动主节点发⽣故障后,可以有进程接替主节点。
为了确保同⼀时间只有⼀个主节点进程出于活动状态,我们使⽤ZooKeeper来实现简单的群⾸选举算法。这个算法中,所有潜在的主节点进程尝试创建/master节点,但只有⼀个成功,这个成功的进程成为主节点。
常量ZooDefs.Ids.OPEN_ACL_UNSAFE为所有⼈提供了所有权限(正如其名所显⽰的,这个ACL策略在不可信的环境下使⽤是⾮常不安全的)。
ZooKeeper通过插件式的认证⽅法提供了每个节点的ACL策略功能,因此,如果我们需要,就可以限制某个⽤户对某个znode节点的哪些权限,但对于这个简单的例⼦,我们继续使⽤OPEN_ACL_UNSAFE策略。当然,我们希望在主节点死掉后/master节点会消失。正如我们前面所提到的持久性和临时性znode节点,我们可以使⽤ZooKeeper的临时性znode节点来达到我们的⽬的。我们将定义⼀个EPHEMERAL的znode节点,当创建它的会话关闭或⽆效时,ZooKeeper会⾃动检测到,并删除这个节点。
因此,我们将会在我们的程序中添加以下代码:
![](https://img.haomeiwen.com/i3673891/578a86944d32cd09.png)
①我们试着创建znode节点/master。如果这个znode节点存在,create就会失败。同时我们想在/master节点的数据字段保存对应这个服务器的唯⼀ID。
②数据字段只能存储字节数组类型的数据,所以我们将int型转换为⼀个字节数组。
③如之前所提到的,我们使用开放的ACL策略。
④我们创建的节点类型为EPHEMERAL。
然⽽,我们这样做还不够,create⽅法会抛出两种异常:KeeperException和InterruptedException。我们需要确保我们处理了这两种异常,特别是ConnectionLossException(KeeperException异常的⼦类)和InterruptedException。对于其他异常,我们可以忽略并继续执⾏,但对于这两种异常,create⽅法可能已经成功了,所以如果我们作为主节点就需要捕获并处理它们。
ConnectionLossException异常发⽣于客户端与ZooKeeper服务端失去连接时。⼀般常常由于⽹络原因导致,如⽹络分区或ZooKeeper服务器故障。当这个异常发⽣时,客户端并不知道是在ZooKeeper服务器处理前丢失了请求消息,还是在处理后客户端未收到响应消息。如我们之前所描述的,ZooKeeper的客户端库将会为后续请求重新建⽴连接,但进程必须知道⼀个未决请求是否已经处理了还是需要再次发送请求。
InterruptedException异常源于客户端线程调⽤了Thread.interrupt,通常这是因为应⽤程序部分关闭,但还在被其他相关应⽤的⽅法使⽤。从字⾯来看这个异常,进程会中断本地客户端的请求处理的过程,并使该请求处于未知状态。
这两种请求都会导致正常请求处理过程的中断,开发者不能假设处理过程中的请求的状态。当我们处理这些异常时,开发者在处理前必须知道系统的状态。如果发⽣群⾸选举,在我们没有确认情况之前,我们不希望确定主节点。如果create执⾏成功了,活动主节点死掉以前,没有任何进程能够成为主节点,如果活动主节点还不知道⾃⼰已经获得了管理权,不会有任何进程成为主节点进程。
当处理ConnectionLossException异常时,我们需要找出那个进程创建的/master节点,如果进程是⾃⼰,就开始成为群⾸⾓⾊。我们通过getData⽅法来处理:
![](https://img.haomeiwen.com/i3673891/8a42e7891c28958a.png)
其中:
path
类似其他ZooKeeper⽅法⼀样,第⼀个参数为我们想要获取数据的znode节点路径。
watch
表⽰我们是否想要监听后续的数据变更。如果设置为true,我们就可以通过我们创建ZooKeeper句柄时所设置的Watcher对象得到事件,同时另⼀个版本的⽅法提供了以Watcher对象为⼊参,通过这个传⼊的对象来接收变更的事件。我们在后续章节再讨论如何监视变更情况,现在我们设置这个参数为false,因为我们现在我们只想知道当前的数据是什么。
stat
最后⼀个参数类型Stat结构,getData⽅法会填充znode节点的元数据信息。返回值⽅法返回成功(没有抛出异常),就会得到znode节点数据的字节数组。
让我们按以下代码段来修改代码,在runForMaster⽅法中引⼊异常处理:
![](https://img.haomeiwen.com/i3673891/5e0d01bf7eec376e.png)
①通过获取/master节点的数据来检查活动主节点。
②该⾏展示了为什么我们需要使用在创建/master节点时保存的数据:如果/master存在,我们使用/master中的数据来确定谁是群首。如果⼀个进程捕获到ConnectionLossException,这个进程可能就是主节点,因create操作实际上已经处理完,但响应消息却丢失了。
③我们将InterruptedException异常简单地传递给调用者。
④我们将zk.create⽅法包在try块之中,以便我们捕获并处理ConnectionLossException异常。
⑤这里为create请求,如果成功执⾏将会成为主节点。
⑥处理ConnectionLossException异常的catch块的代码为空,因为我们并不想中⽌函数,这样就可以使处理过程继续向下执⾏。
⑦检查活动主节点是否存在,如果不存在就重试。
在这个例⼦中,我们简单地传递InterruptedException给调⽤者,即向上传递异常。不过,在Java中没有明确的指导⽅针告诉我们如何处理线程中断,甚⾄没有告诉我们这个中断代表什么。有些时候,中断⽤于通知线程现在要退出了,需要进⾏清理操作,另外的情况,中断⽤于获得⼀个线程的控制权,应⽤的执⾏还将继续。
InterruptedException异常的处理依赖于程序的上下⽂环境,如果向上抛出InterruptedException异常,最终关闭zk句柄,我们可以抛出异常到调⽤栈顶,当句柄关闭时就可以清理所有⼀切。如果zk句柄未关闭,在重新抛出异常前,我们需要弄清楚⾃⼰是不是主节点,或者继续异步执⾏后续操作。后者情况⾮常棘⼿,需要我们仔细设计并妥善处理。
现在,我们看⼀下Master的main主函数:
![](https://img.haomeiwen.com/i3673891/45ffff927865285c.png)
①调用我们之前实现的runForMaster函数,当前进程成为主节点或另⼀进程成为主节点后返回。
②当我们开发主节点的应用逻辑时,我们在此处开始执⾏这些逻辑,现在我们仅仅输出我们成为主节点的信息,然后等待60秒后退出main函数。
异步获取管理权
ZooKeeper中,所有同步调⽤⽅法都有对应的异步调⽤⽅法。通过异步调⽤,我们可以在单线程中同时进⾏多个调⽤,同时也可以简化我们的实现⽅式。让我们回顾管理权的例⼦,修改为异步调⽤的⽅式。
以下为create⽅法的异步调⽤版本:
![](https://img.haomeiwen.com/i3673891/a82c3e292a920314.png)
create⽅法的异步⽅法与同步⽅法⾮常相似,仅仅多了两个参数:
①提供回调⽅法的对象。
②用户指定上下⽂信息(回调⽅法调用是传⼊的对象实例)。
该⽅法调⽤后通常在create请求发送到服务端之前就会⽴即返回。回调对象通过传⼊的上下⽂参数来获取数据,当从服务器接收到create请求的结果时,上下⽂参数就会通过回调对象提供给应⽤程序。
注意,该create⽅法不会抛出异常,我们可以简化处理,因为调⽤返回前并不会等待create命令完成,所以我们⽆需关⼼InterruptedException异常;同时因请求的所有错误信息通过回调对象会第⼀个返回,所以我们也⽆需关⼼KeeperException异常。
回调对象实现只有⼀个⽅法的StringCallback接口:
![](https://img.haomeiwen.com/i3673891/4184003f775c5352.png)
异步⽅法调⽤会简单化队列对ZooKeeper服务器的请求,并在另⼀个线程中传输请求。当接收到响应信息,这些请求就会在⼀个专⽤回调线程中被处理。为了保持顺序,只会有⼀个单独的线程按照接收顺序处理响应包。
processResult各个参数的含义如下:
rc
返回调⽤的结构,返回OK或与KeeperException异常对应的编码值。
path
我们传给create的path参数值。
ctx
我们传给create的上下⽂参数。
name
创建的znode节点名称。
⽬前,调⽤成功后,path和name的值⼀样,但是,如果采⽤CreateMode.SEQUENTIAL模式,这两个参数值就不会相等。
注意:回调函数处理,因为只有⼀个单独的线程处理所有回调调用,如果回调函数阻塞,所有后续回调调用都会被阻塞,也就是说,⼀般不要在回调函数中集中操作或阻塞操作。有时,在回调函数中调用同步⽅法是合法的,但⼀般还是避免这样做,以便后续回调调用可以快速被处理。
让我们继续完成我们的主节点的功能,我们创建了masterCreateCallback对象,⽤于接收create命令的结果:
![](https://img.haomeiwen.com/i3673891/aecaeac8eaad95e0.png)
①我们从rc参数中获得create请求的结果,并将其转换为Code枚举类型。rc如果不为0,则对应KeeperException异常。
②如果因连接丢失导致create请求失败,我们会得到CONNECTIONLOSS编码的结果,⽽不是ConnectionLossException异常。当连接丢失时,我们需要检查系统当前的状态,并判断我们需要如何恢复,我们将会在我们后面实现的checkMaster⽅法中进⾏处理。
③我们现在成为群首,我们先简单地赋值isLeader为true。
④其他情况,我们并未成为群首。
⑤在runForMaster⽅法中,我们将masterCreateCallback传给create⽅法,传⼊null作为上下⽂对象参数,因为在runForMaster⽅法中,我们现在不需要向masterCreateCallback.processResult⽅法传⼊任何信息。
我们现在需要实现checkMaster⽅法,这个⽅法与之前的同步情况不太⼀样,我们通过回调⽅法实现处理逻辑,因此在checkMaster函数中不会看到⼀系列的事件,⽽只有getData⽅法。getData调⽤完成后,后续处理将会在DataCallback对象中继续:
![](https://img.haomeiwen.com/i3673891/3091ca53d85c9eac.png)
同步⽅法和异步⽅法的处理逻辑是⼀样的,只是异步⽅法中,我们没有使⽤while循环,⽽是通过异步操作在回调函数中进⾏错误处理。此时,同步的版本看起来⽐异步版本实现起来更简单,但在后面我们会看到,应⽤程序常常由异步变化通知所驱动,因此最终以异步⽅式构建系统,反⽽使代码更简单。同时,异步调⽤不会阻塞应⽤程序,这样其他事务可以继续进⾏,甚⾄是提交新的ZooKeeper操作。
设置元数据
我们将使⽤异步API⽅法来设置元数据路径。我们的主从模型设计依赖三个⽬录:/tasks、/assign和/workers,我们可以在系统启动前通过某些系统配置来创建所有⽬录,或者通过在主节点程序每次启动时都创建这些⽬录。以下代码段会创建这些路径,例⼦中除了连接丢失错误的处理外没有其他错误处理:
![](https://img.haomeiwen.com/i3673891/541ed1dd4d3d19c7.png)
①我们没有数据存⼊这些znode节点,所以只传⼊空的字节数组。
②因为如此,我们不用关⼼去跟踪每个znode节点对应的数据,但是往往每个路径都具有独特的数据,所以我们通过回调上下⽂参数对create操作进⾏跟踪数据。在create函数的第⼆个和第四个参数均传⼊的data对象,也许看起来有些奇怪,但第⼆个参数传⼊的data表示要保存到znode节点的数据,⽽第四个参数传⼊的data,我们可以在createParentCallback回调函数中继续使用。
③如果回调函数中得到CONNECTIONLOSS返回码,我们通过调用createPath⽅法来对create操作进⾏重试,然⽽调用createPath我们需要知道之前的create调用中的data参数,因此我们通过create的第四个参数传⼊data,就可以将数据通过ctx对象传给回调函数。因为上下⽂对象与回调对象不同,我们可以使所有create操作使用同⼀个回调对象。
从本例中,你会注意到znode节点与⽂件(⼀个包含数据的znode节点)和⽬录(含有⼦节点的znode节点)没有什么区别,每个znode节点可以具备以上两个特点。