架构社区分布式之ZookeeperZOOKEEPER

【一】Zookeeper安装、使用-【转】

2018-03-17  本文已影响545人  cd13856c86e2

0 系列目录


Zookeeper系列


【Zookeeper系列一】Zookeeper应用介绍与安装部署

【Zookeeper系列二】ZooKeeper典型应用场景实践

【Zookeeper系列三】ZooKeeper Java API使用

【Zookeeper系列四】ZooKeeper 分布式锁实现

【Zookeeper系列五】ZooKeeper 实时更新server列表

【Zookeeper系列六】Zookeeper 工作原理


Zookeeper源码


【Zookeeper源码一】Zookeeper 源码环境搭建

【Zookeeper源码二】Zookeeper 客户端创建连接过程分析

【Zookeeper源码三】Zookeeper 单机版服务器介绍

【Zookeeper源码四】Zookeeper 集群版服务器介绍

【Zookeeper源码五】Zookeeper 集群版建立连接过程


Zookeeper应用


基于ZooKeeper的分布式Session实现


1 Zookeeper概述

ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以Standalone模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

在网上看到了一个很不错的关于ZooKeeper的介绍: 顾名思义动物园管理员,他是拿来管大象(Hadoop) 、 蜜蜂(Hive) 、 小猪(Pig) 的管理员, Apache Hbase和 Apache Solr 以及LinkedIn sensei 等项目中都采用到了 Zookeeper。ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,ZooKeeper是以Fast Paxos算法为基础,实现同步服务,配置维护和命名服务等分布式应用。

从介绍可以看出,ZooKeeper更倾向于对大型应用的协同维护管理工作。IBM则给出了IBM对ZooKeeper的认知: Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

总之,我认为它的核心词就是一个单词,协调。


2 Zookeeper特征

在Hadoop权威指南中看到了关于ZooKeeper的一些核心特征,阅读之后感觉总结的甚是精辟,在这里引用并总结。

2.1 简易

ZooKeeper的最重要核心就是一个精简文件系统,提供一些简单的操作以及附加的抽象(例如排序和通知)。

2.2 易表达

ZooKeeper的原型是一个丰富的集合,它们是一些已建好的块,可以用来构建大型的协作数据结构和协议,例如:分布式队列、分布式锁以及一组对等体的选举。

2.3 高可用性

ZooKeeper运行在一些集群上,被设计成可用性较高的,因此应用程序可以依赖它。ZooKeeper可以帮助你的系统避免单点故障,从而建立一个可靠的应用程序。

2.4 松散耦合

ZooKeeper的交互支持参与者之间并不了解对方。例如:ZooKeeper可以被当做一种公共的机制,使得进程彼此不知道对方的存在也可以相互发现并且交互,对等方可能甚至不是同步的。

这一特点我感觉最能体现在集群的部署启动过程中。像Hadoop当把配置文件写好之后,然后运行启动脚本,则251,241,242中作为集群的虚拟机是同步启动的,也就是DataNode,NameNode,TaskTracker,以及JobTracker的启动并运行时在一次启动过程中启动的,就是运行一次启动脚本文件,则都启动起来。但是ZooKeeper的启动过程却不是这样的。我在251,241,242部署了ZooKeeper集群,并进行启动,则启动的过程是这样的:首先ssh到251然后启动,这时候251的集群节点启动起来,但是控制台一直报错,大概的含义就是没有检测到其他两个结点。接着分别ssh到241,242,分别启动集群中的剩下的结点,当241启动起来时,回到251查看,发现报错的信息减少,意思是只差一个结点。当251,241,242三台服务器的结点全部启动起来,则三台的服务器的控制台打印出正常的信息。

2.5 一个库

ZooKeeper提供了一个开源的、共享的执行存储,以及通用协作的方法,分担了每个程序员写通用协议的负担。随着时间的推移,人们可以增加和改进这个库来满足自己的需求。


3 为什么使用Zookeeper

记得在大约在2006年的时候Google出了Chubby来解决分布一致性的问题(distributed consensus problem),所有集群中的服务器通过Chubby最终选出一个Master Server ,最后这个Master Server来协调工作。简单来说其原理就是:在一个分布式系统中,有一组服务器在运行同样的程序,它们需要确定一个Value,以那个服务器提供的信息为主/为准,当这个服务器经过n/2+1的方式被选出来后,所有的机器上的Process都会被通知到这个服务器就是主服务器 Master服务器,大家以他提供的信息为准。很想知道Google Chubby中的奥妙,可惜人家Google不开源,自家用。

但是在2009年3年以后沉默已久的Yahoo在Apache上推出了类似的产品ZooKeeper,并且在Google原有Chubby的设计思想上做了一些改进,因为ZooKeeper并不是完全遵循Paxos协议,而是基于自身设计并优化的一个2 phase commit的协议,如图所示:

image

ZooKeeper跟Chubby一样用来存放一些相互协作的信息(Coordination),这些信息比较小一般不会超过1M,在zookeeper中是以一种hierarchical tree的形式来存放,这些具体的Key/Value信息就store在tree node中。

当有事件导致node数据,例如:变更,增加,删除时,Zookeeper就会调用 triggerWatch方法,判断当前的path来是否有对应的监听者(watcher),如果有watcher,会触发其process方法,执行process方法中的业务逻辑,如图:

image

3.1 应用实例

ZooKeeper有了上述的这些用途,让我们设想一下,在一个分布式系统中有这这样的一个应用:

2个任务工厂(Task Factory)一主一从,如果从的发现主的死了以后,从的就开始工作,他的工作就是向下面很多台代理(Agent)发送指令,让每台代理(Agent)获得不同的账户进行分布式并行计算,而每台代理(Agent)中将分配很多帐号,如果其中一台代理(Agent)死掉了,那么这台死掉的代理上的账户就不会继续工作了。上述,出现了3个最主要的问题:

Task Factory 主/从一致性的问题;

Task Factory 主/从心跳如何用简单+稳定 或者2者折中的方式实现;

一台代理(Agent)死掉了以后,一部分的账户就无法继续工作,需要通知所有在线的代理(Agent)重新分配一次帐号;

image

OK,让我们想想ZooKeeper是不是能帮助我们去解决目前遇到的这3个最主要的问题呢?

任务工厂Task Factory都连接到ZooKeeper上,创建节点,设置对这个节点进行监控,监控方法例如:

event= new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected,"/TaskFactory");

这个方法的意思就是只要Task Factory与zookeeper断开连接后,这个节点就会被自动删除。

原来主的任务工厂断开了TCP连接,这个被创建的/TaskFactory节点就不存在了,而且另外一个连接在上面的Task Factory可以立刻收到这个事件(Event),知道这个节点不存在了,也就是说主TaskFactory死了。

接下来另外一个活着的TaskFactory会再次创建/TaskFactory节点,并且写入自己的ip到znode里面,作为新的标记。

此时Agents也会知道主的TaskFactory不工作了,为了防止系统中大量的抛出异常,他们将会先把自己手上的事情做完,然后挂起,等待收到Zookeeper上重新创建一个/TaskFactory节点,收到 EventType.NodeCreated 类型的事件将会继续工作。

原来从的TaskFactory 将自己变成一个主TaskFactory,当系统管理员启动原来死掉的主的TaskFactory,世界又恢复平静了。

如果一台代理死掉,其他代理他们将会先把自己手上的事情做完,然后挂起,向TaskFactory发送请求,TaskFactory会重新分配(sharding)帐户到每个Agent上了,继续工作。

上述内容,大致如图所示:

image

4 Zookeeper基本知识

4.1 层次化的名字空间

ZooKeeper的整个名字空间的结构是层次化的,和一般的Linux文件系统结构非常相似,一颗很大的树。这也就是ZooKeeper的数据结构情况。名字空间的层次由斜杠/来进行分割,在名称空间里面的每一个结点的名字空间唯一由这个结点的路径来确定。

每一个节点拥有自身的一些信息,包括:数据、数据长度、创建时间、修改时间等等。从这样一类既含有数据,又作为路径表标示的节点的特点中,可以看出,ZooKeeper的节点既可以被看做是一个文件,又可以被看做是一个目录,它同时具有二者的特点。为了便于表达,今后我们将使用Znode来表示所讨论的ZooKeeper节点。

4.2 Znode

Znode维护着数据、ACL(access control list,访问控制列表)、时间戳等交换版本号等数据结构,它通过对这些数据的管理来让缓存生效并且令协调更新。每当Znode中的数据更新后它所维护的版本号将增加,这非常类似于数据库中计数器时间戳的操作方式。

另外Znode还具有原子性操作的特点:命名空间中,每一个Znode的数据将被原子地读写。读操作将读取与Znode相关的所有数据,写操作将替换掉所有的数据。除此之外,每一个节点都有一个访问控制列表,这个访问控制列表规定了用户操作的权限。

ZooKeeper中同样存在临时节点。这些节点与session同时存在,当session生命周期结束,这些临时节点也将被删除。临时节点在某些场合也发挥着非常重要的作用。

4.3 Watch机制

Watch机制就和单词本身的意思一样,看。看什么?具体来讲就是某一个或者一些Znode的变化。官方给出的定义:一个Watch事件是一个一次性的触发器,当被设置了Watch的数据发生了改变的时候,则服务器将这个改变发送给设置了Watch的客户端,以便通知它们。

Watch机制主要有以下三个特点:

1. 一次性的触发器(one-time trigger)

当数据改变的时候,那么一个Watch事件会产生并且被发送到客户端中。但是客户端只会收到一次这样的通知,如果以后这个数据再次发生改变的时候,之前设置Watch的客户端将不会再次收到改变的通知,因为Watch机制规定了它是一个一次性的触发器。

当设置监视的数据发生改变时,该监视事件会被发送到客户端,例如,如果客户端调用了 getData("/znode1", true) 并且稍后 /znode1 节点上的数据发生了改变或者被删除了,客户端将会获取到 /znode1 发生变化的监视事件,而如果 /znode1 再一次发生了变化,除非客户端再次对 /znode1 设置监视,否则客户端不会收到事件通知。

2. 发送给客户端(Sent to the client)

这个表明了Watch的通知事件是从服务器发送给客户端的,是异步的,这就表明不同的客户端收到的Watch的时间可能不同,但是ZooKeeper有保证:当一个客户端在看到Watch事件之前是不会看到结点数据的变化的。例如:A=3,此时在上面设置了一次Watch,如果A突然变成4了,那么客户端会先收到Watch事件的通知,然后才会看到A=4。

Zookeeper 客户端和服务端是通过 Socket 进行通信的,由于网络存在故障,所以监视事件很有可能不会成功地到达客户端,监视事件是异步发送至监视者的,Zookeeper 本身提供了保序性(ordering guarantee):即客户端只有首先看到了监视事件后,才会感知到它所设置监视的 znode 发生了变化(a client will never see a change for which it has set a watch until it first sees the watch event). 网络延迟或者其他因素可能导致不同的客户端在不同的时刻感知某一监视事件,但是不同的客户端所看到的一切具有一致的顺序。

3. 被设置Watch的数据(The data for which the watch was set)

这意味着 znode 节点本身具有不同的改变方式。你也可以想象 Zookeeper 维护了两条监视链表:数据监视和子节点监视(data watches and child watches) getData() and exists() 设置数据监视,getChildren() 设置子节点监视。 或者,你也可以想象 Zookeeper 设置的不同监视返回不同的数据,getData() 和 exists() 返回 znode 节点的相关信息,而 getChildren() 返回子节点列表。因此, setData() 会触发设置在某一节点上所设置的数据监视(假定数据设置成功),而一次成功的 create() 操作则会出发当前节点上所设置的数据监视以及父节点的子节点监视。一次成功的 delete() 操作将会触发当前节点的数据监视和子节点监视事件,同时也会触发该节点父节点的child watch。

Zookeeper 中的监视是轻量级的,因此容易设置、维护和分发。当客户端与 Zookeeper 服务器端失去联系时,客户端并不会收到监视事件的通知,只有当客户端重新连接后,若在必要的情况下,以前注册的监视会重新被注册并触发,对于开发人员来说 这通常是透明的。只有一种情况会导致监视事件的丢失,即:通过 exists() 设置了某个 znode 节点的监视,但是如果某个客户端在此 znode 节点被创建和删除的时间间隔内与 zookeeper 服务器失去了联系,该客户端即使稍后重新连接 zookeeper服务器后也得不到事件通知。


4.4 ACL(Access Control List)访问控制列表

这是另外一个和Linux操作系统非常相似的地方,ZooKeeper使用ACL来控制对旗下Znode结点们的访问。ACL的实现和Linux文件系统的访问权限十分类似:它通过设置权限为来表明是否允许对一个结点的相关内容的改变。

但是与传统Linux机制不太相同,一个结点的数据没有类似“拥有者,组用户,其他用户”的概念,在ZooKeeper中,ACL通过设置ID以及与其关联的权限来完成访问控制的。ACL的权限组成语法是:

(scheme:expression, perms)

前者表明设置的ID,逗号后面表示的是ID相关的权限,例如:

(ip:172.16.16.1,READ)

指明了IP地址为如上的用户的权限为只读。

以下列举以下ACL所具有的权限:

CREATE:表明你可以创建一个Znode的子结点。
READ:你可以得到这个结点的数据以及列举该结点的子结点情况。WRITE:设置一个结点的数据。
DELETE:可以删除一个结点
ADMIN:对一个结点设置权限。

5 Zookeeper部署

ZooKeeper的部署方式主要有三种,单机模式、伪集群模式、集群模式。其实剩下的两种模式都是集群模式的特殊情况。

5.1 系统环境

uname -a
cat /etc/issue
java -version
image

5.2 下载Zookeeper

下载 wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz

1. mkdir /home/taomk/zk 创建文件夹

2. cd /home/taomk/zk; 

wget [https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz](https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz) 下载

3. tar xvf zookeeper-3.4.10.tar.gz 解压缩

4. mv zookeeper-3.4.10 zookeeper346 重命名

5. cd zookeeper3.4.10; 

6. ls -l
image

5.3 配置环境变量

export ZOOKEEPER_HOME=/home/taomk/zk/zookeeper346
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf

5.4 ZooKeeper的单机模式部署

ZooKeeper的单机模式通常是用来快速测试客户端应用程序的,在实际过程中不可能是单机模式。单机模式的配置也比较简单。

1. 编写配置文件zoo.cfg:ZooKeeper的运行默认是读取zoo.cfg文件里面的内容的。

mkdir  /home/taomk/zk/zoo/zk0

cp conf/zoo_sample.cfg conf/zk0.cfg

修改配置:vim conf/zk0.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk0
clientPort=2181

在zk0.cfg这个文件中,我们需要指定 dataDir 的值,它指向了一个目录,这个目录在开始的时候需要为空。下面是每个参数的含义:

tickTime :基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

dataDir :存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

clientPort :这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

使用单机模式时用户需要注意:这种配置方式下没有 ZooKeeper 副本,所以如果 ZooKeeper 服务器出现故障, ZooKeeper 服务将会停止。

2. 启动Zookeeper

bin/zkServer.shstartzk0.cfg
image

2.1 zk的服务显示为QuorumPeerMain

jps
5321 QuorumPeerMain
5338 Jps

2.2 查看运行状态

bin/zkServer.sh status zk0.cfg

JMX enabled by default
Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk0.cfg
Mode: standalone

单节点的时,Mode会显示为standalone。

3. 停止ZooKeeper服务

bin/zkServer.sh stop zk0.cfg

JMX enabled by default
Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk0.cfg
Stopping zookeeper ... STOPPED

5.5 ZooKeeper的伪集群模式部署

所谓 “伪分布式集群” 就是在,在一台PC中,启动多个ZooKeeper的实例。“完全分布式集群” 是每台PC,启动一个ZooKeeper实例。其实在企业中式不会存在的,另外为了测试一个客户端程序也没有必要存在,只有在物质条件比较匮乏的条件下才会存在的模式。集群伪分布模式就是在单机下模拟集群的ZooKeeper服务,在一台机器上面有多个ZooKeeper的JVM同时运行。

ZooKeeper的集群模式下,多个Zookeeper服务器在工作前会选举出一个Leader,在接下来的工作中这个被选举出来的Leader死了,而剩下的Zookeeper服务器会知道这个Leader死掉了,在活着的Zookeeper集群中会继续选出一个Leader,选举出Leader的目的是为了可以在分布式的环境中保证数据的一致性。

1. 确认集群服务器的数量

由于ZooKeeper集群中,会有一个Leader负责管理和协调其他集群服务器,因此服务器的数量通常都是单数,例如3,5,7...等,这样2n+1的数量的服务器就可以允许最多n台服务器的失效基数台2n+1,最多允许n台失效)。

2. 创建环境目录

 mkdir /home/taomk/zk/zoo/zk1
 mkdir /home/taomk/zk/zoo/zk2
 mkdir /home/taomk/zk/zoo/zk3

#新建myid文件
 echo "1" > /home/taomk/zk/zoo/zk1/myid
 echo "2" > /home/taomk/zk/zoo/zk2/myid
 echo "3" > /home/taomk/zk/zoo/zk3/myid

3. 分别修改配置文件

修改:dataDir,clientPort

增加:集群的实例,server.X,”X”表示每个目录中的myid的值

~ vim /home/taomk/zk/zookeeper346/conf/zk1.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/taomk/zk/zoo/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

~ vim /home/taomk/zk/zookeeper346/conf/zk2.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/taomk/zk/zoo/zk2
clientPort=2182
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

~ vim /home/taomk/zk/zookeeper346/conf/zk3.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/taomk/zk/zoo/zk3
clientPort=2183
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

initLimit
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。

syncLimit
这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒。

server.A=B:C:D
其中
A 是一个数字,表示这个是第几号服务器,就是集群模式下配置的myid文件所存放的数值;
B 是这个服务器的 ip 地址;
C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

由于三个服务都在同一台电脑上,因此这里要保证地址的唯一性,因此要特别注意IP地址和端口号不要互相冲突,以免影响程序的正确执行。

3个节点的ZooKeeper集群配置完成,接下来我们的启动服务。

4. 启动集群

~ bin/zkServer.sh start zk1.cfg
~ bin/zkServer.sh start zk2.cfg
~ bin/zkServer.sh start zk3.cfg

~ jps
5422 QuorumPeerMain
5395 QuorumPeerMain
5463 QuorumPeerMain
5494 Jps

#查看节点状态
~ bin/zkServer.sh status zk1.cfg
JMX enabled by default
Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk1.cfg
Mode: follower

~ bin/zkServer.sh status zk2.cfg
JMX enabled by default
Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk2.cfg
Mode: leader

~ bin/zkServer.sh status zk3.cfg
JMX enabled by default
Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk3.cfg
Mode: follower

5. 查看ZooKeeper物理文件目录结构

~ tree -L 3 /home/taomk/zk/zoo
image

5.6 ZooKeeper Distributed模式

ZooKeeper分布式模式安装(ZooKeeper集群)也比较容易,这里说明一下基本要点。

首先要明确的是,ZooKeeper集群是一个独立的分布式协调服务集群,“独立”的含义就是说,如果想使用ZooKeeper实现分布式应用的协调与管理,简化协调与管理,任何分布式应用都可以使用,这就要归功于Zookeeper的数据模型(Data Model)和层次命名空间(Hierarchical Namespace)结构,在设计你的分布式应用协调服务时,首要的就是考虑如何组织层次命名空间。

下面说明分布式模式的安装配置,过程如下所示:

1. 第一步:主机名称到IP地址映射配置

ZooKeeper集群中具有两个关键的角色:Leader和Follower。集群中所有的结点作为一个整体对分布式应用提供服务,集群中每个结点之间都互相连接,所以,在配置的ZooKeeper集群的时候,每一个结点的host到IP地址的映射都要配置上集群中其它结点的映射信息。

例如,ZooKeeper集群中每个结点的配置,以slave-01为例,/etc/hosts内容如下所示:

192.168.0.179   slave-01
192.168.0.178   slave-02
192.168.0.177   slave-03

ZooKeeper采用一种称为Leader election的选举算法(ZAB算法)。在整个集群运行过程中,只有一个Leader,其他的都是Follower,如果ZooKeeper集群在运行过程中Leader出了问题,系统会采用该算法重新选出一个Leader。因此,各个结点之间要能够保证互相连接,必须配置上述映射。

ZooKeeper集群启动的时候,会首先选出一个Leader,在Leader election过程中,某一个满足选举算法的结点就能成为Leader。

ZAB算法选举过程

2. 第二步:修改ZooKeeper配置文件

在其中一台机器(slave-01)上,解压缩zookeeper-3.3.4.tar.gz,修改配置文件conf/zoo.cfg,内容如下所示:

tickTime=2000  
dataDir=/home/hadoop/storage/zookeeper  
clientPort=2181  
initLimit=5  
syncLimit=2  
server.1=slave-01:2888:3888  
server.2=slave-02:2888:3888  
server.3=slave-03:2888:3888 

3. 第三步:远程复制分发安装文件

上面已经在一台机器slave-01上配置完成ZooKeeper,现在可以将该配置好的安装文件远程拷贝到集群中的各个结点对应的目录下:

通过scp -r命令拷贝

cd /home/hadoop/installation/  
scp -r zookeeper-3.3.4/ hadoop@slave-02:/home/hadoop/installation/  
scp -r zookeeper-3.3.4/ hadoop@slave-03:/home/hadoop/installation/

4. 第四步:设置myid

在我们配置的dataDir指定的目录下面,创建一个myid文件,里面内容为一个数字,用来标识当前主机,conf/zoo.cfg文件中配置的server.X中X为什么数字,则myid文件中就输入这个数字,例如:

hadoop@slave-01:~/installation/zookeeper-3.3.4$ echo "1" > /home/hadoop/storage/zookeeper/myid  
hadoop@slave-02:~/installation/zookeeper-3.3.4$ echo "2" > /home/hadoop/storage/zookeeper/myid  
hadoop@slave-03:~/installation/zookeeper-3.3.4$ echo "3" > /home/hadoop/storage/zookeeper/myid 

5. 第五步:启动ZooKeeper集群

在ZooKeeper集群的每个结点上,执行启动ZooKeeper服务的脚本,如下所示:

hadoop@slave-01:~/installation/zookeeper-3.3.4$ bin/zkServer.sh start  
hadoop@slave-02:~/installation/zookeeper-3.3.4$ bin/zkServer.sh start  
hadoop@slave-03:~/installation/zookeeper-3.3.4$ bin/zkServer.sh start

启动的顺序是slave-01>slave-02>slave-03,由于ZooKeeper集群启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。其他结点可能也出现类似问题,属于正常。


6 Zookeeper命令行操作

ZooKeeper命令行工具类似于Linux的shell环境,不过功能肯定不及shell啦,但是使用它我们可以简单的对ZooKeeper进行访问,数据创建,数据修改等操作。

对于客户端来说,ZooKeeper是一个整体(ensemble),连接到ZooKeeper集群实际上感觉在独享整个集群的服务,所以,你可以在任何一个结点上建立到服务集群的连接。

当启动 ZooKeeper 服务成功之后,输入下述命令,连接到 ZooKeeper 服务:

~ bin/zkCli.sh –server 127.0.0.1:2181

连接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息,并在屏幕输出“ Welcome to ZooKeeper ”等信息。

命令行工具的一些简单操作如下:

  1. 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容:
ls /
image
  1. 创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串:
create /zk "myData"
image
  1. 运行 get 命令来确认 znode 是否包含我们所创建的字符串:
get /zk
image
  1. 通过 set 命令来对 zk 所关联的字符串进行设置:
set /zk "zsl"
get /zk
image
  1. znode 删除:
delete /zk
  1. 通过help打印命令行帮助:
help
image
  1. 退出客户端连接:
quit

7 Java编程现实命令行操作

1. ZkDemo.java

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // 创建一个与服务器的连接
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                System.out.println("EVENT:" + event.getType());
            }
        });

        // 查看根节点
        System.out.println("ls / => " + zk.getChildren("/", true));

        // 创建一个目录节点
        if (zk.exists("/node", true) == null) {
            zk.create("/node", "conan".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node conan");
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 创建一个子目录节点
        if (zk.exists("/node/sub1", true) == null) {
            zk.create("/node/sub1", "sub1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node/sub1 sub1");
            // 查看node节点
            System.out.println("ls /node => " + zk.getChildren("/node", true));
        }

        // 修改节点数据
        if (zk.exists("/node", true) != null) {
            zk.setData("/node", "changed".getBytes(), -1);
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
        }

        // 删除节点
        if (zk.exists("/node/sub1", true) != null) {
            zk.delete("/node/sub1", -1);
            zk.delete("/node", -1);
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 关闭连接
        zk.close();
    }
}

2. pom.xml,maven配置文件

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
        <exclusions>
            <exclusion>
                <groupId>javax.jms</groupId>
                <artifactId>jms</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.sun.jdmk</groupId>
                <artifactId>jmxtools</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.sun.jmx</groupId>
                <artifactId>jmxri</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

出处:http://my.oschina.net/xianggao/blog/531204



【二】ZooKeeper典型应用场景实践-【转】


ZooKeeper是一个高可用的分布式数据管理与系统协调框架基于对Paxos算法变种(更正:ZAB{Zookeeper Atomic Broadcast}算法)的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题。网上对ZK的应用场景也有不少介绍,本文将介绍比较常用的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍。

值得注意的是,ZK并非天生就是为这些应用场景设计的,都是后来众多开发者根据其框架的特性,利用其提供的一系列API接口(或者称为原语集),摸索出来的典型使用方法。因此,也非常欢迎读者分享你在ZK使用上的奇技淫巧。


0. ZAB(Zookeeper Atomic Broadcast)算法介绍


参考:http://www.cnblogs.com/study-everyday/p/7242766.html


1 Zookeeper数据模型

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,如图所示:

输入图片说明

图中的每个节点称为一个znode. 每个znode由3部分组成:

  1. stat. 此为状态信息, 描述该znode的版本, 权限等信息
  2. data. 与该znode关联的数据
  3. children. 该znode下的子节点

Zookeeper 这种数据结构有如下这些特点:

  1. 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1;

  2. znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录

  3. znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据

  4. znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了;

  5. znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2;

  6. znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍;

znode节点的状态信息:

使用get命令获取指定节点的数据时, 同时也将返回该节点的状态信息, 称为Stat. 其包含如下字段:

czxid. 节点创建时的zxid;
mzxid. 节点最新一次更新发生时的zxid;
ctime. 节点创建时的时间戳;
mtime. 节点最新一次更新发生时的时间戳;
dataVersion. 节点数据的更新次数;
cversion. 其子节点的更新次数;
aclVersion. 节点ACL(授权信息)的更新次数;
ephemeralOwner. 如果该节点为ephemeral节点, ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是ephemeral节点, ephemeralOwner值为0. 至于什么是ephemeral节点;
dataLength. 节点数据的字节数;
numChildren. 子节点个数;

zxid:

znode节点的状态信息中包含czxid和mzxid, 那么什么是zxid呢?

ZooKeeper状态的每一次改变, 都对应着一个递增的Transaction id, 该id称为zxid. 由于zxid的递增性质, 如果zxid1小于zxid2, 那么zxid1肯定先于zxid2发生. 创建任意节点, 或者更新任意节点的数据, 或者删除任意节点, 都会导致Zookeeper状态发生改变, 从而导致zxid的值增加.

session:

在client和server通信之前, 首先需要建立连接, 该连接称为session. 连接建立后, 如果发生连接超时, 授权失败, 或者显式关闭连接, 连接便处于CLOSED状态, 此时session结束.

节点类型:

讲述节点状态的ephemeralOwner字段时, 提到过有的节点是ephemeral节点, 而有的并不是. 那么节点都具有哪些类型呢? 每种类型的节点又具有哪些特点呢?

persistent. persistent节点不和特定的session绑定, 不会随着创建该节点的session的结束而消失, 而是一直存在, 除非该节点被显式删除.

ephemeral. ephemeral节点是临时性的, 如果创建该节点的session结束了, 该节点就会被自动删除. ephemeral节点不能拥有子节点. 虽然ephemeral节点与创建它的session绑定, 但只要该该节点没有被删除, 其他session就可以读写该节点中关联的数据. 使用-e参数指定创建ephemeral节点.

create -e /xing/ei world 

sequence. 严格的说, sequence并非节点类型中的一种. sequence节点既可以是ephemeral的, 也可以是persistent的. 创建sequence节点时, ZooKeeper server会在指定的节点名称后加上一个数字序列, 该数字序列是递增的. 因此可以多次创建相同的sequence节点, 而得到不同的节点. 使用-s参数指定创建sequence节点.

[zk: localhost:4180(CONNECTED) 0] create -s /xing/item world  
Created /xing/item0000000001  
[zk: localhost:4180(CONNECTED) 1] create -s /xing/item world  
Created /xing/item0000000002  
[zk: localhost:4180(CONNECTED) 2] create -s /xing/item world  
Created /xing/item0000000003  
[zk: localhost:4180(CONNECTED) 3] create -s /xing/item world  
Created /xing/item0000000004

watch:

watch的意思是监听感兴趣的事件. 在命令行中, 以下几个命令可以指定是否监听相应的事件.

ls命令. ls命令的第一个参数指定znode, 第二个参数如果为true, 则说明监听该znode的子节点的增减, 以及该znode本身的删除事件.

[zk: localhost:4180(CONNECTED) 21] ls /xing true
[]
[zk: localhost:4180(CONNECTED) 22] create /xing/item item000

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/xing
Created /xing/item

get命令. get命令的第一个参数指定znode, 第二个参数如果为true, 则说明监听该znode的更新和删除事件.

[zk: localhost:4180(CONNECTED) 39] get /xing true
world
cZxid = 0x100000066
ctime = Fri May 17 22:30:01 CST 2013
mZxid = 0x100000066
mtime = Fri May 17 22:30:01 CST 2013
pZxid = 0x100000066
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
[zk: localhost:4180(CONNECTED) 40] create /xing/item item000
Created /xing/item
[zk: localhost:4180(CONNECTED) 41] rmr /xing

WATCHER::

WatchedEvent state:SyncConnected type:NodeDeleted path:/xing

stat命令. stat命令用于获取znode的状态信息. 第一个参数指定znode, 如果第二个参数为true, 则监听该node的更新和删除事件.

2 如何使用Zookeeper

Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化通过监控这些数据状态的变化,从而可以达到基于数据的集群管理,后面将会详细介绍 Zookeeper 能够解决的一些典型问题,这里先介绍一下,Zookeeper 的操作接口和简单使用示例。

2.1 常用接口操作

客户端要连接 Zookeeper 服务器可以通过创建 org.apache.zookeeper.ZooKeeper 的一个实例对象,然后调用这个类提供的接口来和服务器交互。

前面说了 ZooKeeper 主要是用来维护和监控一个目录节点树中存储的数据的状态,所有我们能够操作 ZooKeeper 的也和操作目录节点树大体一样,如创建一个目录节点,给某个目录节点设置数据,获取某个目录节点的所有子目录节点,给某个目录节点设置权限和监控这个目录节点的状态变化。

ZooKeeper 基本的操作示例:

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // 创建一个与服务器的连接
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2180", 60000, new Watcher() {
            // 监控所有被触发的事件
            // 当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watcher 对象的 process 方法就会被调用。
            public void process(WatchedEvent event) {
                System.out.println("EVENT:" + event.getType());
            }
        });

        // 查看根节点
        // 获取指定 path 下的所有子目录节点,同样 getChildren方法也有一个重载方法可以设置特定的 watcher 监控子节点的状态
        System.out.println("ls / => " + zk.getChildren("/", true));

        // 判断某个 path 是否存在,并设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper 实例时指定的 watcher;
        // exists方法还有一个重载方法,可以指定特定的 watcher
        if (zk.exists("/node", true) == null) {
            // 创建一个给定的目录节点 path, 并给它设置数据;
            // CreateMode 标识有四种形式的目录节点,分别是:
            //     PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;
            //     PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;
            //     EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;
            //     EPHEMERAL_SEQUENTIAL:临时自动编号节点
            zk.create("/node", "conan".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node conan");
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 创建一个子目录节点
        if (zk.exists("/node/sub1", true) == null) {
            zk.create("/node/sub1", "sub1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node/sub1 sub1");
            // 查看node节点
            System.out.println("ls /node => " + zk.getChildren("/node", true));
        }

        // 修改节点数据
        if (zk.exists("/node", true) != null) {
            // 给 path 设置数据,可以指定这个数据的版本号,如果 version 为 -1 怎可以匹配任何版本
            zk.setData("/node", "changed".getBytes(), -1);
            // 查看/node节点数据
            // 获取这个 path 对应的目录节点存储的数据,数据的版本等信息可以通过 stat 来指定,同时还可以设置是否监控这个目录节点数据的状态
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
        }

        // 删除节点
        if (zk.exists("/node/sub1", true) != null) {
            // 删除 path 对应的目录节点,version 为 -1 可以匹配任何版本,也就删除了这个目录节点所有数据
            zk.delete("/node/sub1", -1);
            zk.delete("/node", -1);
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 关闭连接
        zk.close();
    }
}


3 ZooKeeper 典型的应用场景


Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式,关于 Zookeeper 的详细架构等内部细节可以阅读 Zookeeper 的源码。

下面详细介绍这些典型的应用场景,也就是 Zookeeper 到底能帮我们解决那些问题?下面将给出答案。

3.1 统一命名服务(Name Service)

分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 能够完成的功能是差不多的,它们都是将有层次的目录结构关联到一定资源上,但是 Zookeeper 的 Name Service 更加是广泛意义上的关联,也许你并不需要将名称关联到特定资源上,你可能只需要一个不会重复名称,就像数据库中产生一个唯一的数字主键一样。

Name Service 已经是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。

命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。

命名服务实例:

阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表,在Dubbo实现中:

服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。

服务消费者启动的时候订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。

注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息

3.2 配置管理(Configuration Management)

配置的管理在分布式应用环境中很常见,例如同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样非常麻烦而且容易出错。

像这样的配置信息完全可以交给 Zookeeper 来管理,将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中

输入图片说明

发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。

配置管理实例:

  1. 应用中用到的一些配置信息放到ZK上进行集中管理。这类场景通常是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,以后每次配置有更新的时候,都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。

  2. 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。

  3. 分布式日志收集系统。这个系统的核心工作是收集分布在不同机器的日志。收集器通常是按照应用来分配收集任务单元,因此需要在ZK上创建一个以应用名作为path的节点P,并将这个应用的所有机器ip,以子节点的形式注册到节点P上,这样一来就能够实现机器变动的时候,能够实时通知到收集器调整任务分配。

  4. 系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息的发问。通常是暴露出接口,例如JMX接口,来获取一些运行时的信息。引入ZK之后,就不用自己实现一套方案了,只要将这些信息存放到指定的ZK节点上即可。

注意:在上面提到的应用场景中,有个默认前提是:数据量很小,但是数据更新可能会比较快的场景

3.3 集群管理(Group Membership)

Zookeeper 能够很容易的实现集群管理的功能,如有多台 Server 组成一个服务集群,那么必须要一个“总管”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让“总管”知道

Zookeeper 不仅能够帮你维护当前的集群中机器的服务状态,而且能够帮你选出一个“总管”,让这个总管来管理集群,这就是 Zookeeper 的另一个功能 Leader Election

它们的实现方式都是在 Zookeeper 上创建一个 EPHEMERAL 类型的目录节点,然后每个 Server 在它们创建目录节点的父目录节点上调用 getChildren(String path, boolean watch) 方法并设置 watch 为 true,由于是 EPHEMERAL 目录节点,当创建它的 Server 死去,这个目录节点也随之被删除,所以 Children 将会变化,这时 getChildren上的 Watch 将会被调用,所以其它 Server 就知道已经有某台 Server 死去了。新增 Server 也是同样的原理。

Zookeeper 如何实现 Leader Election,也就是选出一个 Master Server。和前面的一样每台 Server 创建一个 EPHEMERAL 目录节点,不同的是它还是一个 SEQUENTIAL 目录节点,所以它是个 EPHEMERAL_SEQUENTIAL 目录节点。之所以它是 EPHEMERAL_SEQUENTIAL 目录节点,是因为我们可以给每台 Server 编号,我们可以选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,由于是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,所以当前的节点列表中又出现一个最小编号的节点,我们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题。

输入图片说明

1. 集群机器监控

这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报“我还活着”。 这种做法可行,但是存在两个比较明显的问题:

  1. 集群中机器有变动的时候,牵连修改的东西比较多。
  2. 有一定的延时。

利用ZooKeeper有两个特性,就可以实现另一种集群机器存活性监控系统:

  1. 客户端在节点 x 上注册一个Watcher,那么如果 x 的子节点变化了,会通知该客户端
  2. 创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失

例如:监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。

2. Master选举则是zookeeper中最为经典的应用场景了

在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题

利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到EPHEMERAL_SEQUENTIAL类型节点的特性了

上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上消失,那么之后最小的那个机器就是Master了

3. 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成,然后同步到集群中其它机器。另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。

4. 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选举出一个HMaster来运行,从而避免了HMaster的单点问题。

输入图片说明

3.4 共享锁(Locks)

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了

分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性。锁服务可以分为两类,一个是保持独占,另一个是控制时序

  1. 所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把zk上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁

  2. 控制时序,就是所有视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已经预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL来指定)。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

输入图片说明 输入图片说明

3.5 队列管理

Zookeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。

  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 Zookeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start

输入图片说明 输入图片说明

FIFO 队列用 Zookeeper 实现思路如下:

实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。

输入图片说明

3.6 负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡

消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下:

生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。

消费负载均衡: 在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

  1. 每个分区针对同一个group只挂载一个消费者。
  2. 如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。
  3. 如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。

在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。

3.7 分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理。

  1. 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。
  2. 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。
  3. 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合。

出处:https://my.oschina.net/xianggao/blog/531613




Paxos算法浅析

前言
在文章2PC/3PC到底是啥中介绍了2PC这种一致性协议,从文中了解到2PC更多的被用在了状态一致性上(分布式事务),在数据一致性中很少被使用;而Paxos正是在数据一致性中被广泛使用,在过去十年里,Paxos基本成为了分布式领域内一致性协议的代名词。Google的粗粒度锁服务Chubby的设计开发者Burrows曾经说过:“所有一致性协议本质上要么是Paxos要么是其变体”。Paxos的提出者LeslieLamport也因其对分布式系统的杰出理论贡献获得了2013年图灵奖。
在介绍Paxos之前,先介绍一下数据一致性到底被用在什么场景中,下面以副本状态机来表述

副本状态机
在分布式环境下,一致性协议的应用场景一般会采用副本状态机来表达,这是对各种不同应用场景的一种抽象化表述。
一种典型的实现副本状态机的机制是采用Log副本的方式,如下图(来源网上):

image

集群中多台服务器各自保存一份Log副本及内部状态机,Log内顺序记载客户端发来的操作指令,服务器依次执行Log内的指令并将其体现到内部状态机上,如果保证每台机器内的Log副本内容完全一致,那么对应的状态机也可以保证整体状态一致。
一致性协议的作用就是保证各个Log副本数据的一致性,上图中的一致性模块就是用来保证一致性的。
再来看一个更具体的例子:在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个「一致性算法」以保证每个节点看到的指令一致。

民主选举算法
如何保证各个Log副本数据的一致性(或者说如何来实现这个一致性模块),可能最先想到的是只需要提供一个唯一一致性模块,然后用类似2PC的方式来保证数据的一致性,但是我们也知道了2PC方式中面临着一致性模块的当机以及网络的异常等问题,最终导致数据出现不一致;
而本文要介绍的Paxos一致性协议就是如何在可能发生几起宕机或网络异常的分布式系统中,快速且正确地在集群内部对某个数据的值达成一致,并且保证不论发生以上任何异常,都不会破坏整个系统的一致性,主要原因还是Paxos提供了集群一致性模块,然后以民主选举的算法——大多数的决定会成个整个集群的统一决定。任何一个点都可以提出要修改某个数据的提案,是否通过这个提案取决于这个集群中是否有超过半数的结点同意(所以Paxos算法需要集群中的结点是单数);
当然这个保证是有一个前提的,这就是下面要介绍的拜占庭问题。

拜占庭问题
其故事背景是这样的:拜占庭位于现在土耳其的伊斯坦布尔,是东罗马帝国的首都。由于当时拜占庭罗马帝国国土辽阔,为了防御目的,因此每个军队都分隔很远,将军与将军之间只能靠信差传消息。 在战争的时候,拜占庭军队内所有将军必需达成一致的共识,决定是否有赢的机会才去攻打敌人的阵营。但是,军队可能有叛徒和敌军间谍,这些叛徒将军们会扰乱或左右决策的过程。这时候,在已知有成员谋反的情况下,其余忠诚的将军在不受叛徒的影响下如何达成一致的协议,这就是拜占庭将军问题。
从理论上来说,在分布式计算领域,试图在异步系统和不可靠的通道上来达到一致性是不可能的,因此在对一致性的研究过程中,都往往假设信道是可靠的,即假设不存在拜占庭问题。
非拜占庭模型定义:
1.一致性模块的行为可以以任意速度执行,允许运行失败,在失败后也许会重启并再次运行;
2.一致性模块之间通过异步方式发送信息通信,通信时间可以任意长,信息可能会在传输过程中丢失,也允许重复发送相同的信息,多重信息的顺序可以任意。但是有一点:信息不允许被篡改。

Paxos的基本概念
首先是并行进程(对应副本状态机上每台服务器的一致性模块)的角色概念,Paxos协议下不同并行进程可能承担的三种角色如下:
倡议者(Proposer):倡议者可以提出提议(数值或操作命令等)以供投票表决;
接受者(Acceptor):接受者可以对倡议者提出的提议进行投票表决,从众多提议中选出唯一确定的一个;
学习者(Learner):学习者无倡议投票权,但是可以从接受者那里获知是哪个提议最终被选中;
在一致性协议框架中,一个并行进程可以同时承担以上多种角色。
划分角色后,就可以更精确的定义问题:
1.决议(value)只有在被 proposers 提出后才能批准(未经批准的决议称为「提案(proposal)」);
2.在一次Paxos算法的执行实例中,只批准一个Value;
3.Learner只能获得被批准(chosen)的Value。

Paxos一致性协议
Paxos的目的是在非拜占庭条件下,当多个并行进程提出不同的倡议时,如何能够达成一致。如果归纳Paxos协议,可以将其描述为以下两阶段过程:
阶段一:Prepare阶段
1.1【倡议者视角】倡议者选择倡议编号n,然后向大多数(即超过半数以上)接受者发送Prepare请求,请求中附带倡议编号n。
1.2【接受者视角】对于某个接受者来说,如果接收到带有倡议编号n的Prepare请求,则做如下判断:若倡议编号n比此接受者之前响应过的任何其它Prepare请求附带的倡议编号都大,那么此接受者会给倡议者以响应,并承诺不会响应之后接收到的其它任何倡议编号小于n的请求,另外,如果接受者曾经响应过2.2阶段的Accept请求,则将所有响应的Accept请求中倡议编号最高的倡议内容发送给倡议者,倡议内容包括两项信息:Accept请求中的倡议编号以及其倡议值。若倡议编号n不比此接受者之前响应过的任何其它Prepare请求附带的倡议编号都大,那么此接受者不会给倡议者以响应。

阶段二:Accept阶段
2.1【倡议者视角】如果倡议者接收到大多数接受者关于带有倡议编号n的Prepare请求的响应,那么倡议者向这些接受者发送Accept请求,Accept请求附带两个信息:倡议编号n以及倡议值v。倡议值v的选择方式如下:如果在1.2阶段接受者返回了自己曾经接受的具有最高倡议编号Accept请求倡议内容,则从这些倡议内容里面选择倡议编号最高的并将其倡议值作为倡议值v;如果1.2阶段没有收到任何接受者的Accept请求倡议内容,则可以任意赋值给倡议值v。

2.2【接受者视角】如果接受者接收到了任意倡议编号为n的Accept请求,则接受者接受此请求,除非在此期间接受者响应过具有比n更高编号的Prepare请求。通过以上两阶段过程即可选出唯一的倡议值,对于学习者来说,其需要从接受者那里获知到底是哪个倡议值被选出。一个直观的方法如下:每当接受者执行完2.2步骤,即接受某个Accept请求后,由其通知所有学习者其所接受的倡议,这样,学习者很快习得是哪个倡议被最终选出。但是这种方式会导致大量通信,因为任意一个接受者会通知任意一个学习者,如果有m个接受者,n个学习者,则需要m*n次通信。一个替代策略是:从众多学习者中选择一个作为代表,由其从接受者那里获知最终被选出的倡议,然后再由其通知其它学习者,这样可以将通信量降为m+n。但是这个方案中如果这个学习者代表发生故障,其它学习者无从知晓倡议值。考虑到健壮性和通信量两个因素,可以采取折中方法:选出若干学习者作为代表,由这些代表从接受者那里获知最终倡议值,然后通知其它学习者。

通过以上流程,如果有多个并发进程提出各自的倡议值,Paxos就可以保证从中选出且只选出一个唯一确定的倡议值,以此来达到副本状态机保持状态一致的目标。

总结

此文只是对Paxos的应用场景以及Paxos协议本身进行了介绍,而Paxos最难理解性在于是什么因素导致协议以此种方式呈现以及其正确性证明过程而非最终协议本身内容。

出处:https://my.oschina.net/OutOfMemory/blog/807634



ZAB协议和Paxos算法


前言
在上一篇文章Paxos算法浅析中主要介绍了Paxos一致性算法应用的场景,以及对协议本身的介绍;Google Chubby是一个分布式锁服务,其底层一致性实现就是以Paxos算法为基础的;但这篇文件并不是介绍Chubby,而是介绍了一个和Chubby拥有类似功能的开放源码的分布式协调服务Zookeeper,以及Zookeeper数据一致性的核心算法ZAB。

Zookeeper简介
Zookeeper是一个分布式数据一致性的解决方案,分布式应用可以基于它实现诸如数据发布/订阅,负载均衡,命名服务,分布式协调/通知,集群管理,Master选举,分布式锁和分布式队列等功能。Zookeeper致力于提供一个高性能、高可用、且具有严格的顺序访问控制能力的分布式协调系统。
考虑到Zookeeper主要操作数据的状态,为了保证状态的一致性,Zookeeper提出了两个安全属性:
1.全序(Total order):如果消息a在消息b之前发送,则所有Server应该看到相同的结果
2.因果顺序(Causal order):如果消息a在消息b之前发生(a导致了b),并被一起发送,则a始终在b之前被执行。
为了保证上述两个安全属性,Zookeeper使用了TCP协议和Leader
通过使用TCP协议保证了消息的全序特性(先发先到),
通过Leader解决了因果顺序问题:先到Leader的先执行,但是这样的话Leader有可能出现出现网络中断、崩溃退出与重启等异常情况,这就有必要引入Leader选举算法。
而ZAB(Zookeeper Atomic Broadcast即Zookeeper原子消息广播协议)正是作为其数据一致性的核心算法,下面介绍一下ZAB协议。

ZAB协议
ZAB协议包括两种基本的模式:崩溃恢复和消息广播
当整个服务框架在启动过程中,或是当Leader服务器出现网络中断崩溃退出与重启等异常情况时,ZAB就会进入恢复模式并选举产生新的Leader服务器。
当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出崩溃恢复模式,进入消息广播模式。
当有新的服务器加入到集群中去,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器会自动进入数据恢复模式,找到Leader服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。
以上其实大致经历了三个步骤:
1.崩溃恢复:主要就是Leader选举过程
2.数据同步:Leader服务器与其他服务器进行数据同步
3.消息广播:Leader服务器将数据发送给其他服务器

下面具体看看这三个步骤
1.消息广播
ZAB协议的消息广播过程使用的是一个原子广播协议,类似二阶段提交(2PC/3PC到底是啥),具体可以看来源网上的一张图片:

image

客户端的请求,Leader服务器为其生成对于的Propose,并将其发送给其他服务器,然后再分别收集选票,最后进行提交;在广播Propose之前,Leader会为这个Propose分配一个全局单调递增的唯一ID,称之为事务ID(ZXID);由于ZAB协议需要保证每一个消息严格的因果关系,因此必须将每一个Propose按照其ZXID的先后顺序来进行排序与处理。
具体做法就是Leader为每一个Follower都各自分配一个单独的队列,然后将需要广播的Propose依次放入队列中。

2.崩溃恢复
消息广播中如果Leader出现网络中断、崩溃退出与重启等异常,将进入崩溃恢复,恢复的过程中有2个问题需要解决:
1.ZAB协议需要确保那些已经在Leader服务器上提交的事务,最终被所有服务器都提交
2.ZAB协议需要确保丢弃那些只在Leader服务器上被提交的事务
针对以上两个问题,如果让Leader选举算法能够保证新选出来的Leader服务器拥有集群中所有机器最高编号(ZXID)的Propose,那么就可以保证这个新选出来的Leader一定具有所有已经提交的提案;如果让具有最高编号的机器成为Leader,就可以省去Leader服务器检查Propose的提交和抛弃了。

3.数据同步
Leader服务器会为每个Follower服务器都准备一个队列,并将那些没有被各Follower同步的事务以propose消息的形式逐个发送给Follower服务器,并在每个消息的后面发送一个commit消息,表示提交事务;等到同步完成之后,leader服务器会将该服务器加入到真正的可用Follower列表中。
崩溃恢复中提到2个问题,看看如何解决ZAB协议需要确保丢弃那些只在Leader服务器上被提交的事务:
事务编号ZXID被设计为一个64位的数字,低32位是一个简单的递增计数器,高32位是Leader周期的epoch编码,每当选举产生一个新的Leader服务器,就会从这个Leader服务器上取出本地日志中最大事务propose的ZXID,然后解析出epoch,最后对epoch加1;低32位就从0开始重新生成新的ZXID。ZAB协议通过epoch编号来区分Leader周期变化的策略,来保证丢弃那些只在上一个Leader服务器上被提交的事务。

Zab与Paxos
Zab的作者认为Zab与paxos并不相同,只所以没有采用Paxos是因为Paxos保证不了全序顺序:
Because multiple leaders can propose a value for a given instance two problems arise.
First, proposals can conflict. Paxos uses ballots to detect and resolve conflicting proposals.
Second, it is not enough to know that a given instance number has been committed, processes must also be able to fi gure out which value has been committed.
Paxos算法的确是不关心请求之间的逻辑顺序,而只考虑数据之间的全序,但很少有人直接使用paxos算法,都会经过一定的简化、优化。

Paxos算法优化
Paxos算法在出现竞争的情况下,其收敛速度很慢,甚至可能出现活锁的情况,例如当有三个及三个以上的proposer在发送prepare请求后,很难有一个proposer收到半数以上的回复而不断地执行第一阶段的协议。因此,为了避免竞争,加快收敛的速度,在算法中引入了一个Leader这个角色,在正常情况下同时应该最多只能有一个参与者扮演Leader角色,而其它的参与者则扮演Acceptor的角色。
在这种优化算法中,只有Leader可以提出议案,从而避免了竞争使得算法能够快速地收敛而趋于一致;而为了保证Leader的健壮性,又引入了Leader选举,再考虑到同步的阶段,渐渐的你会发现对Paxos算法的简化和优化已经和上面介绍的ZAB协议很相似了。

总结
Google的粗粒度锁服务Chubby的设计开发者Burrows曾经说过:“所有一致性协议本质上要么是Paxos要么是其变体”。这句话还是有一定道理的,ZAB本质上就是Paxos的一种简化形式。

出处:http://www.cnblogs.com/study-everyday/p/7242766.html



Zookeeper的一致性协议:Zab

Zookeeper使用了一种称为Zab(Zookeeper Atomic Broadcast)的协议作为其一致性复制的核心,据其作者说这是一种新发算法,其特点是充分考虑了Yahoo的具体情况:高吞吐量、低延迟、健壮、简单,但不过分要求其扩展性。下面将展示一些该协议的核心内容:

另,本文仅讨论Zookeeper使用的一致性协议而非讨论其源码实现

Zookeeper的实现是有Client、Server构成,Server端提供了一个一致性复制、存储服务,Client端会提供一些具体的语义,比如分布式锁、选举算法、分布式互斥等。从存储内容来说,Server端更多的是存储一些数据的状态,而非数据内容本身,因此Zookeeper可以作为一个小文件系统使用。数据状态的存储量相对不大,完全可以全部加载到内存中,从而极大地消除了通信延迟。

Server可以Crash后重启,考虑到容错性,Server必须“记住”之前的数据状态,因此数据需要持久化,但吞吐量很高时,磁盘的IO便成为系统瓶颈,其解决办法是使用缓存,把随机写变为连续写。

考虑到Zookeeper主要操作数据的状态,为了保证状态的一致性,Zookeeper提出了两个安全属性(Safety Property)

为了保证上述两个安全属性,Zookeeper使用了TCP协议和Leader。通过使用TCP协议保证了消息的全序特性(先发先到),通过Leader解决了因果顺序问题:先到Leader的先执行。因为有了Leader,Zookeeper的架构就变为:Master-Slave模式,但在该模式中Master(Leader)会Crash,因此,Zookeeper引入了Leader选举算法,以保证系统的健壮性。归纳起来Zookeeper整个工作分两个阶段:

1. Atomic Broadcast

同一时刻存在一个Leader节点,其他节点称为“Follower”,如果是更新请求,如果客户端连接到Leader节点,则由Leader节点执行其请求;如果连接到Follower节点,则需转发请求到Leader节点执行。但对读请求,Client可以直接从Follower上读取数据,如果需要读到最新数据,则需要从Leader节点进行,Zookeeper设计的读写比例是2:1。

Leader通过一个简化版的二段提交模式向其他Follower发送请求,但与二段提交有两个明显的不同之处:

通俗地说,如果有2f+1个节点,允许f个节点失败。因为任何两个多数派必有一个交集,当Leader切换时,通过这些交集节点可以获得当前系统的最新状态。如果没有一个多数派存在(存活节点数小于f+1)则,算法过程结束。但有一个特例:

如果有A、B、C三个节点,A是Leader,如果B Crash,则A、C能正常工作,因为A是Leader,A、C还构成多数派;如果A Crash则无法继续工作,因为Leader选举的多数派无法构成。

2. Leader Election

Leader选举主要是依赖Paxos算法,具体算法过程请参考其他博文,这里仅考虑Leader选举带来的一些问题。Leader选举遇到的最大问题是,”新老交互“的问题,新Leader是否要继续老Leader的状态。这里要按老Leader Crash的时机点分几种情况:

  1. 老Leader在COMMIT前Crash(已经提交到本地)
  2. 老Leader在COMMIT后Crash,但有部分Follower接收到了Commit请求

第一种情况,这些数据只有老Leader自己知道,当老Leader重启后,需要与新Leader同步并把这些数据从本地删除,以维持状态一致。

第二种情况,新Leader应该能通过一个多数派获得老Leader提交的最新数据

老Leader重启后,可能还会认为自己是Leader,可能会继续发送未完成的请求,从而因为两个Leader同时存在导致算法过程失败,解决办法是把Leader信息加入每条消息的id中,Zookeeper中称为zxid,zxid为一64位数字,高32位为leader信息又称为epoch,每次leader转换时递增;低32位为消息编号,Leader转换时应该从0重新开始编号。通过zxid,Follower能很容易发现请求是否来自老Leader,从而拒绝老Leader的请求。

因为在老Leader中存在着数据删除(情况1),因此Zookeeper的数据存储要支持补偿操作,这也就需要像数据库一样记录log。

3. Zab与Paxos

Zab的作者认为Zab与paxos并不相同,只所以没有采用Paxos是因为Paxos保证不了全序顺序:

<pre style="margin: 0px 0px 24px; padding: 0px; font-weight: 400; box-sizing: border-box; background-color: rgb(240, 240, 240); overflow-x: auto; font-family: Consolas, Inconsolata, Courier, monospace; font-size: 14px; line-height: 22px; color: rgb(0, 0, 0); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">Because multiple leaders can
propose a value for a given instance two problems arise.
First, proposals can conflict. Paxos uses ballots to detect and resolve conflicting proposals.
Second, it is not enough to know that a given instance number has been committed, processes must also be able to figure out which value has been committed.</pre>

Paxos算法的确是不关系请求之间的逻辑顺序,而只考虑数据之间的全序,但很少有人直接使用paxos算法,都会经过一定的简化、优化。

一般Paxos都会有几种简化形式,其中之一便是,在存在Leader的情况下,可以简化为1个阶段(Phase2)。仅有一个阶段的场景需要有一个健壮的Leader,因此工作重点就变为Leader选举,在考虑到Learner的过程,还需要一个”学习“的阶段,通过这种方式,Paxos可简化为两个阶段:

如果再考虑多数派要Learn成功,这其实就是Zab协议。Paxos算法着重是强调了选举过程的控制,对决议学习考虑的不多,Zab恰好对此进行了补充。

之前有人说,所有分布式算法都是Paxos的简化形式,虽然很绝对,但对很多情况的确如此,但不知Zab的作者是否认同这种说法?

4.结束

本文只是想从协议、算法的角度分析Zookeeper,而非分析其源码实现,因为Zookeeper版本的变化,文中描述的场景或许已找不到对应的实现。另,本文还试图揭露一个事实:Zab就是Paxos的一种简化形式。

【参考资料】

出处:http://blog.csdn.net/chen77716/article/details/7309915


【Zookeeper系列三】ZooKeeper Java API使用


ZooKeeper提供了Java和C的binding. 本文关注Java相关的API.

1 准备工作

拷贝ZooKeeper安装目录下的zookeeper.x.x.x.jar文件到项目的classpath路径下.

2 创建连接和回调接口

首先需要创建ZooKeeper对象, 后续的一切操作都是基于该对象进行的.

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException

以下为各个参数的详细说明:

connectString:zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接.

sessionTimeout:指定连接的超时时间.

watcher:事件回调接口.

注意, 创建ZooKeeper对象时, 只要对象完成初始化便立刻返回. 建立连接是以异步的形式进行的, 当连接成功建立后, 会回调watcher的process方法. 如果想要同步建立与server的连接, 需要自己进一步封装.

public class ZKConnection {
    /**
     * server列表, 以逗号分割
     */
    protected String hosts = "localhost:4180,localhost:4181,localhost:4182";
    /**
     * 连接的超时时间, 毫秒
     */
    private static final int SESSION_TIMEOUT = 5000;
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    protected ZooKeeper zk;

    /**
     * 连接zookeeper server
     */
    public void connect() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());
        // 等待连接完成
        connectedSignal.await();
    }

    public class ConnWatcher implements Watcher {
        public void process(WatchedEvent event) {
            // 连接建立, 回调process接口时, 其event.getState()为KeeperState.SyncConnected
            if (event.getState() == KeeperState.SyncConnected) {
                // 放开闸门, wait在connect方法上的线程将被唤醒
                connectedSignal.countDown();
            }
        }
    }
}

3 创建znode

ZooKeeper对象的create方法用于创建znode.

String create(String path, byte[] data, List acl, CreateMode createMode);

以下为各个参数的详细说明:

path:znode的路径.
data:与znode关联的数据.
acl:指定权限信息, 如果不想指定权限, 可以传入Ids.OPEN_ACL_UNSAFE.
createMode:指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可;

/**
 * 创建临时节点
 */
public void create(String nodePath, byte[] data) throws Exception {
    zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}

4 获取子node列表

ZooKeeper对象的getChildren方法用于获取子node列表.

List getChildren(String path, boolean watch);

watch参数用于指定是否监听path node的子node的增加和删除事件, 以及path node本身的删除事件.

5 判断znode是否存在

ZooKeeper对象的exists方法用于判断指定znode是否存在.

Stat exists(String path, boolean watch);

watch参数用于指定是否监听path node的创建, 删除事件, 以及数据更新事件. 如果该node存在, 则返回该node的状态信息, 否则返回null.

6 获取node中关联的数据

ZooKeeper对象的getData方法用于获取node关联的数据.

byte[] getData(String path, boolean watch, Stat stat);

watch 参数用于指定是否监听path node的删除事件, 以及数据更新事件, 注意, 不监听path node的创建事件, 因为如果path node不存在, 该方法将抛出KeeperException.NoNodeException异常.
stat 参数是个传出参数, getData方法会将path node的状态信息设置到该参数中.

7 更新node中关联的数据

ZooKeeper对象的setData方法用于更新node关联的数据.

Stat setData(final String path, byte data[], int version);
data为待更新的数据.
version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查.
返回path node的状态信息.

8 删除znode

ZooKeeper对象的delete方法用于删除znode.

void delete(final String path, int version);

9 其余接口

请查看ZooKeeper对象的API文档.

10 代码实例

public class JavaApiSample implements Watcher {

    private static final int SESSION_TIMEOUT = 10000;
    //    private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181";
    private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    private static final String ZK_PATH = "/nileader";
    private ZooKeeper zk = null;

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 创建ZK连接
     *
     * @param connectString  ZK服务器地址列表
     * @param sessionTimeout Session超时时间
     */
    public void createConnection(String connectString, int sessionTimeout) {
        this.releaseConnection();
        try {
            zk = new ZooKeeper(connectString, sessionTimeout, this);
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            System.out.println("连接创建失败,发生 InterruptedException");
            e.printStackTrace();
        } catch (IOException e) {
            System.out.println("连接创建失败,发生 IOException");
            e.printStackTrace();
        }
    }

    /**
     * 关闭ZK连接
     */
    public void releaseConnection() {
        if (null != this.zk) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                // ignore
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建节点
     *
     * @param path 节点path
     * @param data 初始数据内容
     * @return
     */
    public boolean createPath(String path, String data) {
        try {
            System.out.println("节点创建成功, Path: "
                    + this.zk.create(path, //
                    data.getBytes(), //
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, //
                    CreateMode.EPHEMERAL)
                    + ", content: " + data);
        } catch (KeeperException e) {
            System.out.println("节点创建失败,发生KeeperException");
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("节点创建失败,发生 InterruptedException");
            e.printStackTrace();
        }
        return true;
    }

    /**
     * 读取指定节点数据内容
     *
     * @param path 节点path
     * @return
     */
    public String readData(String path) {
        try {
            System.out.println("获取数据成功,path:" + path);
            return new String(this.zk.getData(path, false, null));
        } catch (KeeperException e) {
            System.out.println("读取数据失败,发生KeeperException,path: " + path);
            e.printStackTrace();
            return "";
        } catch (InterruptedException e) {
            System.out.println("读取数据失败,发生 InterruptedException,path: " + path);
            e.printStackTrace();
            return "";
        }
    }

    /**
     * 更新指定节点数据内容
     *
     * @param path 节点path
     * @param data 数据内容
     * @return
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println("更新数据成功,path:" + path + ", stat: " +
                    this.zk.setData(path, data.getBytes(), -1));
        } catch (KeeperException e) {
            System.out.println("更新数据失败,发生KeeperException,path: " + path);
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("更新数据失败,发生 InterruptedException,path: " + path);
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 删除指定节点
     *
     * @param path 节点path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println("删除节点成功,path:" + path);
        } catch (KeeperException e) {
            System.out.println("删除节点失败,发生KeeperException,path: " + path);
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("删除节点失败,发生 InterruptedException,path: " + path);
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        JavaApiSample sample = new JavaApiSample();
        sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
        if (sample.createPath(ZK_PATH, "我是节点初始内容")) {
            System.out.println();
            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");
            sample.writeData(ZK_PATH, "更新后的数据");
            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");
            sample.deleteNode(ZK_PATH);
        }

        sample.releaseConnection();
    }

    /**
     * 收到来自Server的Watcher通知后的处理。
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("收到事件通知:" + event.getState() + "\n");
        if (Event.KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

11 需要注意的几个地方

出处:https://my.oschina.net/xianggao/blog/531936



【Zookeeper系列四】ZooKeeper 分布式锁实现


1 场景描述

在分布式应用, 往往存在多个进程提供同一服务. 这些进程有可能在相同的机器上, 也有可能分布在不同的机器上. 如果这些进程共享了一些资源, 可能就需要分布式锁来锁定对这些资源的访问

2 思路

进程需要访问共享数据时, 就在"/locks"节点下创建一个sequence类型的子节点, 称为thisPath. 当thisPath在所有子节点中最小时, 说明该进程获得了锁. 进程获得锁之后, 就可以访问共享资源了. 访问完成后, 需要将thisPath删除. 锁由新的最小的子节点获得.

有了清晰的思路之后, 还需要补充一些细节. 进程如何知道thisPath是所有子节点中最小的呢? 可以在创建的时候, 通过getChildren方法获取子节点列表, 然后在列表中找到排名比thisPath前1位的节点, 称为waitPath, 然后在waitPath上注册监听, 当waitPath被删除后, 进程获得通知, 此时说明该进程获得了锁.

3 算法

  1. lock操作过程:

首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容;

每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点);

进行lock操作时,获取对应lock根节点下的所有子节点,也即处于竞争中的资源标识;

按照Fair(公平)竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。

如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程

  1. unlock操作过程:

将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出

  1. 其中的几个关键点:

node节点选择为EPHEMERAL_SEQUENTIAL很重要。

自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。

选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理

获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用互斥信号共享锁BooleanMutex进行通知,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)

  1. 注意:

使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。

同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象

4 实现

1. DistributedLock.java源码:分布式锁

package com.king.lock;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

/**
 * Zookeeper 分布式锁
 */
public class DistributedLock {

    private static final int SESSION_TIMEOUT = 10000;

    private static final int DEFAULT_TIMEOUT_PERIOD = 10000;

    private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    private static final byte[] data = {0x12, 0x34};

    private ZooKeeper zookeeper;

    private String root;

    private String id;

    private LockNode idName;

    private String ownerId;

    private String lastChildId;

    private Throwable other = null;

    private KeeperException exception = null;

    private InterruptedException interrupt = null;

    public DistributedLock(String root) {
        try {
            this.zookeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, null);
            this.root = root;
            ensureExists(root);
        } catch (IOException e) {
            e.printStackTrace();
            other = e;
        }
    }

    /**
     * 尝试获取锁操作,阻塞式可被中断
     */
    public void lock() throws Exception {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new Exception("", other);
        }

        if (isOwner()) {// 锁重入
            return;
        }

        BooleanMutex mutex = new BooleanMutex();
        acquireLock(mutex);
        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
        try {
//          mutex.lockTimeOut(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
            mutex.lock();
        } catch (Exception e) {
            e.printStackTrace();
            if (!mutex.state()) {
                lock();
            }
        }

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new Exception("", other);
        }
    }

    /**
     * 尝试获取锁对象, 不会阻塞
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public boolean tryLock() throws Exception {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (isOwner()) { // 锁重入
            return true;
        }

        acquireLock(null);

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            Thread.currentThread().interrupt();
        }

        if (other != null) {
            throw new Exception("", other);
        }

        return isOwner();
    }

    /**
     * 释放锁对象
     */
    public void unlock() throws KeeperException {
        if (id != null) {
            try {
                zookeeper.delete(root + "/" + id, -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } finally {
                id = null;
            }
        } else {
            //do nothing
        }
    }

    /**
     * 判断某path节点是否存在,不存在就创建
     * @param path
     */
    private void ensureExists(final String path) {
        try {
            Stat stat = zookeeper.exists(path, false);
            if (stat != null) {
                return;
            }
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            exception = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupt = e;
        }
    }

    /**
     * 返回锁对象对应的path
     */
    public String getRoot() {
        return root;
    }

    /**
     * 判断当前是不是锁的owner
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    /**
     * 返回当前的节点id
     */
    public String getId() {
        return this.id;
    }

    // ===================== helper method =============================

    /**
     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
     */
    private Boolean acquireLock(final BooleanMutex mutex) {
        try {
            do {
                if (id == null) { // 构建当前lock的唯一标识
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // 如果第一次,则创建一个节点
                    String path = zookeeper.create(root + "/" + prefix, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    int index = path.lastIndexOf("/");
                    id = StringUtils.substring(path, index + 1);
                    idName = new LockNode(id);
                }

                if (id != null) {
                    List<String> names = zookeeper.getChildren(root, false);
                    if (names.isEmpty()) {
                        id = null; // 异常情况,重新创建一个
                    } else {
                        // 对节点进行排序
                        SortedSet<LockNode> sortedNames = new TreeSet<>();
                        for (String name : names) {
                            sortedNames.add(new LockNode(name));
                        }

                        if (!sortedNames.contains(idName)) {
                            id = null;// 清空为null,重新创建一个
                            continue;
                        }

                        // 将第一个节点做为ownerId
                        ownerId = sortedNames.first().getName();
                        if (mutex != null && isOwner()) {
                            mutex.unlock();// 直接更新状态,返回
                            return true;
                        } else if (mutex == null) {
                            return isOwner();
                        }

                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            // 关注一下排队在自己之前的最近的一个节点
                            LockNode lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            // 异步watcher处理
                            Stat stat = zookeeper.exists(root + "/" + lastChildId, new Watcher() {
                                public void process(WatchedEvent event) {
                                    acquireLock(mutex);
                                }
                            });

                            if (stat == null) {
                                acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
                            }
                        } else {
                            if (isOwner()) {
                                mutex.unlock();
                            } else {
                                id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
                            }
                        }
                    }
                }
            } while (id == null);
        } catch (KeeperException e) {
            exception = e;
            if (mutex != null) {
                mutex.unlock();
            }
        } catch (InterruptedException e) {
            interrupt = e;
            if (mutex != null) {
                mutex.unlock();
            }
        } catch (Throwable e) {
            other = e;
            if (mutex != null) {
                mutex.unlock();
            }
        }

        if (isOwner() && mutex != null) {
            mutex.unlock();
        }
        return Boolean.FALSE;
    }
}

2. BooleanMutex.java源码:互斥信号共享锁

package com.king.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 互斥信号共享锁
 */
public class BooleanMutex {

    private Sync sync;

    public BooleanMutex() {
        sync = new Sync();
        set(false);
    }

    /**
     * 阻塞等待Boolean为true
     * @throws InterruptedException
     */
    public void lock() throws InterruptedException {
        sync.innerLock();
    }

    /**
     * 阻塞等待Boolean为true,允许设置超时时间
     * @param timeout
     * @param unit
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        sync.innerLock(unit.toNanos(timeout));
    }

    public void unlock(){
        set(true);
    }

    /**
     * 重新设置对应的Boolean mutex
     * @param mutex
     */
    public void set(Boolean mutex) {
        if (mutex) {
            sync.innerSetTrue();
        } else {
            sync.innerSetFalse();
        }
    }

    public boolean state() {
        return sync.innerState();
    }

    /**
     * 互斥信号共享锁
     */
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;

        /**
         * 状态为1,则唤醒被阻塞在状态为FALSE的所有线程
         */
        private static final int TRUE = 1;
        /**
         * 状态为0,则当前线程阻塞,等待被唤醒
         */
        private static final int FALSE = 0;

        /**
         * 返回值大于0,则执行;返回值小于0,则阻塞
         */
        protected int tryAcquireShared(int arg) {
            return getState() == 1 ? 1 : -1;
        }

        /**
         * 实现AQS的接口,释放共享锁的判断
         */
        protected boolean tryReleaseShared(int ignore) {
            // 始终返回true,代表可以release
            return true;
        }

        private boolean innerState() {
            return getState() == 1;
        }

        private void innerLock() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
        }

        private void innerSetTrue() {
            for (;;) {
                int s = getState();
                if (s == TRUE) {
                    return; // 直接退出
                }
                if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作
                    releaseShared(0);// 释放一下锁对象,唤醒一下阻塞的Thread
                }
            }
        }

        private void innerSetFalse() {
            for (;;) {
                int s = getState();
                if (s == FALSE) {
                    return; //直接退出
                }
                if (compareAndSetState(s, FALSE)) {//cas更新状态,避免并发更新false操作
                    setState(FALSE);
                }
            }
        }
    }
}

3. 相关说明:

image.png

4. 测试类:

package com.king.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

/**
 * 分布式锁测试
 * @author taomk
 * @version 1.0
 * @since 15-11-19 上午11:48
 */
public class DistributedLockTest {

    public static void main(String [] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final DistributedLock node = new DistributedLock("/locks");
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
//                      node.tryLock(); // 无阻塞获取锁
                        node.lock(); // 阻塞获取锁
                        Thread.sleep(100);

                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                        try {
                            node.unlock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        }
                    }

                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

控制台输出:

id: x-239027745716109354-0000000248 is leader: true
id: x-22854963329433645-0000000249 is leader: true
id: x-22854963329433646-0000000250 is leader: true
id: x-166970151413415997-0000000251 is leader: true
id: x-166970151413415998-0000000252 is leader: true
id: x-166970151413415999-0000000253 is leader: true
id: x-166970151413416000-0000000254 is leader: true
id: x-166970151413416001-0000000255 is leader: true
id: x-166970151413416002-0000000256 is leader: true
id: x-22854963329433647-0000000257 is leader: true
id: x-239027745716109355-0000000258 is leader: true
id: x-166970151413416003-0000000259 is leader: true
id: x-94912557367427124-0000000260 is leader: true
id: x-22854963329433648-0000000261 is leader: true
id: x-239027745716109356-0000000262 is leader: true
id: x-239027745716109357-0000000263 is leader: true
id: x-166970151413416004-0000000264 is leader: true
id: x-239027745716109358-0000000265 is leader: true
id: x-239027745716109359-0000000266 is leader: true
id: x-22854963329433649-0000000267 is leader: true
id: x-22854963329433650-0000000268 is leader: true
id: x-94912557367427125-0000000269 is leader: true
id: x-22854963329433651-0000000270 is leader: true
id: x-94912557367427126-0000000271 is leader: true
id: x-239027745716109360-0000000272 is leader: true
id: x-94912557367427127-0000000273 is leader: true
id: x-94912557367427128-0000000274 is leader: true
id: x-166970151413416005-0000000275 is leader: true
id: x-94912557367427129-0000000276 is leader: true
id: x-166970151413416006-0000000277 is leader: true
id: x-94912557367427130-0000000278 is leader: true
id: x-94912557367427131-0000000279 is leader: true
id: x-239027745716109361-0000000280 is leader: true
id: x-239027745716109362-0000000281 is leader: true
id: x-166970151413416007-0000000282 is leader: true
id: x-94912557367427132-0000000283 is leader: true
id: x-22854963329433652-0000000284 is leader: true
id: x-166970151413416008-0000000285 is leader: true
id: x-239027745716109363-0000000286 is leader: true
id: x-239027745716109364-0000000287 is leader: true
id: x-166970151413416009-0000000288 is leader: true
id: x-166970151413416010-0000000289 is leader: true
id: x-239027745716109365-0000000290 is leader: true
id: x-94912557367427133-0000000291 is leader: true
id: x-239027745716109366-0000000292 is leader: true
id: x-94912557367427134-0000000293 is leader: true
id: x-22854963329433653-0000000294 is leader: true
id: x-94912557367427135-0000000295 is leader: true
id: x-239027745716109367-0000000296 is leader: true
id: x-239027745716109368-0000000297 is leader: true

5 升级版

实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。

大致原理就是ReentrantLock 和 DistributedLock的一个结合:

  1. 单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
  2. 拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务

锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死

1. DistributedReentrantLock.java源码:多进程+多线程分布式锁

package com.king.lock;

import java.text.MessageFormat;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.KeeperException;

/**
 * 多进程+多线程分布式锁
 */
public class DistributedReentrantLock extends DistributedLock {

    private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
    private ReentrantLock reentrantLock = new ReentrantLock();

    public DistributedReentrantLock(String root) {
        super(root);
    }

    public void lock() throws Exception {
        reentrantLock.lock();//多线程竞争时,先拿到第一层锁
        super.lock();
    }

    public boolean tryLock() throws Exception {
        //多线程竞争时,先拿到第一层锁
        return reentrantLock.tryLock() && super.tryLock();
    }

    public void unlock() throws KeeperException {
        super.unlock();
        reentrantLock.unlock();//多线程竞争时,释放最外层锁
    }

    @Override
    public String getId() {
        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
    }

    @Override
    public boolean isOwner() {
        return reentrantLock.isHeldByCurrentThread() && super.isOwner();
    }
}

2. 测试代码:

package com.king.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

/**
 * @author taomk
 * @version 1.0
 * @since 15-11-23 下午12:15
 */
public class DistributedReentrantLockTest {

    public static void main(String [] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);

        final DistributedReentrantLock lock = new DistributedReentrantLock("/locks"); //单个锁
        for (int i = 0; i < count; i++) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                        lock.lock();
                        Thread.sleep(100);

                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                        try {
                            lock.unlock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

6 最后

其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。大致思路:

  1. 竞争资源标示: read_自增id , write_自增id
  2. 首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁如果队列的前边是write标识,第一个write节点获取锁
  3. watcher监听: read监听距离自己最近的一个write节点的existwrite监听距离自己最近的一个节点(read或者write节点)

出处:https://my.oschina.net/xianggao/blog/532010



【Zookeeper系列五】ZooKeeper 实时更新server列表


1 场景描述

在分布式应用中, 我们经常同时启动多个server, 调用方(client)选择其中之一发起请求.

分布式应用必须考虑高可用性和可扩展性: server的应用进程可能会崩溃, 或者server本身也可能会宕机. 当server不够时, 也有可能增加server的数量. 总而言之, server列表并非一成不变, 而是一直处于动态的增减中.

那么client如何才能实时的更新server列表呢? 解决的方案很多, 本文将讲述利用ZooKeeper的解决方案.

2 思路

启动server时, 在zookeeper的某个znode(假设为/sgroup)下创建一个子节点. 所创建的子节点的类型应该为ephemeral, 这样一来, 如果server进程崩溃, 或者server宕机, 与zookeeper连接的session就结束了, 那么其所创建的子节点会被zookeeper自动删除. 当崩溃的server恢复后, 或者新增server时, 同样需要在/sgroup节点下创建新的子节点.

对于client, 只需注册/sgroup子节点的监听, 当/sgroup下的子节点增加或减少时, zookeeper会通知client, 此时client更新server列表.

3 实现AppServer

AppServer的逻辑非常简单, 只须在启动时, 在zookeeper的"/sgroup"节点下新增一个子节点即可.

AppServer.java源码:

package com.king.server;

import org.apache.zookeeper.*;

public class AppServer {

    private String groupNode = "sgroup";

    private String subNode = "sub";

    /**
     * 连接zookeeper
     *
     * @param address server的地址
     */
    public void connectZookeeper(String address) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2180,localhost:2181,localhost:2182", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                // 不做处理
            }
        });
        // 在"/sgroup"下创建子节点
        // 子节点的类型设置为EPHEMERAL_SEQUENTIAL, 表明这是一个临时节点, 且在子节点的名称后面加上一串数字后缀
        // 将server的地址数据关联到新创建的子节点上
        String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("create: " + createdPath);
    }

    /**
     * server的工作逻辑写在这个方法中
     * 此处不做任何处理, 只让server sleep
     */
    public void handle() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 在参数中指定server的地址
        if (args.length == 0) {
            System.err.println("The first argument must be server address");
            System.exit(1);
        }

        AppServer as = new AppServer();
        as.connectZookeeper(args[0]);

        as.handle();
    }
}

4 实现AppClient

AppClient的逻辑比AppServer稍微复杂一些, 需要监听"/sgroup"下子节点的变化事件, 当事件发生时, 需要更新server列表.

注册监听"/sgroup"下子节点的变化事件, 可在getChildren方法中完成. 当zookeeper回调监听器的process方法时, 判断该事件是否是"/sgroup"下子节点的变化事件, 如果是, 则调用更新逻辑, 并再次注册该事件的监听.

AppClient.java源码:

package com.king.server;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AppClient {

    private String groupNode = "sgroup";

    private ZooKeeper zk;

    private Stat stat = new Stat();

    private volatile List<String> serverList;

    /**
     * 连接zookeeper
     */
    public void connectZookeeper() throws Exception {
        zk = new ZooKeeper("localhost:2180,localhost:2181,localhost:2182", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                // 如果发生了"/sgroup"节点下的子节点变化事件, 更新server列表, 并重新注册监听
                if (event.getType() == Event.EventType.NodeChildrenChanged
                        && ("/" + groupNode).equals(event.getPath())) {
                    try {
                        updateServerList();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        updateServerList();
    }

    /**
     * 更新server列表
     */
    private void updateServerList() throws Exception {
        List<String> newServerList = new ArrayList<String>();

        // 获取并监听groupNode的子节点变化
        // watch参数为true, 表示监听子节点变化事件. 
        // 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
        List<String> subList = zk.getChildren("/" + groupNode, true);
        for (String subNode : subList) {
            // 获取每个子节点下关联的server地址
            byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
            newServerList.add(new String(data, "utf-8"));
        }

        // 替换server列表
        serverList = newServerList;

        System.out.println("server list updated: " + serverList);
    }

    /**
     * client的工作逻辑写在这个方法中
     * 此处不做任何处理, 只让client sleep
     */
    public void handle() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        AppClient ac = new AppClient();
        ac.connectZookeeper();

        ac.handle();
    }
}

出处:https://my.oschina.net/xianggao/blog/535682



【Zookeeper系列六】Zookeeper 工作原理


ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,由于工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在某些应用中使用,因此需要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。本文简单分析Zookeeper的工作原理,对于如何使用Zookeeper不是本文讨论的重点。

1 Zookeeper的基本概念

1.1 角色

Zookeeper中的角色主要有以下三类,如下表所示:

输入图片说明

系统模型如图所示

输入图片说明

1.2 设计目的

  1. 最终一致性:client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的性能。

  2. 可靠性:具有简单、健壮、良好的性能,如果消息m被其中一台服务器接受,那么它将被所有的服务器接受

  3. 实时性Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口。

  4. 等待无关(wait-free):慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待

  5. 原子性:更新只能成功或者失败,没有中间状态。

  6. 顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。

2 ZooKeeper工作原理

Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式(选主)广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态

为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数

每个Server在工作过程中有三种状态:

  1. LOOKING:当前Server不知道leader是谁,正在搜寻

  2. LEADING:当前Server即为选举出来的leader

  3. FOLLOWING:leader已经选举出来,当前Server与之同步

2.1 选主流程

当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的Server都恢复到一个正确的状态。Zk的选举算法有两种:一种是基于basic paxos实现的,另外一种是基于fast paxos算法实现的。系统默认的选举算法为fast paxos。

先介绍basic paxos流程:

  1. 选举线程由当前Server发起选举的线程担任,其主要功能是对投票结果进行统计,并选出推荐的Server
  2. 选举线程首先向所有Server发起一次询问(包括自己)
  3. 选举线程收到回复后,验证是否是自己发起的询问(验证zxid是否一致),然后获取对方的id(myid),并存储到当前询问对象列表中,最后获取对方提议的leader相关信息(id,zxid),并将这些信息存储到当次选举的投票记录表中
  4. 收到所有Server回复以后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server
  5. 线程将当前zxid最大的Server设置为当前Server要推荐的Leader,如果此时获胜的Server获得n/2 + 1的Server票数, 设置当前推荐的leader为获胜的Server,将根据获胜的Server相关信息设置自己的状态,否则,继续这个过程,直到leader被选举出来。

通过流程分析我们可以得出:要使Leader获得多数Server的支持,则Server总数必须是奇数2n+1,且存活的Server的数目不得少于n+1

每个Server启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的server还会从磁盘快照中恢复数据和会话信息,zk会记录事务日志并定期进行快照,方便在恢复时进行状态恢复

选主的具体流程图如下所示:

输入图片说明

fast paxos流程是在选举过程中,某Server首先向所有Server提议自己要成为leader,当其它Server收到提议以后,解决epoch和zxid的冲突,并接受对方的提议,然后向对方发送接受提议完成的消息,重复这个流程,最后一定能选举出Leader。其流程图如下所示:

输入图片说明

2.2 同步流程

选完leader以后,zk就进入状态同步过程:

  1. leader等待server连接;
  2. Follower连接leader,将最大的zxid发送给leader
  3. Leader根据follower的zxid确定同步点
  4. 完成同步后通知follower 已经成为uptodate状态;
  5. Follower收到uptodate消息后,又可以重新接受client的请求进行服务了

流程图如下所示:

输入图片说明

2.3 工作流程

2.3.1 Leader工作流程

Leader主要有三个功能:

  1. 恢复数据

  2. 维持与Learner的心跳,接收Learner请求并判断Learner的请求消息类型;

  3. Learner的消息类型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根据不同的消息类型,进行不同的处理;

PING消息:是指Learner的心跳信息;

REQUEST消息:是Follower发送的提议信息,包括写请求及同步请求;

ACK消息:是Follower的对提议的回复,超过半数的Follower通过,则commit该提议;

REVALIDATE消息:是用来延长SESSION有效时间;

Leader的工作流程简图如下所示,在实际实现中,流程要比下图复杂得多,启动了三个线程来实现功能:

输入图片说明

2.3.2 Follower工作流程

Follower主要有四个功能:

  1. 向Leader发送请求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);

  2. 接收Leader消息并进行处理;

  3. 接收Client的请求,如果为写请求,发送给Leader进行投票

  4. 返回Client结果;

Follower的消息循环处理如下几种来自Leader的消息:

  1. PING消息: 心跳消息;

  2. PROPOSAL消息:Leader发起的提案,要求Follower投票;

  3. COMMIT消息:服务器端最新一次提案的信息;

  4. UPTODATE消息:表明同步完成;

  5. REVALIDATE消息:根据Leader的REVALIDATE结果,关闭待revalidate的session还是允许其接受消息;

  6. SYNC消息:返回SYNC结果到客户端,这个消息最初由客户端发起,用来强制得到最新的更新。

Follower的工作流程简图如下所示,在实际实现中,Follower是通过5个线程来实现功能的:

输入图片说明

对于observer的流程不再叙述,observer流程和Follower的唯一 不同的地方就是observer不会参加leader发起的投票

出处:https://my.oschina.net/xianggao/blog/535891



技术交流请入群,进群说明为何添加?


大宽宽的技术交流群
上一篇 下一篇

猜你喜欢

热点阅读