BigData~02:Hadoop基础01 ~ ZooKeepe
2018-07-02 本文已影响10人
帅可儿妞
...
一、相关概念
- 中间件:为分布式系统提供协调服务的组件,如专门用于计算服务的机器就是一个计算型中间件,还有专门用于存储的机器就是存储中间件等等;
- 分布式系统:把一套系统按照业务进行横向拆分,不同业务阶段的处理放在不同的计算机上,由这些计算机组成了一套业务处理链,那么这个业务处理链就是一个分布式系统;
- 内部的每台计算机都可以进行通信(rpc/rest);
- 可以这样理解:加入我们的系统目前就在一个项目中,横向拆分就是把MVC层拆出来作为一个模块节点A,把Service层拆分出来当做节点B,把DAO层拆分出来当做节点C,那么节点A可以部署在服务器a上,节点B部署在服务器b上,节点C部署在节点c上;运行的时候客户端的请求发送到服务器a上,a处理的时候使用rpc/rest调用b,b处理的时候调用c,这就是所谓的分布式系统;而且这个系统的特点很明显,如果b的处理压力过大,那么b节点可以单独的扩展为集群,这样这个系统就成为了一个分布式集群系统,大大增加了系统的可扩展性,以及系统运行时候的性能的保障;
- ZooKeeper:一个基于观察者模式设计的,为用户的分布式应用程序提供分布式的协调服务的中间件框架;简称ZK;主要用来解决分布式集群中应用系统的一致性问题;支持Java,并且提供java和c的相关API;
- ZK的作用
- Leader节点选举:就是一个主备切换的过程,当主节点挂了以后,从节点就可以接手主节点的工作,保证系统的高可用。
- 统一配置管理:只需要配置部署一台服务器,就可以把这个服务器上的配置资源更新到其他的服务器上。常用于云计算部署;
- 发布订阅: 类似于MQ、Dubbo,发布者把数据发布到znode上,订阅者会去获取这个信息;
- 提供分布式锁:在分布式环境中,各个服务器可能会抢夺同一个资源,分布式锁就是解决这个抢夺过程的同步问题;
- 集群管理:集群中各个服务器之间保持数据的强一致性;
- ZK中的节点角色分两种:Leader和Follower/Observer,只有集群中有半数以上节点存活,集群就能提供服务,一般情况下集群的节点数目不少于3个且为奇数个;一个Leader,多个Follower组成一个ZK集群;
二、ZooKeeper单机安装配置
- 下载安装
- 建议在官方归档页面下载;
- 解压下载到的安装包,配置环境变量
ZOOKEEPER_HOME
和PATH
;
- ZooKeeper的配置
- 安装完成后,在conf文件夹下面有一个文件:zoo_sample.cfg文件,拷贝这个文件为zoo.cfg;
- 在这个配置文件中有很多的常用的变量需要我们配置:
- tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,单位:毫秒,单机集群通用配置;
- initLimit:初始化时,Follower节点连接到Leader节点的最长能忍受的心跳间隔数,按照tickTime倍数形式表示,集群配置;
- syncLimit:Follower节点和Leader节点之间发送请求与应答最多能忍受的心跳间隔,集群配置;
- dataDir:常用数据所在目录,必须配置,单机集群通用配置;
- dataLogDir:日志目录,如果没有配置,则使用dataDir,单机集群通用配置;
- clientPort:连接服务器的端口,默认2181,单机集群通用配置;
- ZooKeeper的使用
- 启动ZooKeeper服务:
./zkServer.sh start ./zkServer.sh start-foreground
- 关闭ZooKeeper服务:
./zkServer.sh stop
- 重启ZooKeeper服务:
./zkServer.sh restart
- 查看ZooKeeper服务状态:
./zkServer.sh status
- 启动ZooKeeper服务:
三、ZooKeeper的数据模型
- ZK的数据模型
- 是一个树形模型;
- 每一个节点成为znode,每个节点都存储自身的数据,可以有子节点;
- 每个节点都有两种类型:临时和永久;临时节点在与客户端断开连接之后消失,子节点和自身数据都会丢失;
- 每个节点都有一个版本号,该版本号会随着该节点的信息更新而更新(类似数据库的乐观锁),这个版本信息可以通过命令来展示;
- 每个节点存储的数据大小不宜过大(几K);
- 每个节点都可以设置ACL权限;
- ZK的数据模型的相关操作
- 连接客户端,必须先启动Server,再去使用Client连接Server
./zkCli.sh
- zkCli相关命令
- 查看zkCli的所有命令
help
- 列举当前节点的子节点
ls <node-path> [watch]
- 查看当前节点的状态
stat <node-path>
- cZxid:节点id;
- cTime:create time;
- mZXid:修改节点后的节点id;
- mTime:modify time;
- pZxid:子节点的id???
- cversion:子节点的version;
- dataVersion:当前节点的数据版本;
- aclVersion:acl权限版本;
- ephemeralOwner:临时Owner内存地址,如果值是
0x0
,那么这个节点就是持久性节点,否则这个节点就是临时节点;临时节点不是一断开就立即删除(心跳机制,断开连接之后,没有了心跳,就会去删除临时节点),而是有一个实效时间,在这个时间过后才会删除; - dataLength:数据字节数;
- numChildren:子节点个数;
- 同事查看当前节点的子节点以及当前节点的状态
ls2 <node-path> [watch] # 本质上就是ls和stat组合命令
- 查看当前节点的数据
get <node-path>
- 查看zkCli的所有命令
- 关闭客户端连接
- Ctrl + C
- 连接客户端,必须先启动Server,再去使用Client连接Server
四、ZK的常用操作
- Session基本原理
- 服务端和客户端之间的连接称之为Session;
- 每个Session都可以设置一个超市时间;
- 心跳结束,Session过期,临时节点就会被丢弃;
- 心跳机制:客户端向服务端的ping包请求;
- 常用命令
- 创建节点
create [-s] [-e] <node-path> <node-data> <acl>
-
-s
:sequence,创建顺序节点 -
-e
:ephemeral,创建临时节点;临时节点上不可以创建永久节点;
-
- 修改节点
set <node-path> <node-data> [<version>]
- 删除节点
delete <node-path> [<version>]
- 创建节点
五、watch机制
- 机制简介
- 我们对每个节点的操作都会有一个监听者watch,可以理解为一个触发器;当有人操作了节点,那么就会触发这个节点的watch事件,来更新这个节点以及父节点的属性值;
- watch是一次性的,触发后事件完成后就立即销毁;可以通过Apache实现永久性的watch;
- 不同的类型的watch触发的事件也是不同的:
- 节点创建
- 节点删除
- 节点数据更新
- 常见的watch事件
- 当前节点创建节点触发事件:NodeCreated
- 如果当前节点不存在也可以设置,报错可忽略
- 当前节点数据更新事件:NodeDataChanged
- 当前节点删除事件:NodeDeleted
- 子节点创建、删除事件:NodeChildrenChanged
- 修改子节点,不触发父节点事件
- 当前节点创建节点触发事件:NodeCreated
- watch相关的命令
- 通过查询数据设置watch
get <node-path> [watch]
- 通过查询节点状态设置watch
stat <node-path> [watch]
- 通过列举子节点设置watch
ls[2] <node-path> [watch]
- 通过查询数据设置watch
- Watch使用场景
- 集群统一资源配置
六、权限控制列表
- ACL简介
- 权限控制列表:ACL(Access Control List)
- 为了保障数据的安全性,针对节点设置相关的读写权限;这个权限可以设置不同的权限范围和角色;
- ACL的结构单位:
[<scheme>:<id>:<permissions>]
:- scheme:采用的权限机制
- world:这个scheme下只有一个id,即anyone;组合
world:anyone:<permissions>
; - auth:认证登录;组合:
auth:<username>:<password>:<permissions>
; - digest:加密访问;组合
digest:<username>:BASE64(SHA1(<password>)):<persissions>
; - ip:限制ip访问;组合
ip:<ip-addr>:<permissions>
; - super:超级管理员权限;
- 编辑zkServer.sh中,找到行nohup行的${ZOO_LOG4J_PROP}",在其后添加
"-Dzookeeper.DigestAuthenticationProvider.superDigest=<username>:<BASE64(SHA1(<password>))>"
- 重启zkServer;
- 登陆认证:
addauth digest <username>:<BASE64(SHA1(<password>))>
,这样就可以使用超级管理员账号了;
- 编辑zkServer.sh中,找到行nohup行的${ZOO_LOG4J_PROP}",在其后添加
- world:这个scheme下只有一个id,即anyone;组合
- id:允许访问的用户id
- permissions:权限字符串:CRDWA,这5个字母可以任意组合
- Create:创建权限
- Read:读当前节点和子节点的权限
- Delete:删除权限
- Write:写权限
- Admin:分配权限的权限
- scheme:采用的权限机制
- ACL使用
- addauth:添加认证授权信息:
addauth <scheme> <acl> # 添加一个用户名为zhangsan,密码为123456的用户:认证过程 addauth digest zhangsan:123456
- setAcl:设置具体节点的权限信息:
setAcl <node-path> <acl> # 给一个用户zhangsan设置权限,注意:只是第一次设置有效 setAcl /level1/level2 auth:zhangsan:123456:cdrwa
- getAcl:获取具体节点的权限信息:
getAcl <node-path> # 默认权限: 'world,' anyone : cdrwa
- addauth:添加认证授权信息:
- ACL使用场景
- 分离开发测试环境:即不同的节点设置不同的访问权限;
七、四字命令(The Four Letter Words)
- 简介
- ZK可以通过四字命令与服务器进行交互;比较适合监控信息;
- 使用
- 安装nc:
yum install nc
- nc命令使用规则:
echo [command] | nc [ip] [port]
- 安装nc:
- 常用命令
- stat:查看ZK的状态信息;
echo stat | nc localhost 2181 # 执行结果: Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT Clients: /localhost:46026[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/1/8 Received: 10 Sent: 9 Connections: 1 Outstanding: 0 Zxid: 0xa Mode: standalone Node count: 4
- ruok:查看ZK的启动状态;
echo ruok | nc localhost 2181 # 执行结果: imok
- dump:查看ZK的启动状态;
echo dump | nc localhost 2181 # 执行结果: SessionTracker dump: Session Sets (0): ephemeral nodes dump: Sessions with Ephemerals (0):
- conf:查看ZK服务器的相关配置参数;
echo conf | nc localhost 2181 # 执行结果: clientPort=2181 dataDir=/usr/local/bin/zookeeper-3.4.9/data/version-2 dataLogDir=/usr/local/bin/zookeeper-3.4.9/log/version-2 tickTime=2000 maxClientCnxns=60 minSessionTimeout=4000 maxSessionTimeout=40000 serverId=0
- cons:查看连接到ZK服务器的信息;
echo cons | nc localhost 2181 # 执行结果: /0:0:0:0:0:0:0:1:49998[0](queued=0,recved=1,sent=0) /0:0:0:0:0:0:0:1:49996[1](queued=0,recved=1,sent=1,sid=0x1645e34b63f0001,lop=SESS,est=1530597540388,to=30000,lcxid=0x0,lzxid=0xb,lresp=1530597540403,llat=14,minlat=0,avglat=14,maxlat=14)
- envi:查看ZK服务器的环境变量;
echo envi | nc localhost 2181 # 执行结果: Environment: zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT host.name=localhost java.version=1.8.0_171 java.vendor=Oracle Corporation java.home=/usr/local/bin/jdk1.8.0_171/jre java.class.path=/usr/local/bin/zookeeper-3.4.9/bin/../build/classes:/usr/local/bin/zookeeper-3.4.9/bin/../build/lib/*.jar:/usr/local/bin/zookeeper-3.4.9/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/bin/zookeeper-3.4.9/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/bin/zookeeper-3.4.9/bin/../lib/netty-3.10.5.Final.jar:/usr/local/bin/zookeeper-3.4.9/bin/../lib/log4j-1.2.16.jar:/usr/local/bin/zookeeper-3.4.9/bin/../lib/jline-0.9.94.jar:/usr/local/bin/zookeeper-3.4.9/bin/../zookeeper-3.4.9.jar:/usr/local/bin/zookeeper-3.4.9/bin/../src/java/lib/*.jar:/usr/local/bin/zookeeper-3.4.9/bin/../conf: java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib java.io.tmpdir=/tmp java.compiler=<NA> os.name=Linux os.arch=amd64 os.version=3.10.0-862.3.2.el7.x86_64 user.name=root user.home=/root user.dir=/usr/local/bin/zookeeper-3.4.9/bin
- mntr:查看ZK服务器的健康信息;
echo mntr | nc localhost 2181 # 执行结果: zk_version 3.4.9-1757313, built on 08/23/2016 06:50 GMT zk_avg_latency 1 zk_max_latency 14 zk_min_latency 0 zk_packets_received 59 zk_packets_sent 58 zk_num_alive_connections 2 zk_outstanding_requests 0 zk_server_state standalone zk_znode_count 4 zk_watch_count 0 zk_ephemerals_count 0 zk_approximate_data_size 27 zk_open_file_descriptor_count 27 zk_max_file_descriptor_count 4096
- wchs:查看ZK服务器的watch信息;
echo wchs | nc localhost 2181 # 执行结果: 0 connections watching 0 paths Total watches:0
- wchc:查看ZK服务器中session与带有watch的节点的关系信息(3.4.10后需要开启白名单才能执行);
echo wchc | nc localhost 2181
- wchp:查看ZK服务器中带有watch的节点在哪个session中(3.4.10后需要开启白名单才能执行);
echo wchp | nc localhost 2181
- 开启白名单:在zoo.cfg文件中添加如下一行命令,重启即可:
4lw.commands.whitelist=* 4lw.commands.whitelist=<cmd1>,<cmd2>...
- stat:查看ZK的状态信息;
八、ZK搭建集群
-
伪分布式环境搭建(为学习)
- 下载ZooKeeper安装包;
- 解压安装包;
- 复制conf文件夹下的zoo_sample.cfg为zoo.cfg;
- 配置各个节点的myid:向每个节点的ZK的数据文件夹中写入一个文件
myid
,这个文件中写入当前机器的id编号即可; - 编辑zoo.cfg:
- 配置
dataDir
和dataLogDir
,不要放置在tmp下,个人喜欢放在安装目录或者独立的硬盘的一个目录下,并且确保程序有访问权限; - 在每个节点上配置所有节点的端口映射关系:①对外访问端口:2181;②Leader和Follower之间的端口:2888;③组件之间投票通讯端口:3888;对应的配置如下:
<server-name-prefix>.<myid>=<host>:<leader-listener-port>:<vote-port>
- <leader-listener-port>:是该服务器一旦成为Leader之后需要监听的端口,用于接收来自follower的请求;
- <vote-port>:集群中的每一个节点在最开始选举Leader时监听的端口,用于服务器互相之间通信选举Leader;
- 配置
- 建议关闭防火墙,因为集群只在内部访问,不会暴露到对外的环境中;
-
真分布式环境搭建(为应用)
九、ZK集群的特点
- 数据一致性:每个ZK节点保存一份相同的数据副本,client无论连接到哪个节点,数据都是一致的,即
单一视图
; - 分布式读写,更新请求转发,由leader实施;更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行;
- 操作原子性:一损俱损,全荣为荣;
- 数据实时性:在一定时间范围内,client能读到最新数据,ZK节点越多实时性越差;
- 数据可靠性:每次对ZK的操作都记录在服务端;
十、ZK选举Leader
- 问题的提出:
- 在配置集群的时候我么是不配置哪个节点是Leader节点,哪些事Follower节点的,当我们在每个节点上配置了这个集群的所有节点的映射关系之后,当ZK节点一个接着一个的启动,系统会自动的选举出Leader,那这个Leader的节点选举时怎样进行的呢?
- Leader选举机制
- Leader选举机制的考虑因素:
- 节点id:id即权重,节点id越大选举的权重越大;
- 数据id:数据越新,数据id越大,选举的权重越大;
- 投票次数:得票者越大,越容易成为Leader;
- 选举中节点的状态:
- LOOKING:竞选状态;
- FOLLOWING:随从状态,同步leader状态,参与投票;
- OBSERVING:观察状态,同步leader状态,不参与投票;
- LEADING:领导者状态;
- Leader选举机制的考虑因素:
十一、ZK的JavaAPI的使用
- 添加ZooKeeper的pom:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.5</version> </dependency>
-
org.apache.zookeeper.Zookeeper
是客户端入口主类,负责建立与ZK服务器节点的会话;主要的API参考下面的测试类;
-
- ZKClientTest:
import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.util.List; /** * @Author: ShrekerNil * @Date: 2018/7/5 10:28 * @Description: ZooKeeper的API的使用 */ @Slf4j public class ZKClientTest { private static final String connectString = "192.168.70.131:2181,192.168.70.132:2181,192.168.70.133:2181"; private static final int sessionTimeout = 2000; private ZooKeeper zkClient = null; @Before public void init() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { // Watcher接口是当zkClient监听到watch事件触发的时候就会去调用该接口process方法的一个接口 public void process(WatchedEvent event) { // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) log.info("接收到事件,类型:{},事件发生节点:{}", event.getType(), event.getPath()); // 因为监听器只会生效一次,所以在消费监听器之后再次注册监听器 try { zkClient.getChildren("/", true); } catch (Exception e) { log.error("添加监听器失败", e); } } }); } // 创建数据节点到ZK中 @Test public void testCreate() throws Exception { // 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型 String nodePathCreated = zkClient.create("/node110", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //上传的数据可以是任何类型,但都要转成byte[] log.info(nodePathCreated); } //判断znode是否存在 @Test public void testExist() throws Exception { Stat stat = zkClient.exists("/node110", false); log.info(stat == null ? "Not exist!" : "Exists"); } // 获取子节点 @Test public void getChildren() throws Exception { List<String> children = zkClient.getChildren("/", true); for (String child : children) { log.info(child); } Thread.sleep(20000); } //获取znode的数据 @Test public void getNodeData() throws Exception { byte[] data = zkClient.getData("/node110", false, null); log.info(new String(data)); } //设置znode数据 @Test public void setNodeData() throws Exception { zkClient.setData("/node110", "something in my heart is ...".getBytes(), -1); byte[] data = zkClient.getData("/node110", false, null); log.info(new String(data)); } //删除znode @Test public void deleteZnode() throws Exception { //第二个参数:指定要删除的版本,-1表示删除所有版本 zkClient.delete("/node110", -1); } }
十二、ZK的使用场景(参考自这里)
- 统一命名服务(Name Service)
- 统一命名服务是ZK的内置功能,类似于JNDI,我们在调用其API的时候使用了这一功能,在这里不再赘述;
- 配置管理(Configuration Management)
- 问题:配置的管理在分布式应用环境中很常见,例如同一个应用系统需要多台服务器配合运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的服务器,这样非常麻烦而且容易出错。
- 解决:ZK就能很好的帮我们完成这项配置工作,可以把配置存储在ZK的一个节点上,设置监听,当我们修改了ZK中这个节点的数据的时候,监听器就能侦测到这个事件,进而把配置应用到各自的服务器中。
- 集群管理(Group Membership)
- 问题:在做集群的时候,一般我们都使用Master、Slave模式,但是如果Master挂了呢?这个时候就会出现问题,整个服务就挂掉了,HA化为泡影;
- 解决:这个问题的解决方式有很多,很多的框架都提供了集群管理的功能,当然我们今天的主角ZK也不例外。首先我们应该注册监听一个节点的子节点,客户端在服务器上注册临时(EPHEMERAL)的节点,当出现如上的问题的时候,就触发ZK的事件,我们就知道哪个节点挂了,根据一定的算法选举出新的Leader,让集群接着向外界提供服务;
- 共享锁(Locks)
- 共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
- 队列管理
- ZK可以处理两种类型的队列:同步队列和FIFO队列(完整代码参看这里);
- 同步队列实现:当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达;
- 实现思路:创建一个父目录
/synchronizing
,每个成员都监控标志(Set Watch)位目录/synchronizing/start
是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建/synchronizing/member_i
的临时目录节点,然后每个成员获取/ synchronizing
目录的所有目录节点,也就是member_i
。判断i
的值是否已经是成员的个数,如果小于成员个数等待/synchronizing/start
的出现,如果已经相等就创建/synchronizing/start
;
- 实现思路:创建一个父目录
- FIFO队列实现:队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型;
- 实现思路:实现的思路也非常简单,就是在特定的目录下创建
SEQUENTIAL
类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过getChildren()
方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO;
- 实现思路:实现的思路也非常简单,就是在特定的目录下创建
十三、集群节点上下线感知系统实现
-
客户端实现
import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.util.ArrayList; import java.util.List; /** * @Author: ShrekerNil * @Date: 2018/7/5 14:21 * @Description: DistributedClient用来描述ZK在客户端的用法 */ @Slf4j public class DistributedClient { private static final String connectString = "192.168.70.131:2181,192.168.70.132:2181,192.168.70.133:2181"; private static final int sessionTimeout = 2000; private static final String baseNode = "/servers"; private ZooKeeper zkClient = null; private volatile List<String> servers; /** * 创建到zk的客户端连接 */ private void connectZoooKepper() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { try { updateServersAndListenChildrenNodes(); } catch (Exception e) { log.error("更新服务器列表失败", e); } } }); } /** * 更新服务器列表,并且注册子节点监听 */ private void updateServersAndListenChildrenNodes() throws Exception { // 获取服务器子节点信息,并且对父节点进行监听 List<String> nodes = zkClient.getChildren(baseNode, true); // 先创建一个局部的list来存服务器信息 List<String> temp = new ArrayList<String>(); for (String node : nodes) { // child只是子节点的节点名 byte[] data = zkClient.getData(baseNode + "/" + node, false, null); temp.add(new String(data)); } servers = temp; //打印服务器列表 log.debug("servers:{}", servers.toString()); } /** * 业务功能 */ private void handleBussiness() throws InterruptedException { log.debug("Client start working....."); log.debug("Using servers:{}", servers); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 创建客户端对象 DistributedClient client = new DistributedClient(); // 连接ZK client.connectZoooKepper(); // 更新servers的子节点信息(并监听),从中获取服务器信息列表 client.updateServersAndListenChildrenNodes(); // 业务线程启动 client.handleBussiness(); } }
-
集群端实现:
import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; /** * @Author: ShrekerNil * @Date: 2018/7/5 14:19 * @Description: DistributedServer用来描述ZK在服务器端的用法 */ @Slf4j public class DistributedServer { private static final String connectString = "192.168.70.131:2181,192.168.70.132:2181,192.168.70.133:2181"; private static final int sessionTimeout = 2000; private static final String parentNode = "/servers"; private ZooKeeper zkClient = null; /** * 创建到zk的客户端连接 */ private void connectZoooKepper() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) log.debug(event.getType() + "---" + event.getPath()); try { zkClient.getChildren("/", true); } catch (Exception e) { log.error("添加监听器失败", e); } } }); } /** * 向zk集群注册服务器信息 */ private void registerServer(String hostname) throws Exception { String newNode = zkClient.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); log.debug("Created new node {} in {}", newNode, hostname); } /** * 业务功能 */ private void handleBussiness(String hostname) throws InterruptedException { log.debug(hostname + " start working....."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 创建Server DistributedServer server = new DistributedServer(); // 连接ZK server.connectZoooKepper(); int length = args.length; if (length < 1) { log.error("请添加参数hostname"); return; } // 利用zk连接注册服务器信息 server.registerServer(args[0]); // 启动业务功能 server.handleBussiness(args[0]); } }
十四、写在最后
- 在本篇文章中提到了很多知识点,但是很多知识点只是提了一下,由于篇幅有限,只能写到这里了,如果需要再补充吧
- 补充一个原理讲解比较深刻的博客,大家可以去看看;
- 文章中也引用了一些其他博主的内容,已标记出处,如果转载请标明这些博主的出处,我的就无所谓了,谢谢