Java技术升华面试精选

学了这篇 Zookeeper,分分钟钟写出分布式应用程序

2021-12-24  本文已影响0人  you的日常

前言

本次专题我们要探讨的内容是使用 zookeeper 实现自己的分布式应用程序,相信大家也都了解过 zookeeper,比如我们使用的分布式框架——Dubbo,就是用 zookeeper 实现的注册中心,再比如 Hadoop 用它来实现集群中的高可用,当然,包括耳熟能详的 Kafka 用它来实现生产者的负载均衡等等,而在我们现有的技术体系中,可能对 zookeeper 的了解估计就这么多了,因为我们只是习惯了开源框架的集成,只是知道它可以实现这些功能,而屏蔽了想继续了解它的好奇心,假如我们把 zookeeper 的一些特性加入到我们现有的系统中,用它也能解决了分布式系统中的一些问题,那该多酷啊?好,接下来,我们就一步一步的来探讨。

Zookeeper 简介

我们先看官网上对它的介绍:

A Distributed Coordination Service for Distributed Applications.

ZooKeeper is a distributed, open-source coordination service for distributed applications. It exposes a simple set of primitives that distributed applications can build upon to implement higher level services for synchronization, configuration maintenance, and groups and naming. It is designed to be easy to program to, and uses a data model styled after the familiar directory tree structure of file systems. It runs in Java and has bindings for both Java and C.

Coordination services are notoriously hard to get right. They are especially prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications the responsibility of implementing coordination services from scratch.

译文:

ZooKeeper 是一个为分布式应用提供的分布式、开源的协调服务。它公开了一组简单的原语,分布式应用程序可以根据这些原语来实现用于同步、配置维护以及组和命名的高级服务。它被设计为易于编程,并使用了文件系统中常见的目录树结构样式的数据模型。它在 Java 中运行,并且有针对 Java 和 C 的绑定。 协调服务是出了名的难搞。它们特别容易出现竞态条件和死锁等错误。

相信大家看了官网上的介绍,也有了对它的一个简单认识,我们摘出来比较重要的词句,比如同步、配置维护,命名,这些词是什么意思?意思是说在分布式系统中,我们可以用它实现同步的功能,也可以实现配置信息的维护,和命名服务,然后就是它的数据呈现格式是目录树,就像一个个的树状结构一样,当然我们先对它有一个大致了解,一下部分我们会详细探讨它的核心知识点与代码实例。

Zookeeper 的核心理念

要理解 zookeeper 的核心,记住三个词就可以了,一致、有头、数据树,什么意思?我们逐个做分析。

解决分布式系统数据一致性问题

zookeeper 集群中,所有的机器数据都会保持一致,从客户端来看,这些集群所有的数据都是一致的,从其中一台删除或修改一条数据,其他剩余机器的数据也会被删除或修改。

有头

只要是集群的机器有一半以上的能运行,永远都会有一个 leader,所有客户端的连接和数据操作都会先连接到 leader 那里(写和更新数据),然后 leader 再去转发到其他的 follower 机器。

数据树

zookeeper 存储的数据结构类型,就像一颗树,每一个节点都会绑定一个数据(data),所以叫做数据树。

角色分析

在一个完整的 zookeeper 集中,永远都会有 n 个 follower,也就是跟随者,永远都会有一个 leader,也就是领导者。

集群的搭建步骤与核心理念的验证

我们的安装环境是 linux,因为 zookeeper 的数量最好是奇数倍,所以我们要准备 3 台 linux 环境的虚拟机,分别部署 zookeeper。

1. 下载地址:

http://www.apache.org/dist/zookeeper/stable/

2. 下载完毕后在其中一台虚拟机中通过 tar -xvf 解压到 /usr/local 目录中。

3. 然后把解压后的 zookeeper 文件价分别 copy 到其他两台机器的相同目录中。

4. 进入 /usr/local/apache-zookeeper-3.5.5-bin/conf 通过 cp zoo_sample.cfg zoo.cfg 命令复制 zoo_sample.cfg 并修改文件名称为 zoo.cfg 此文件就是为 zookeeper 的配置文件。

5. 通过 vim zoo.cfg 命令编辑此文件,会看到如下内容,我们简单分析一下:

集群配置:

     server.1=192.168.56.102:2888:3888
     server.2=192.168.56.103:2888:3888
     server.3=192.168.56.104:2888:3888

代表的意思:

192.168.56.101:2888:3888 的含义是:

  1. :wq 保存退出,到这一步 zookeeper 配置信息已经配置完毕,然后把 zoo.cfg 分别 copy 到其他的两台机器中。

7. 然后更改各个虚拟机的 myid:目录对应 zoo.cfg 中的 dataDir 目录,然后在 3 台虚拟机的 /usr/local/zookeeperData 目录下面分别创建 myid 文件,然后在 3 台机器中 分别执行 vim myid 命令,然后在第 1 台(192.168.56.102)虚拟机中设置为 1,第 2 台(192.168.56.103)设置为 2,第 3 台(192.168.56.104)设置为 3,然后存盘退出。

8. 分别进入 3 台虚拟机的 /usr/local/apache-zookeeper-3.5.5-bin/bin 目录,然后执行 ./zkServer.sh start 命令进行启动 zookeeper 服务,然后执行 ./zkServer.sh status 查看 zookeeper 启动的状态,如果信息如下则启动成功:

在这里插入图片描述

仔细观察 mode 的值,第一台、第二台服务器的 mode 值分别为 follower 代表是跟随者机器,第三台 mode 的值为 leader,代表是领导者机器,正应了我们上面提到的,zookeeper 集群中,只有一台是 leader 服务器,其他全部为 follower 服务器。

操作验证

我们验证一下它的数据一致性问题:

进入到其中一台 zookeeper 服务的 bin 目录,执行 ./zkCl.sh 命令进入到客户端操作,然后执行 create /myZkCom 'myZkCom' 创建一个 myZkCom 节点,值也为 myZkCom,然后 在其他两台 zookeeper 机器中进入客户端,分别执行 get /myZkCom 执行结果如下:

在这里插入图片描述

三个 zookeeper 服务器都有 myZkCom 节点,说明是在 zookeeper 集群中,所有的数据都是保持一致的,接下来我们来探讨它的节点类型和常用命令。

节点类型与常用命令

zookeeper 的节点类型分为持久化节点、持久化顺序节点、 临时节点和临时顺序节点这四大类,我们分别来看一下它们的含义。

持久化节点

节点创建后会被持久化,客户端与 zookeeper 断开连接后,该节点依旧存在,只有主动调用 delete 方法的时候才可以删除它,创建该类型节点的命令为 create /myNode " ",一旦创建完,就持久化到 zookeeper 磁盘上上面了,哪怕是 zookeeper 服务重启也会存在。

持久化顺序节点

除了该节点是持久化之外,节点名称还进行了顺序编号,它的执行命令为 create -s /jin 创建节点结果是 /jin0000000062,如果再次执行 create -s /jin 那么节点名称为 /jin0000000063。

临时节点

节点创建后在创建者超时连接或失去连接的时候,节点会被删除,操作命令为create -e /hao

临时顺序节点

除了该节点是临时节点之外,它的节点名称也进行了顺序编号,节点创建的命令为 create -s -e /hao,四个命令只是 zookeeper 的一部分,我们继续来看一下其他常用的命令。

常用命令操作

我们接下来探讨它常用的一些命令,建议小伙伴们 进入客户端操作一下这些命令,废话不多说,我们直接进入正题。

查看所有目录:

ls /  

查看 service10 的下面节点:

ls /service10 

查看 service11 节点的数据:

get /service11 

创建 merryyou 节点,节点的内容为 merryyou:

create /merryyou merryyou 

获得 merryyou 节点内容:

get /merryyou 

创建临时节点 ( 断开重连之后,临时节点自动消失):

create -e /merryyou/temp merryyou 

根据版本号更新 dataVersion 乐观锁,再次执行 set /merryyou test-merryyou 1 命令时,会出现 version No is not valid : /merryyou

set /merryyou test-merryyou 1 set 

删除节点:

delete /merryyou/sec000000000 

ACL 权限控制

zookeeper 的节点有 5 种操作权限:CREATE、READ、WRITE、DELETE、ADMIN 也就是 增、删、改、查、管理权限,这 5 种权限简写为 crwda(即:每个单词的首字符缩写)。

注:这 5 种权限中,delete 是指对子节点的删除权限,其它 4 种权限指对自身节点的操作权限。

身份的认证有 4 种方式:

getAcl:获取某个节点的 acl 权限信息
    #获取节点权限信息默认为 world:cdrwa 任何人都可以访问
    getAcl /merryyou

    #设置节点权限 crwa 不允许删除
    setAcl /merryyou world:anyone:crwa

    #设置节点的权限信息为 rda
    setAcl /merryyou world:anyone:rda

这些命令在网上也能找的到,当然大家也不用刻意去背,忘记了知道在哪查就可以了。

读写流程

我们列举了很多命令操作,它的读写流程什么样的呢,比如执行了 create /myTree "" 创建 myTree 节点的命令,在 zookeeper 集群中是怎么操作的呢?我先画一个流程图,然后再进行详细分析。

流程图讲解

在这里插入图片描述

我们简单分析一下创建节点的流程图:

1. client 向 Zookeeper 的 server1 发送一个写请求,客户端写数据到 server1 上。

2. 如果 server1 不是 Leader,那么 server1 会把接收到的写请求转发给 Leader;然后 Leader 会将写请求转发给每一个 server。

3. server1 和 server2 负责写数据,并且两个 follower 的写入数据是一致的,保存相同的数据副本。

温馨提示:这里是 3 台服务器,只要 2 台 Follower 服务器写成功就 ok。因为 client 访问的是 server1,所以 leader 会告知 server1 集群中数据写成功。

4. 被访问的 server1 进一步通知 client 数据写成功,这时,客户端就知道整个写操作成功了。

5. 如果是读操作,就很简单,访问哪一个 server,哪一个 server 直接就会给客户端返回结果。

ZAB 协议

ZAB 协议是为 ZooKeeper 专门设计的一种支持崩溃恢复的一致性协议。基于该协议,ZooKeeper 实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性,ZAB 协议分为两种模式:

leader 节点挂掉之后会发生什么

我们直接还是做实验,实验如下步骤:

  1. 我们通过./zkServer.sh stop 关闭 leader 节点。

  2. 然后在其他的两个节点分别执行 ./zkServer.sh status 命令 查看其状态信息,操作结果如下:

    在这里插入图片描述

请看,leader 服务关闭之后,第二台 zookeeper 自动升级为了 leader,第一台 zookeeper 则是为 follower,有小伙伴可能有疑问了,为什么不是第一台是 leader 而是第二台成为了 leader?大家可以把这个问题先放一放,到下面我们再继续探讨,我们接下来学习用 java 程序怎么使用 zookeeper。

第一个 Zookeeper 程序

好,废话不多说,我们直接上代码:

首先我们引入 zookeeper 的 maven 依赖:

 <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.5</version>
        </dependency>

HelloZk.java

package com.zk.run;

import com.common.ZkConnect;

public class HelloZk {

    public static void main(String[] args) throws Exception {
        ZkConnect connection =new ZkConnect();
        connection.connect();//连接 zookeeper 服务
        connection.createPersistentNode("/HelloMyZKxxo","888");//创建一个路径名 HelloMyZKxxo 的持久化节点
        String zkData = connection.getData("/HelloMyZKxxo");//获取 HelloMyZKxxo 节点数据
        System.out.println(zkData);
        connection.deleteNode("/HelloMyZKxxo");//删除 HelloMyZKxxo
    }
}

我们创建了一个 HelloZk 的运行主类,实例化一个 ZkConnect 对象,该对象有一个连接 zookeeper 的 connect 方法,然后通过调用 connection.createPersistentNode()方法,创建一个持久化节点,然后通过调用 connection.getData()方法获取该节点的数据,最后删除该节点,这是一个非常简单的创建节点、获取节点内容和删除节点的操作,接下来我们重点来看一下 ZkConnect 类:

package com.common;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
 * @Classname ZkConnect
 * @Description TODO
 * @Date 2019/8/26 11:32
 * @Created by youDaily
 */
public class ZkConnect implements Watcher {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static final String ADDRESS = "192.168.56.102:2181,192.168.56.103:2181,192.168.56.104:2181";

    private ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        System.out.println("receive the event:" + event);
        if (Event.KeeperState.SyncConnected == event.getState()) {
            countDownLatch.countDown();
        }
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void connect(){
        try {
            zooKeeper = new ZooKeeper(ADDRESS, 5000, this);
            countDownLatch.await();
            System.out.println("已连接!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建持久化节点
     * @param path
     * @param data
     * @return
     * @throws Exception
     */
    public String createPersistentNode(String path,String data) throws Exception{
        return this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     *创建临时节点
     * @param path
     * @param data
     * @return
     * @throws Exception
     */
    public String createEphemeralNode(String path,String data) throws Exception{
        return this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    /**
     * 获取节点数据
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String getData(String path) throws KeeperException, InterruptedException {
        byte [] b =  this.zooKeeper.getData(path, new Watcher(){

            @Override
            public void process(WatchedEvent event) {

                if(event.getType() == Event.EventType.NodeDeleted){
                    System.out.println("节点路径:"+event.getPath()+"已被删除……");
                }
            }
        },null);
        return new String(b);
    }

    /**
     * 删除节点
     * @param path
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void deleteNode(String path) throws KeeperException, InterruptedException {
        this.zooKeeper.delete(path,-1);
    }
}

此类是操作 zookeeper 的核心类,我们具体来分析一下这些代码的含义:

首先该类实现了 Watch 接口,此接口是 zookeeper指定事件处理程序类必须实现的公共接口,也就是说当客户端连接 zookeeper 服务、增删改查节点时,通过向客户端注册 Watcher, 然后以回调的方式触发 Watcher 实现类的 process 方法。

我们实例化了一个 countDownLatch 门闩属性,因为连接 zookeeper 是一个异步操作,所以用它来堵塞客户端对 zookeeper 的连接操作,当连接成功后调用 countDownLatch.countDown()释放当前线程的堵塞。

然后我们声明了连接 zookeeper 集群的 ip 地址,因为是集群所有是三个 ip 地址,并通过逗号隔开。声明一个 zooKeeper 属性,用它来创建、删除节点等操作。

接下来我们看 connect() 方法,此方法的作用是连接 zookeeper 服务的,通过实例化 zookeeper 对象,然后把 ip 地址传进去,再把 watch,也就是当前的对象传进去,意思是说连接 zookeeper 时,注册了 watch,一旦连接成功或者失败会触发 watch 接口实现类的 process 方法,因为 ZkConnect 本身就是一个 Wacher,所以直接传 this,上面也提到过,连接 zookeeper 服务本身是一个异步操作,所以我们采用 countDownLatch 进行堵塞,当连接成功时会触发 process 方法,当此方法的参数 WatchedEvent 为 Event.KeeperState.SyncConnected 已连接时,则执行 countDownLatch.countDown(); 取消堵塞。

我们接下来看 createPersistentNode 方法,此方法是用来创建持久化节点的,我们重点看创建节点的方法:

zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

此方法第一个参数是节点的路径名称,第二个参数是节点数据,第三个参数是创建节点的权限,ZooDefs.Ids.OPEN_ACL_UNSAFE 代表如何连接的客户端都可以操作该节点,最后一个参数是节点的类型,我们通过 CreateMode.PERSISTENT 属性设置为持久化节点,当然也可以设置其他的三种数据类型。

然后我们来看 createEphemeralNode 方法,此方法和 createPersistentNode 类似,唯一的区别是此节点是临时节点,客户端一旦失去连接后,节点则自动删除。

getData 方法的作用为获取节点数据,此方法只有一个 path 参数,也就是说通过路径就可以获取数据,并且在此方法上面注册了 watcher,当获取该节点的数据发生变化时,则会触发 watch 实现类的 process 回调函数,我们为了简单测试,只是判断了当删除该节点的时候打印一句话。

deleteNode 方法的作用顾名思义是用来删除节点的,第二个参数是版本号,当为-1 时则不使用版本号删除。

测试

我们运行 HelloZk 类,看看是否可以达到所预期的结果:

在这里插入图片描述

ok,没有问题,当节点删除的时候,也进入到了 watch 类中的回调函数,相信大家都已经掌握了这个小例子,接下来我们将进入项目实战,请大家该倒水的倒水,该上厕所的上厕所,因为重点要来了!

Zookeeper 实战——编写自己的分布式应用程序

我们将探讨三个实例做为 zookeeper 的进阶学习,相信大家理解了这三个例子后更能体会到 zookeeper 的强大,并能在真实的项目中,实现自己的分布式应用程序,好,废话不多说,我们一起来开始探讨。

实现注册中心

注册中心在分布式通信系统的作用极其的重要,比如 Dubbo、SpringCloud 都有自己的注册中心,在没有注册中心的时候,consumer 获取 provider 的服务信息(ip 地址和端口号)时,要不就是写死在 consumer 端,要不就是保存在数据库中,这样做会有三个弊端:

发布与订阅服务流程图
在这里插入图片描述

上面的流程图是注册中心的第一大步,provider 发布服务和 consumer 订阅服务,总共是四个步骤,我们来分析一下他们发布与订阅过程:

  1. 首先 provider1 和 provider2 会连接 zookeeper 服务器集群。
  2. 第二步,它们分别把自己的 ip 地址和端口号在/myServer/1.0.1 节点下创建子节点,分别是/myServer/1.0.1/127.0.0.1:8080 和/myServer/1.0.1/127.0.0.1:8081。
  3. 第三步,consumer 开始连接 zookeeper 集群。
  4. 第四步,consumer 获取 /myServer/1.0.1 节点下的所有子节点。
  5. 最后一步,返回给所有的服务器列表给 consumer 端,并缓存到 consumer 本地。

这就是发布与订阅的过程,有小伙伴可能存在疑问,节点 /myServer/1.0.1 是什么意思?大概解释一下,首先 myServer 是 provider 发布服务的实例名称,比如订单服务提供方与支付服务提供方的实例名是不一样的,不同的实例名会区分不同的服务提供者,1.0.1 是服务提供方的版本号,有版本号的好处是可以令 consumer 端有更多的服务选择,比如 provider 端有服务更新,需要 consumer 端修改一些代码,但是 consumer 端还不能立刻完成,那么有了版本号就可以让 consumer 调用老服务,如果代码更新了,再根据版最新的版本号调用新服务即可。

服务提供方增加服务器的流程图

随着用户的增多,并发量也一直在增加,provider 端避免不了增加服务器以此来分担来自 consumer 访问的压力,我还是沿着发布与订阅的结果之上用流程图进行分析。

在这里插入图片描述
负载均衡流程图

从注册中心获取 provider 的服务信息一般是多条,也就是说 provider 是一个集群,那么就可以通过负载均衡保持服务提供方的稳定性,我们还是以一个流程图分析:

在这里插入图片描述

我们简单来分析一下 consumer 与 provider 端之间是怎么实现负载均衡的:

关于注册中心的流程图已经分析完了,但是我们还是要结合应用程序再更深一层的探讨。

代码实现服务注册与发现

终于到了我们的代码环节了,废话不多说,我们直接上注册中心的代码:

Provider1.java

package com.provider;
import com.registCenterCommon.RegistCenterProvider;

import    java.net.Socket;

import java.net.ServerSocket;

/**
 * 服务提供方
 */
public class Provider1 {

    public static boolean isRunning = true;
    /**
     * 服务名称
     */
    private static String serviceName = "myServer";
    /**
     * 端口号
     */
    private static int port = 8080;
    /**
     * ip
     */
    private static String ip = "127.0.0.1";
    /**
     * 版本
     */
    private static String version = "1.0.1";

    public static void main(String[] args) throws Exception {

        ServerSocket serverSocket = new ServerSocket(port);
        //把当前的服务信息注册到注册中心中
        RegistCenterProvider registCenterProvider = new RegistCenterProvider(ip,version,port,serviceName);
        registCenterProvider.register();

        while(isRunning){
            Socket socket = serverSocket.accept();
            System.out.println("当前连接的服务版本、ip 和端口号为:/"+version+"/"+ip+":"+port);
        }
        serverSocket.close();
    }

}

此类是用来模拟服务提供方,专门为 consumer 端提供服务,当然还有 Provider2、Provider3,因为除了端口号不一样之外,其他的代码都一样,所以 Provider2 和 Provider3 的代码就不一一的贴了,这三个类一起启动,用来模拟服务提供方的一个集群,然后我们分析一下 Provider1 的代码:

RegistCenterProvider.java

package com.registCenterCommon;

import com.common.ZkConnect;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 服务提供端注册中心
 */
public class RegistCenterProvider {

    /**
     * ip 地址
     */
    private String  ip;
    /**
     * 端口号
     */
    private String version;
    /**
     * 端口号
     */
    private int port;
    /**
     * 服务名称
     */
    private String serviceName;

    public RegistCenterProvider(String ip, String version, int port, String serviceName) {
        this.ip = ip;
        this.version = version;
        this.port = port;
        this.serviceName = serviceName;
    }

    /**
     * 服务提供方注册
     * @throws Exception
     */
    public  void register() throws Exception {
        ZkConnect zk = new ZkConnect();
        zk.connect();
        ZooKeeper zooKeeper = zk.getZooKeeper();
        if(zooKeeper.exists("/"+serviceName,false)==null){
            zk.createPersistentNode("/"+serviceName,"");
        }
        Stat stat =  zooKeeper.exists("/"+serviceName+"/"+version,false);
        if(stat == null){
            zk.createPersistentNode("/"+serviceName+"/"+version,"");
        }
        zk.createEphemeralNode("/"+serviceName+"/"+version+"/"+ip+":"+port,"");
        System.out.println("服务提供方注册成功,注册信息为:/"+serviceName+"/"+version+"/"+ip+":"+port);
    }
}

RegistCenterProvider 类实现了服务提供者的注册中心功能,我们来具体分析一下:

Consumer.java

package com.consumer;
import    java.net.Socket;

import com.registCenterCommon.Connect;
import com.registCenterCommon.RegistCenterConsumer;

import java.util.List;
import java.util.Random;

/**
 * @Classname Consumer
 */
public class Consumer {

    private static String serviceName = "myServer";

    private static String version = "1.0.1";

    public static void main(String[] args) throws Exception {

        RegistCenterConsumer registCenterConsumer = new RegistCenterConsumer(serviceName,version);
        List<Connect> services = registCenterConsumer.pullServiceList();

        for(int i = 0;i<20;i++){
            int randomIndex = new Random().nextInt(services.size());
            Connect connect = services.get(randomIndex);
            Socket socket = new Socket(connect.getIp(),connect.getPort());
            System.out.println(connect+"连接成功!!!");
        }

        Thread.sleep(40000);

        System.out.println("重新访问…………………………");

        System.out.println("最新列表信息为:"+services);

        for(int i = 0;i<20;i++){
            int randomIndex = new Random().nextInt(services.size());
            Connect connect = services.get(randomIndex);
            Socket socket = new Socket(connect.getIp(),connect.getPort());
            System.out.println(connect+"连接成功!!!");
        }
    }
}

我们来具体分析一下 Consumer 端的代码:

RegistCenterConsumer.java

package com.registCenterCommon;

import com.common.ZkConnect;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class RegistCenterConsumer {

    /**
     * 端口号
     */
    private String version;
    /**
     * 服务名称
     */
    private String serviceName;
    /**
     * zk 对象
     */
    private ZooKeeper zooKeeper;

    /**
     * 服务列表容器
     */
    private  final List<Connect> serviceList = new CopyOnWriteArrayList();

    public RegistCenterConsumer(String serviceName,String version){
        this.serviceName = serviceName;
        this.version = version;
    }

    /**
     * 服务提供方注册
     * @throws Exception
     */
    public  List<Connect> pullServiceList() throws Exception {
        ZkConnect zk = new ZkConnect();
        zk.connect();
        zooKeeper = zk.getZooKeeper();
        List<String> serverList = this.getServerList("/"+serviceName+"/"+version);
        serviceList.addAll(this.getConnectByString(serverList));
        return serviceList;
    }

    /**
     * 根据服务列表获取连接对象列表
     * @param list
     * @return
     */
    private  List<Connect> getConnectByString(List<String> list){
        List<Connect> connectList = new ArrayList<>();
        for(String str : list){
            String ip = str.substring(0,str.indexOf(":"));
            String port = str.substring(str.indexOf(":")+1,str.length());
            connectList.add(new Connect(ip,Integer.parseInt(port)));
        }
        return connectList;
    }

    /**
     * 功能描述: <br>
     * 〈获取集群的服务列表〉
     * @Param: [path]
     * @Return: java.util.List<java.lang.String>
     */
    private List<String> getServerList(String path) {
        try {
            return zooKeeper.getChildren(path, new serverListWatch());
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    class serverListWatch implements Watcher {

        @Override
        public void process(WatchedEvent watchedEvent) {
            if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
                System.out.println("服务列表节点数据产生变化~~~~~~");
                serviceList.clear();
                serviceList.addAll(getConnectByString(getServerList(watchedEvent.getPath())));
                System.out.println("最新服务器列表:"+serviceList);
            }
        }

    }

}

RegistCenterConsumer 类的作用是实现服务调用方注册中心的,废话不多说,咱们直接进入分析阶段:

测试

测试流程如下:

  1. 保证 zookeeper 集群可用。
  2. 先启动 Provider1、Provider2、Provider3。
  3. 然后启动 Consumer。

运行结果如下:

Provider1

在这里插入图片描述

Provider2

在这里插入图片描述

Provider3

在这里插入图片描述

Consumer

在这里插入图片描述

大家请看,Consumer 端启动之后可以随机的访问 Provider 三个服务,从而达到了负载均衡的效果,当把 Provider1 服务停掉之后,会发生什么呢?我们也做一下实验,结果如下:

在这里插入图片描述

我们把 Provider1 服务停掉之后,Consumer 端就会有了一个获取最新服务列表的回调,然后下一次再进行访问 Provider 服务集群的时候,Provider1 将不会被访问的到,这就是 zookeeper 实现注册中心的整个内容,当然此功能还需要大量的优化和完善的点,由于篇幅有限只能和大家一起分享注册中心的核心内容,感兴趣的小伙伴可以进一步优化。

接下来我们来探讨另一个实战案例——用 zookeeper 实现 HA 架构。

实现 HA(高可用),让你的服务器永不宕机

zookeeper 还有一个特别重要的场景,就是对分布式系统实现 HA,比如 Hadoop 就是采用 zookeeper 来实现高可用的,而在我们的分布式应用程序中,该怎么实现 HA 呢?接下来我们就一步一步的解密它吧。

上一篇 下一篇

猜你喜欢

热点阅读