大数据 Zookeeper 介绍与简单应用
-
Zookeeper 概述
-
Zookeeper 是一个分布式协调服务的开源框架。主要用来解决分布式集群中应用系统的一致性问题。
-
Zookeeper 本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。从而用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。
-
-
Zookeeper 特性
-
全局数据一致:集群中每个服务器保存一份相同的数据副本,client 无论连接到哪个服务器,展示的数据都是一致的,这是最重要的特征;
-
可靠性:如果消息被其中一台服务器接受,那么将被所有的服务器接受。
-
顺序性:包括全局有序和偏序两种
-
全局有序是指如果在一台服务器上消息 a 在消息 b 前发布,则在所有 Server 上消息 a 都将在消息 b 前被发布;
-
偏序是指如果一个消息 b 在消息 a 后被同一个发送者发布,a 必将排在 b 前面。
-
-
数据更新原子性:一次数据更新要么成功(半数以上节点成功),要么失败,不存在中间状态;
-
实时性:Zookeeper 保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。
-
-
Zookeeper 集群角色
-
Leader
-
Zookeeper 集群工作的核心
-
事务请求(写操作)的唯一调度和处理者,保证集群事务处理的顺序性;
-
集群内部各个服务器的调度者。
-
对于 create、setData、delete 等有写操作的请求,则需要统一转发给 leader 处理,leader 需要决定编号、执行操作,这个过程称为一个事务。
-
-
Follower
-
处理客户端非事务(即读操作)请求,转发事务请求给 Leader;
-
参与集群 Leader 选举投票。
-
此外,针对访问量比较大的 zookeeper 集群,还可新增观察者角色。
-
-
Observer:
-
观察者角色,观察 Zookeeper 集群的最新状态变化并将这些状态同步过来,其对于非事务请求可以进行独立处理;对于事务请求,则会转发给 Leader 服务器进行处理。
-
不会参与任何形式的投票只提供非事务服务,通常用于在不影响集群事务处理能力的前提下提升集群的非事务处理能力。
-
-
-
Zookeeper 集群搭建
-
Zookeeper 集群搭建指的是 Zookeeper 分布式模式安装。通常由 2n+1 台 server 组成。这是因为为了保证 Leader 选举(基于Paxos 算法的实现)能够得到多数的支持,所以 Zookeeper 集群的数量一般为奇数。
-
Zookeeper 运行需要 java 环境,所以需要提前安装 jdk。对于安装 leader + follower 模式的集群,大致过程如下:
-
配置主机名称到 IP 地址映射配置;
-
修改 Zookeeper 配置文件;
-
远程复制分发安装文件;
-
设置 myid
-
启动 Zookeeper 集群;
-
-
如果要想使用 Observer 模式,可在对应节点的配置文件(zookeeper-3.4.6/conf/zoo.cfg)添加如下配置:
- peerType=observer
其次,必须在配置文件指定哪些节点被指定为 Observer,如:server.1:localhost:2181:3181:observer
-
这里,我们安装的是 leader + follower 模式
服务器IP 主机名 myid 192.168.88.161 node1 1 192.168.88.162 node2 2 192.168.88.163 node3 3 -
第一步:下载 zookeeper 的压缩包,官方下载网址如下:http://archive.apache.org/dist/zookeeper/
我们在这个网址下载我们使用的 zk 版本为 3.4.6
下载完成之后,上传到我们的 linux 服务器的 /export/software 路径下准备进行安装
-
第二步:解压
在 node1 主机上,解压 zookeeper 的压缩包到 /export/server 路径下去,然后准备进行安装
cd /export/software tar -zxvf zookeeper-3.4.6.tar.gz -C /export/server/
-
第三步:修改配置文件
在 node1 主机上,修改配置文件
cd /export/server/zookeeper-3.4.6/conf/ cp zoo_sample.cfg zoo.cfg mkdir -p /export/server/zookeeper-3.4.6/zkdatas/ vim zoo.cfg
修改以下内容:
#Zookeeper的数据存放目录 dataDir=/export/server/zookeeper-3.4.6/zkdatas #保留多少个快照 autopurge.snapRetainCount=3 #日志多少小时清理一次 autopurge.purgeInterval=1 # 集群中服务器地址 server.1=node1:2888:3888 server.2=node2:2888:3888 server.3=node3:2888:3888
-
第四步:添加 myid 配置
在 node1 主机的 /export/server/zookeeper-3.4.6/zkdatas/ 这个路径下创建一个文件,文件名为 myid,文件内容为1
echo 1 > /export/server/zookeeper-3.4.6/zkdatas/myid
-
第五步:安装包分发并修改 myid 的值
在 node1 主机上,将安装包分发到其他机器
第一台机器上面执行以下两个命令
scp -r /export/server/zookeeper-3.4.6/ node2:/export/server/ scp -r /export/server/zookeeper-3.4.6/ node3:/export/server/
第二台机器上修改 myid 的值为 2
echo 2 > /export/server/zookeeper-3.4.6/zkdatas/myid
第三台机器上修改 myid 的值为 3
echo 3 > /export/server/zookeeper-3.4.6/zkdatas/myid
-
第六步:配置环境变量
分别在三台机器中,修改 /etc/profile,添加以下内容:
export ZOOKEEPER_HOME=/export/server/zookeeper-3.4.6 export PATH=:$ZOOKEEPER_HOME/bin:$PATH
分别在三台机器中,source /etc/profile
-
第七步:三台机器启动 zookeeper 服务
三台机器分别启动 zookeeper 服务
这个命令三台机器都要执行
/export/server/zookeeper-3.4.6/bin/zkServer.sh start
三台主机分别查看启动状态
/export/server/zookeeper-3.4.6/bin/zkServer.sh status
-
-
-
Zookeeper 数据模型
Zookeeper 的数据模型,在结构上和标准文件系统的非常相似,拥有一个层次的命名空间,都是采用树形层次结构,Zookeeper 树中的每个节点被称为 Znode。和文件系统的目录树一样,Zookeeper 树中的每个节点可以拥有子点点。但也有不同之处:
-
Znode 兼具文件和目录两种特点,既像文件一样维护着数据、元信息、ACL(访问控制列表)、时间戳等数据结构,又像目录一样可以作为路径标识的一部分,并可以具有子 Znode。用户对 Znode 具有增、删、改、查等操作(权限允许的情况下)。
-
Znode 具有原子性操作,读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。另外,每一个节点都拥有自己的 ACL(访问控制列表),这个列表规定了用户的权限,即限定了特定用户对目标节点可以执行的操作。
-
Znode 存储数据大小有限制,Znode 虽然可以关联一些数据,但并没有被设计为常规的数据库或者大数据存储,相反的是,它用来管理调度数据,比如分布式应用中的配置文件信息,状态信息、汇集位置等等。这些数据的共同特性就是它们都是很小的数据,通常以 KB 为大小单位。Zookeeper 的服务器和客户端都被设计为严格检查并限制每个 Znode 的数据大小至多1M,当然常规使用中应该远小于此值。
-
Znode 通过路径引用,如同 Unix 中的文件路径。路径必须是绝对的,因此他们必须由斜杠字符来开头。除此以外,他们必须是唯一的。也就是说,每一个路径只有一个表示,因此这些路径不能改变。在 Zookeeper 中,路径由 Unicode 字符串组成,并且有一些限制。字符串'/zookeeper'用以保存管理信息,比如关键配额信息。
-
stat : 此为状态信息,描述该 Znode 的版本,权限等信息;
-
data :与该 Znode 关联的数据
-
children :该 Znode 下的子节点
-
-
-
Zookeeper 节点类型
Znode 有两种,分别为临时节点和永久节点。
-
节点的类型在创建时即被确定,并且不能改变。
-
临时节点:该节点的生命周期依赖于它们的会话。一旦会话结束,临时节点将被自动删除,当然也可以手动删除。临时节点不允许拥有子节点
-
永久节点:该节点的生命周期不依赖会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。
Znode 还有一个序列化的特性,如果创建的时候指定的话,该 Znode 的名字后面会自动追加一个不断增加的序列号。序列号对于此节点的父节点来说是唯一的,这样便于记录每个子节点创建的先后顺序。它的格式为"%10d"(10位数字,没有数值的数位用0补充,例如“0000000001”)。
这样,便会存在四种类型的 Znode 节点,分别对应:
PERSISTENT:永久节点;
EPHEMERAL:临时节点;
PERSISTENT_SEQUENTIAL:永久节点,序列化(顺序)
EPHEMERAL_SEQUENTIAL:临时节点,序列化(顺序)
-
-
Zookeeper 的 shell 操作
-
客户端连接
运行 zkCli.sh -server ip 进入命令行工具。
zkCli.sh -server node1:2181
-
操作命令
命令 说明 参数 create [-s] [-e] path data [acl] 创建 Znode -s 指定是顺序节点;-e 指定临时节点 ls path [watch] 列出 Path 下所有子 Znode get path [watch] 获取 Path 对应的 Znode 的数据和属性 ls2 path [watch] 查看 Path 下所有子 Znode 以及子 Znode 的属性 set path data [version] 更新节点 version 数据版本 delete path [version] 删除节点,如果要删除的节点有子 Znode 则无法删除 version 数据版本 rmr path 删除节点,如果有子 Znode 则递归删除 setquota -n -b val path 修改 Znode 配置 -n 设置子节点最大个数 -b 数据最大长度 history 列出历史记录 -
操作实例
-
创建普通永久节点
create /app1 hello
输出结果如下:
Created /app1
-
创建永久顺序节点
create -s /app2 world
输出结果如下:
Created /app20000000001
-
创建临时节点
create -e /tempnode world
输出结果如下:
Created /tempnode
-
创建顺序的临时节点
create -s -e /tempnode2 aaa
输出结果如下:
Created /tempnode20000000003
-
获取节点数据
get /app1
输出结果如下:
hello
cZxid = 0x100000002
ctime = Mon Apr 24 05:11:24 EDT 2023
mZxid = 0x100000002
mtime = Mon Apr 24 05:11:24 EDT 2023
pZxid = 0x100000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0 -
修改节点数据
set /app1 hadoop
输出结果如下:
cZxid = 0x100000002
ctime = Mon Apr 24 05:11:24 EDT 2023
mZxid = 0x100000006
mtime = Mon Apr 24 05:14:42 EDT 2023
pZxid = 0x100000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0 -
删除节点
delete /app1 #删除的节点不能有子节点 rmr /app1
-
-
节点属性
每个 Znode 都包含了一系列的属性,通过命令 get,可以获得节点的属性。
如:
hello cZxid = 0x100000002 ctime = Mon Apr 24 05:11:24 EDT 2023 mZxid = 0x100000002 mtime = Mon Apr 24 05:11:24 EDT 2023 pZxid = 0x100000002 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0
-
dataVersion:数据版本号,每次对节点进行 set 操作,dataVersion 的值都会增加1(即使设置的是相同数据),可有效避免了数据更新时出现的先后顺序问题。
-
cversion:子节点的版本号。当 Znode 的子节点有变化时,cversion 的值就会增加1.
-
cZxid:Znode 创建的事务 id。
-
mZxid:Znode 被修改的事务 id,即每次对 Znode 的修改都会更新 mZxid。
对于 zk 来说,每次的变化都会产生一个唯一的事务id,zxid(Zookeeper Transaction id)。通过 zxid,可以确定更新操作的先后顺序。例如,如果 zxid1 小于 zxid2,说明 zxid1 操作先于 zxid2 发生。zxid 对于整个 zk 都是唯一的,即使操作的是不同的 znode。
-
ctime:节点创建时的时间戳;
-
mtime:节点最新一次更新发生时的时间戳;
-
ephemralOwner:如果该节点为临时节点,ephemeralOwner 的值表示与该节点绑定的 session id。如果不是,ephemeralOwner 的值为0;
在 client 和 server 通信之前,首先需要建立连接,该连接称为 session。连接建立后,如果发生连接超时、授权失败,或者显式关闭连接,连接便处理 Closed 状态,此时 session 结束。
-
-
-
Zookeeper Watch(监听机制)
Zookeeper 提供了分布式数据发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某一个主题对象。当这个主题对象自身状态变化时,会通知所有订阅者,使他们能够做出相应的处理。
Zookeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。Zookeeper 允许客户端向服务器注册一个 Watcher 监听。当服务端的一些事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式通知功能。
触发事件种类很多,如:节点创建、节点删除、节点改变、子节点改变等。
总的来说可以概括 Watcher 为以下三个过程:客户端向服务器注册 Watcher、服务端事件发生触发Watcher、客户端回调 Watcher 得到触发事件情况。
-
Watch 机制特点
-
一次性触发
事件发生触发监听,一个 watcher event 就会被发送到设置监听的客户端,这种效果是一次性的,后续再次发生同样的事件,不会再次触发。
-
事件封装
Zookeeper 使用 WatchedEvent 对象来封装服务端事件并传递。
WatchedEvent 包含了每一个事件的三个基本属性:通知状态(keeperState)、事件类型(EventType) 和节点路径(path)
-
event 异步发送
watcher 的通知事件从服务端发送到客户端是异步的。
-
先注册再触发
Zookeeper 中的 watch机制,必须客户端先去服务端注册监听,这样事件发送才会触发监听,通知客户端
-
-
通知状态和事件类型
同一个事件类型在不同的通知状态中代表的含义有所不同,下有列举了常见的通知状态和事件类型。
事件封装:Wather 得到的事件是被封装过的,包括三个内容:keeperState、eventType、path
KeeperState EventType 触发条件 说明 None 连接成功 SyncConnected NodeCreated Znode 被创建 此时处于连接状态 SyncConnected NodeDeleted Znode 被删除 此时处于连接状态 SyncConnected NodeDataChanged Znode 数据被改变 此时处于连接状态 SyncConnected NodeChildChanged Znode 的子Znode 数据被改变 此时处于连接状态 Disconnected None 客户端和服务器端断开连接 客户端和服务器处于断开连接状态 Expired None 会话超时 会收到一个 SessionExpired AuthFail None 权限验证失败 会收到一个 AuthFailedException 其中连接状态事件(type=None,path=null)不需要客户端注册,客户端只要有需要直接处理就行了。
-
-
Shell 客户端设置 watcher
-
设置节点数据变动监听
[zk: node1:2181(CONNECTED) 14] get /app10000000005 watch
输出结果如下:
hello
cZxid = 0x10000000a
ctime = Mon Apr 24 05:52:33 EDT 2023
mZxid = 0x10000000a
mtime = Mon Apr 24 05:52:33 EDT 2023
pZxid = 0x10000000a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0 -
通过另一个客户端更改节点数据
[zk: node1:2181(CONNECTED) 15] set /app10000000005 hello22
输出结果如下:
WATCHER::cZxid = 0x10000000a
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app10000000005
ctime = Mon Apr 24 05:52:33 EDT 2023
mZxid = 0x10000000b
mtime = Mon Apr 24 05:53:48 EDT 2023
pZxid = 0x10000000a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0此时设置监听的节点收到通知
-
-
Zookeeper Java API 操作
这里操作 Zookeeper 的 JavaAPI 使用的是一套 Zookeeper 框架 Curator,解决了很多 Zookeeper 客户端非常底层的细节开发工作。
Curator 包含了几个包:
- curator-framework:对 zookeeper 的底层 api 的一些封装
- curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器等
Maven 依赖(使用 curator 版本:2.12.0,对应 Zookeeper 的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能会导致节点操作失败)
-
引入 Maven 依赖:
<dependencies> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>com.google.collections</groupId> <artifactId>google-collections</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
-
节点操作
-
ZnodeOperator.java 文件的代码如下:
package Znode; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class ZnodeOperator { public void createZnode() throws Exception { //1. 定制一个重试策略 /** * param1:重试的间隔时间 * param2:重试的最大次数 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); //2. 获取一个客户端对象 /** * param1:要连接的 Zookeeper 服务器列表 * param2:会话的超时时间 * param3:链接超时时间 * param4:重试策略 */ String connectionStr = "192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181"; CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy); //3.开户客户端 client.start(); //4.创建节点 /** * 节点类型: * CreateMode.PERSISTANT:永久节点 * CreateMode.PERSISTANT_SEQUENTIAL:永久序列化节点 * CreateMode.EPHEMERAL:临时节点 * CreateMode.EPHEMERAL_SEQUENTIAL:临时序列化节点 */ client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello3","world".getBytes()); //5.关闭客户端 client.close(); } }
-
Test.java 测试类的代码如下:
package Znode; public class Test { public static void main(String[] args) throws Exception { ZnodeOperator operator = new ZnodeOperator(); operator.createZnode(); } }
-
输出结果如下:
......
[main-SendThread(192.168.88.163:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.88.163/192.168.88.163:2181, initiating session
[main-SendThread(192.168.88.163:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.88.163/192.168.88.163:2181, sessionid = 0x387b220a2240000, negotiated timeout = 8000
[main-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
[Curator-Framework-0] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x387b220a2240000 closedProcess finished with exit code 0
-
我们再通过 Zookeeper 的 shell 操作, 在 Linux 上 shell 窗口上通过 ls path 的方式,能成功验证 znode 确实已经被我们创建成功。如下所示:
WatchedEvent state:SyncConnected type:None path:null [zk: node1:2181(CONNECTED) 0] ls /hello2 []
-
-
Zookeeper 选举机制
Zookeeper 默认的算法是 FastLeaderElection,采用投票数大于半数则胜出的逻辑
-
概念
-
服务器ID
比如有三台服务器,编号分别为1,2,3
编号越大,在选择算法中的权重越大
-
选举状态
LOOKING,竞选状态;
FOLLOWING,随从状态,同步 leader 状态,参与投票。
OBSERVING, 观察状态,同步 leader 状态,不参与投票。
LEADING,领导者状态。
-
数据 ID
服务器中存放的最新数据 version。
值越大说明数据越新,在选举算法中数据越新权重越大。
-
逻辑时钟
也叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断。
-
-
全新集群选举
假设目前有5台服务器,每台服务器均没有数据,它们的编号分别是1、2、3、4、5,按编号依次启动,它们的选举过程如下:
- 服务器1启动,给自己投票,然后发投票信息。由于其它机器还没有启动,所以它收不到反馈信息,服务器1的状态一直属于 Looking。
- 服务器2启动,给自己投票,同时与之前启动的服务器1交换结果。由于服务器2的编号大,所以服务器2胜出。但此时投票数没有大于半数,所以两个服务器的状态依然是 Looking。
- 服务器3启动,给自己投票,同时与之前启动的服务器1、2交换结果。由于服务器3的编号最大,所以服务器3胜出。此时投票数正好大于半数,所以服务器3成为领导者,服务器1、2 成为小弟。
- 服务器4启动,给自己投票,同时与之前启动的服务器1、2、3交换信息,尽管服务器4的编号大,但之前的服务器3已经胜出,所以服务器4只能成为小弟。
- 服务器5启动,后面的逻辑同服务器4成为小弟。
-
非全新集群选举
对于运行正常的 Zookeeper 集群,中途有机器down 掉,需要重新选举时,选举过程就需要加入数据ID、服务器ID 和逻辑时钟。
数据ID:数据新的 version 就大,数据每次更新都会更新 version 。
服务器 ID:就是我们配置的 myid 中的值,每个机器一个。
逻辑时钟:这个值从0开始递增,每次选举对应一个值。如果在同一次选举中,这个值是一致的。
这样选举的标准就变成:
- 逻辑时钟小的选举结果被忽略,重新投票;
- 统一逻辑时钟后,数据 id 大的胜出;
- 数据 id 相同的情况下,服务器 id 大的胜出。
根据这个规则选出 Leader。
-