zookeeper

zookeeper使用及原理浅析

2018-11-07  本文已影响0人  拉风小野驴

目录

1.zookeeper是什么
2.zookeeper使用
2.1.api介绍
2.2.使用案例
2.2.1.概述
2.2.2.准备工作
2.2.3.配置中心
2.2.4.分布式锁
2.2.5.小结
3.集群搭建
3.1.准备工作
3.2.配置文件详解
3.3.启动
4.原理
4.1.独立模式
4.1.1.启动步骤
4.1.2.请求处理过程
4.1.3.请求处理器详解
4.2.核心数据结构
4.2.1.DataTree
4.2.2.ZKDatabase(简称zkdb)
4.3.持久化及重建机制
4.3.1.概述
4.3.2.SnapLog
4.3.3.TxnLog
4.3.4.FileSnapTxnLog
4.4.集群模式运行过程
4.4.1.概述
4.4.2.选举(FastLeaderElection)
4.4.3.集群初始化
4.4.3.1.内部通信数据结构
4.4.3.2.初始化过程
4.4.4.对外服务(请求处理器)
4.4.4.1.Server体系概述
4.4.4.2.LeaderZooKeeperServer
4.4.4.3.FollowerZooKeeperServer:
5.总结

1.zookeeper是什么

官方解释:zookeeper(以下简称zk)是一个高可用的分布式协调服务

2.zookeeper使用

2.1.api介绍

zk为客户端提供了对DataTree的8种操作,如下:
1.创建ZNode节点
2.删除ZNode节点
3.变更ZNode节点数据
4.变更ZNode节点权限
5.查询节点是否存在(可同时配置监视器)
6.查询节点数据(可同时配置监视器)
7.查询节点权限
8.查询子节点名称(可同时配置监视器)
以上这8种操作,zk均提供了同步调用和异步调用两种调用方式,但是实际上,客户端与服务端都是采用异步的方式进行通信,客户端内部通过线程通信(wait && notify)实现异步转同步的操作.

同步api如下:

public interface ClientAPI extends Closeable {

    String create(String path, byte data[], List<ACL> acls, CreateMode createMode) throws KeeperException, InterruptedException;

    void delete(String path, int version) throws InterruptedException, KeeperException;

    Status setData(String path, byte data[], int version) throws KeeperException, InterruptedException;

    Status setACL(String path, List<ACL> acls, int version) throws KeeperException, InterruptedException;

    Status exists(String path, boolean watch) throws KeeperException, InterruptedException;

    Status exists(String path, Watcher watcher) throws KeeperException, InterruptedException;

    byte[] getData(String path, boolean watch, Status status) throws KeeperException, InterruptedException;

    byte[] getData(String path, Watcher watcher, Status status) throws KeeperException, InterruptedException;

    List<ACL> getACL(String path, Status status) throws KeeperException, InterruptedException;

    List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException;

    List<String> getChildren(String path, boolean watch, Status status) throws KeeperException, InterruptedException;

    List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException;

    List<String> getChildren(final String path, Watcher watcher, Status status) throws KeeperException, InterruptedException;
}

异步api如下:

public interface AsyncClientAPI extends Closeable {

    void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback callback, Object context);

    void delete(final String path, int version, VoidCallback callback, Object context);

    void setData(final String path, byte data[], int version, StatCallback callback, Object context);

    void setACL(final String path, List<ACL> acl, int version, StatCallback callback, Object context);

    void exists(String path, boolean watch, StatCallback callback, Object context);

    void exists(final String path, Watcher watcher, StatCallback callback, Object context);

    void getData(final String path, boolean watch, DataCallback callback, Object context);

    void getData(final String path, Watcher watcher, DataCallback callback, Object context);

    void getACL(final String path, Status stat, ACLCallback callback, Object context);

    void getChildren(final String path, boolean watch, ChildrenCallback callback, Object context);

    void getChildren(final String path, boolean watch, Children2Callback callback, Object context);

    void getChildren(final String path, Watcher watcher, ChildrenCallback callback, Object context);

    void getChildren(final String path, Watcher watcher, Children2Callback callback, Object context);
}

除了上述操作之外,zk还提供了事务操作api,可以把多个写操作合成为一个原子操作事务(要么全部成功要么全部失败),api如下:

public interface TransactionAPI {

    List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException;

    Transaction transaction();
}

multi()方法用于一次向zk服务器发送多个操作.
当然,我们也可以调用transaction()方法,以一种链式调用方式构建原子操作.

public class Transaction {
    private ZooKeeper zk;
    private List<Op> ops = new ArrayList<>();

    Transaction(ZooKeeper zk) {
        this.zk = zk;
    }

    public Transaction create(final String path, byte data[], List<ACL> acl, CreateMode createMode) {
        ops.add(Op.create(path, data, acl, createMode.toFlag()));
        return this;
    }

    public Transaction delete(final String path, int version) {
        ops.add(Op.delete(path, version));
        return this;
    }

    public Transaction check(String path, int version) {
        ops.add(Op.check(path, version));
        return this;
    }

    public Transaction setData(final String path, byte data[], int version) {
        ops.add(Op.setData(path, data, version));
        return this;
    }

    public List<OpResult> commit() throws InterruptedException, KeeperException {
        return zk.multi(ops);
    }
}

2.2.使用案例

2.2.1.概述

在回顾一下zk的3大特性:
1.写操作严格有序
2.watch机制
3.临时节点
利用这三大特性,我们可以进一步封装,实现一些业务组件,如:分布式锁,配置中心,主备切换,负载均衡,服务发现,任务调度,等等.
下面通过一些demo来演示如何使用zk.

2.2.2.准备工作

首先需要引入zk的构件,maven坐标如下:

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5</version>
</dependency>

2.2.3.案例一:配置中心

接下来的例子是模拟一个美团mcc的实现,在这个例子中,我们将/mcc/{appkey}定义为我们获取配置数据的路径,ZNode中的数据存储Properties序列化后的数据.
ConfigUtilAdapter初始化时,通过getData拉取服务配置,拉取的同时,向zk集群注册watch,来监听ZNode数据的变化,当接到数据变更事件时,重新进行数据拉取(并再次注册watch),如此循环.
代码实现如下:

public class MccDemo {

    /**
     * 模拟美团的mcc配置工具类
     */
    public static class ConfigUtilAdapter implements Watcher {

        public static final String CONFIG_PATH = "/mcc/${yourAppkey}";

        private ConcurrentHashMap<String, String> config = new ConcurrentHashMap<>();
        private static volatile boolean init = false;
        private ZooKeeper client;

        private static ConfigUtilAdapter instance;

        public static void init() {
            if (init) {
                return;
            }
            instance = new ConfigUtilAdapter();
        }

        public ConfigUtilAdapter() {
            try {
                client = new ZooKeeper("ip:port", 1000, this, false);
            } catch (IOException e) {
                throw new IllegalStateException("初始化失败", e);
            }
            pullData();
        }

        private synchronized void pullData() {
            try {
                byte[] data = client.getData(CONFIG_PATH, this, null);
                Properties props = new Properties();
                props.load(new ByteArrayInputStream(data));
                config.clear();
                for (Map.Entry<Object, Object> entry : props.entrySet()) {
                    config.put(entry.getKey().toString(), entry.getValue().toString());
                }
            } catch (Exception e) {
                throw new IllegalStateException("拉取配置失败", e);
            }


        }

        public static String getString(String key) {
            return instance.config.get(key);
        }

        @Override
        public void process(WatchedEvent event) {
            if ((event.getType() == EventType.NodeDataChanged) && (CONFIG_PATH.equals(event.getPath()))) {
                pullData();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        mccInit();
        ConfigUtilAdapter.init();
        Thread.sleep(2000);
        String value = ConfigUtilAdapter.getString("key");
        System.out.println("第1次获取配置:" + value);
        mccCentralChange();
        Thread.sleep(500);
        value = ConfigUtilAdapter.getString("key");
        System.out.println("第2次获取配置:" + value);
    }

    private static void mccCentralChange() throws Exception {
        try (ZooKeeper client = new ZooKeeper("ip:port", 1000, even -> {
        }, false)) {
            Properties props = new Properties();
            props.put("key", "helloWorld");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            props.store(out, null);
            client.setData("/mcc/{yourAppkey}", out.toByteArray(), 0);
        }
    }

    /**
     * 此初始化过程相当于服务申请时伴随appkey产生而执行的初始化操作
     *
     * @throws Exception
     */
    public static void mccInit() throws Exception {
        try (ZooKeeper client = new ZooKeeper("172.18.212.149:2189", 1000, even -> {
        }, false)) {
            try {
                Status status = client.exists(CONFIG_PATH, false);
                client.delete(CONFIG_PATH, status.getVersion());
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                Status status = client.exists("/mcc", false);
                client.delete("/mcc", status.getVersion());
            } catch (Exception e) {
                e.printStackTrace();
            }
            client.create("/mcc", new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            client.create("/mcc/${yourAppkey}", new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
}

代码运行结果如下:

第1次获取配置:null
第2次获取配置:helloWorld

2.2.4.案例二:分布式锁

加锁
分布式锁的阻塞式获取,其实就是锁资源的并发竞争,在zk的视角里,就是对同一个path对应的ZNode的竞争创建.
zk的进程间同步的特性保证的同一时间,只有有一个client成功创建节点,其他client都会创建失败并提示NodeExist.
当client收到NodeExist的提示时,说明自己加锁没有成功,则此时需要进行锁等待,只要利用zk的watch机制监控锁对应的ZNode变更,
来唤醒等待线程,进行加锁重试,即可完成整个加锁流程.
释放锁
锁的释放分两种情况:
1.client宕机,server检测不到心跳,则当达到会话超时时间时,server中的SessionTracker会自动删除会话,并同时删除该会话创建的所有临时节点,从而锁节点被释放(利用zk的临时节点特性)
2.client在执行完业务逻辑后,主动进行锁释放,其实就是主动调用delete进行节点删除
粗略的代码实现如下(只为了验证zk的特性,因此很多异常处理的逻辑被省略掉了)

public class LockDemo {

    /**
     * zk实现的分布式锁
     */
    public static class ZooKeeperLock implements Watcher {

        private static final Logger LOG = LoggerFactory.getLogger(com.lixin.lock.ZooKeeperLock.class);

        private static final String LOCK_PATH = "/lock";

        private ZooKeeper client = new ZooKeeper("172.18.212.149:2189", 60000, this, false);
        private Object lock = new Object();

        public ZooKeeperLock() throws IOException {
            while (!client.getClientState().isConnected()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    LOG.error("", e);
                }
            }
        }

        public void lock() throws Exception {
            boolean success = false;
            while (!success) {
                try {
                    client.create(LOCK_PATH, new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    success = true;
                } catch (KeeperException e) {
                    if (e.code() == Code.NODEEXISTS) {
                        synchronized (lock) {
                            client.exists(LOCK_PATH, this);
                            lock.wait();
                        }
                    }
                }
            }
            LOG.info(Thread.currentThread().getName() + "获取锁成功");
        }


        public boolean tryLock() {
            try {
                client.create(LOCK_PATH, new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return true;
            } catch (Exception e) {
                return false;
            }
        }


        public void unlock() throws Exception {
            client.delete(LOCK_PATH, 0);
            LOG.info(Thread.currentThread().getName() + "释放锁成功");
        }


        @Override
        public void process(WatchedEvent event) {
            if ((event.getType() == EventType.NodeDeleted) && (LOCK_PATH.equals(event.getPath()))) {
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
        }
    }

    /**
     * 竞争锁的线程,这里模拟加锁->业务操作->释放锁的流程
     */
    public static class FightThread extends Thread {

        public FightThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            ZooKeeperLock lock;
            try {
              lock  = new ZooKeeperLock();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            System.out.println("开始竞争锁");
            while (true) {
                try {
                    lock.lock();
                    Thread.sleep(2000);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }
        }
    }

    public static void main(String[] args) throws Exception{
        FightThread t1 = new FightThread("线程-1");
        t1.setDaemon(true);
        FightThread t2 = new FightThread("线程-2");
        t2.setDaemon(true);
        t1.start();
        t2.start();
        Thread.sleep(60000);
    }
}

该Demo的执行结果如下:

开始竞争锁
开始竞争锁
2018-07-31 14:15:34,666 [myid:] - INFO [线程-2:ZooKeeperLock@53] - 线程-2获取锁成功
2018-07-31 14:15:36,675 [myid:] - INFO [线程-2:ZooKeeperLock@69] - 线程-2释放锁成功
2018-07-31 14:15:36,678 [myid:] - INFO [线程-2:ZooKeeperLock@53] - 线程-2获取锁成功
2018-07-31 14:15:38,687 [myid:] - INFO [线程-2:ZooKeeperLock@69] - 线程-2释放锁成功
2018-07-31 14:15:38,690 [myid:] - INFO [线程-2:ZooKeeperLock@53] - 线程-2获取锁成功
2018-07-31 14:15:40,699 [myid:] - INFO [线程-2:ZooKeeperLock@69] - 线程-2释放锁成功
2018-07-31 14:15:40,701 [myid:] - INFO [线程-1:ZooKeeperLock@53] - 线程-1获取锁成功
2018-07-31 14:15:42,707 [myid:] - INFO [线程-1:ZooKeeperLock@69] - 线程-1释放锁成功
2018-07-31 14:15:42,709 [myid:] - INFO [线程-2:ZooKeeperLock@53] - 线程-2获取锁成功
2018-07-31 14:15:44,717 [myid:] - INFO [线程-2:ZooKeeperLock@69] - 线程-2释放锁成功
2018-07-31 14:15:44,718 [myid:] - INFO [线程-1:ZooKeeperLock@53] - 线程-1获取锁成功
2018-07-31 14:15:46,726 [myid:] - INFO [线程-1:ZooKeeperLock@69] - 线程-1释放锁成功
2018-07-31 14:15:46,729 [myid:] - INFO [线程-2:ZooKeeperLock@53] - 线程-2获取锁成功
2018-07-31 14:15:48,736 [myid:] - INFO [线程-2:ZooKeeperLock@69] - 线程-2释放锁成功
2018-07-31 14:15:48,738 [myid:] - INFO [线程-1:ZooKeeperLock@53] - 线程-1获取锁成功

2.2.5.小结

3.集群搭建

3.1.准备工作

下载zookeeper分发包,本文使用的版本为3.4.5,对应下载地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/

3.2.配置文件详解

zk服务器启动时,需要读取配置文件对自身服务进行配置,配置文件初始位置在分发包的conf/zoo_sample.cfg.

zk服务器启动时默认读取的路径为conf/zoo.cfg.

配置文件本质是一个properties文件,服务器启动初始,会首先解析配置文件,在内存中创建一个类型为QuorumPeerConfig的对象(文件解析逻辑也在其中).

下面详细解释下文件中各项配置的含义和作用:

# 快照存储目录,无默认值
dataDir=
# 事务存储目录,无默认值
dataLogDir=
# 集群节点对外提供服务的监听端口,默认2181
clientPort=2181
# 集群节点对外提供服务的监听host地址,默认监听0.0.0.0
clientPortAddress=0.0.0.0

# 每个客户端ip的最大连接数限制,默认60
maxClientCnxns=60

# zk服务器的时钟周期定义,它是一个时间单位度量,单位为毫秒
# 默认值为3000,也就是说一个时钟周期为3秒
tickTime=3000

######################################################
# 最小和最大会话超时时间,用于限制客户端连接的超时时间设置
# 如果客户端超时值比最小值小,则被修正为最小值
# 如果客户端超市之比最大值大,则被修正为最大值
######################################################
# 客户端会话的最小超时时间,-1代表使用默认值,默认为2个tickTime
minSessionTimeout=-1
# 客户端会话的最大超时时间,-1代表使用默认值,默认为20个tickTime
maxSessionTimeout=-1
######################################################
# 以下两个配置,用于配置数据目录清理管理器(DatadirCleanupManager)的运行逻辑
######################################################
# 清理数据时,需要保存的快照数量,默认值为3,且配置值至少为3
autopurge.snapRetainCount=3
# 运行清理任务的时间间隔,单位为小时,默认值为1,负数和0代表代表不执行清理
autopurge.purgeInterval=1
#-----------------------------------------------------------------------------------------------------------------------
######################################################
# 以上为独立模式和集群模式都需要配置的属性
# 以下为集群模式的专用配置
######################################################
# 集群初始化阶段的网络读取超时时间,单位为tickTime,无默认值
# 从选举结束到集群对外提供服务的这段时间,称之为初始化阶段
# 初始化需要经过epoch协商,以及为达到一致性状态leader向follower发起的数据传输
initLimit=10
# 集群对外服务阶段,leader向follower的同步数据的网络读取超时时间,单位为tickTime,无默认值
syncLimit=5
# 集群群首选举时使用的算法标识,当前只支持3快速群首选举(FastLeaderElection,简称FLE)
electionAlg=3
# 当前服务器节点的角色,可选值为:participant(参与者) or observer(观察者)
# 默认为参与者
peerType=participant
# 节点配置,格式为:  server.sid=ip:syncPort:electionPort:role
#server.1=127.0.0.1:2183:2183:participant
#server.2=192.168.0.1:2183:2183:participant


######################################################
# 以下为组策略配置,如果进行了组配置,zk会改变仲裁策略
# 默认仲裁策略QuorumMajority为数量过半
# 配置组策略后,仲裁策略变为QuorumHierarchical分层仲裁,即组内服务器过半的组的数量要过半
# 组策略在zk多机房多机架部署下,可以更好提高集群可靠性

# 集群组配置,格式为:  group.gid=sid1:sid2:......
#group.1=1:2
# 节点权重,格式为:  server.sid=weight,默认权重都为1
# 上述所说的数量过半中的[数量],实际是权重之和
#weight.1=1
#weight.2=1


######################################################
# 剩余的其他key/value配置,都会给key追加"zookeeper."前缀
# 用于zk的扩展配置
######################################################

注意:集群服务器节点个数建议配置为奇数,这不是必须的,但是是相对最优的.

举例说明:相比于一个5个服务器的集群(描述为5n)而言,
(1)4个服务器的集群,则更加脆弱(5n允许2个服务器宕机,而4n只能允许1个服务器宕机);
(2)6个服务器的集群,则会使集群法定人数增加(5n法定人数为3,而6n法定人数为4),进而使服务器需要更多的确认操作.

3.2.启动

把配置好的文件,放置到conf/zoo.cfg位置
执行命令

# 启动服务
bin/zkServer.sh start
# 停止服务
bin/zkServer.sh stop

使用client测试服务启动是否成功

ZooKeeper zk = new ZooKeeper("10.4.236.198:2181,10.4.233.228:2181,10.4.244.77:2181", Integer.MAX_VALUE, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            KeeperState state = event.getState();
            if (KeeperState.SyncConnected.equals(state)||KeeperState.ConnectedReadOnly.equals(state)) {
                System.out.println("连接成功");
            }
        }
}, false);

4.原理

4.1.独立模式

4.1.1.启动步骤

zk服务器启动分为以下步骤:
1.启动服务端口监听(ServerCnxnFactory)
2.启动zkDatabase(创建内存实例,从文件加载数据)
3.启动会话跟踪器(SessionTracker)
4.安装并启动请求处理器(RequestProcessor)
之后zk服务就开始对外提供服务

4.1.2.请求处理过程

客户端连接服务器,TCP3次握手成功后,
客户端和服务器还需要通过上层数据包的方式,进行一些协商,数据包统一用Packet封装.
这个过程会包含两个特殊的包交互:
1.第一个包负责会话的初始化,包含会话超时时间协商,会话id分配,zxid校验
2.第二个包负责进行鉴权,客户端通过ZooKeeper.addAuthInfo()添加自己的认证信息,向服务器发包,服务器进行认证,认证失败会关闭会话.

交互过程

除连接包和认证包由底层传输层处理外,客户端后续发来的操作请求,会分别经过:
1.PrepRequestProcessor
2.SyncRequestProcessor
3.FinalRequestProcessor
最终被服务器端处理成功后,回发响应.
一次请求过程大致如下图所示.

zk架构

4.1.3.请求处理器详解

下面分别介绍不同的请求处理器的职责:(先上图)

请求处理器
  1. PrepRequestProcessor(准备请求处理器):
    该处理器是一个单独的线程,当接收到ServerCnxnFactory发来的请求后,只是简单将请求加入到提交队列中,
    之后由线程逻辑一直循环消费队列并处理器请求.
    当前处理器最主要的职责,就是为写操作生成对应的事务对象,之后调用下一个处理器进行继续处理.
  2. SyncRequestProcessor(同步请求处理器):
    该处理器同样是一个单独线程,收到请求后,将请求对象添加到队列中,由线程逻辑一直循环消费队列,处理请求.
    当前处理的职责很单一,就是把请求中附带的事务对象序列化到磁盘当中,
    并定期进行事务文件的滚动(就像log4j的rollingFile一定,定期创建新的文件),
    以及保存快照操作(把当前内存中的DataTree和SessionMap序列化到磁盘文件中).
    处理器会切换执行两套逻辑,
    当请求队列中存在请求时,则会优先处理请求,把请求中的事务对象追加到日志文件流中,并把请求对象加入刷新队列中;
    当请求队列中没有要处理的请求时,则会执行刷新操作(flush),关闭所有打开的流文件,确保数据全部落盘,之后把刷新队列中的请求全部移交给下一个处理器进行处理.
  3. FinalRequestProcessor(最终请求处理器)
    该处理器是请求处理的终点,负责应用请求对象中的事务,变更内存中的数据结构,并最终给客户端响应结果.
    当前处理器不是线程,会占用前一个处理器线程的资源执行.

名词解释:
演变记录(ChangeRecord),负责记录某一次事务请求操作后某个ZNode节点的结果状态.
ZooKeeperServer内部有一个List和一个Map,List负责按需保存整个演变过程,而Map负责以path为key,记录节点的最终变更结果.
因为zk在生成事务之前会先做业务校验(比如,创建节点前需要先校验节点是否已经存在),校验通过后才会生成事务对象(事务对象一旦生成就一定会被应用到DataTree中),由于在业务校验时,可能有的已经生成的事务还没有被成功应用,而可能导致校验出错,所以需要借助ChangeRecord来暂存未应用的事务节点在应用之后的节点状态,
在业务校验时只需要校验Map中path对应的ChangeRecord状态即可,当获取ChangeRecord对象时,如果Map中不存在(当前path不存在中间态的事务),则默认返回ZNode当前状态的封装.

4.2.核心数据结构

4.2.1.DataTree

其中包含5个数据部分:

  1. 节点Map:key为数据节点的path,value为数据节点本身(DataNode)
  2. 临时会话Map:key为会话的sessionId,value为该会话创建的临时path的集合
  3. 监视管理器:负责保存客户端请求读操作时,注册的path监视,以及事件触发
  4. 路径查找树:用于保存受quota监控的所有的路径
  5. 访问控制列表Map,key为访问控制列表的id,其由一个内存级的自增id分配,value为对应的控制列表配置(List<ACL>)
    DataTree除了提供自身数据的操作能力以外,还提供了自身的序列化(内存数据持久化)和反序列化机制(磁盘数据加载).

注意:DataTree中只有节点Map和访问控制列表Map可以进行持久化操作,其他数据都是内存级的

public class DataTree {

    /**
     * node树(持久化)
     * key为path,value为ZNode节点
     */
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<>();
    /**
     * 临时节点列表
     * key为sessionId,value为path集合
     */
    private final ConcurrentMap<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();

    /**
     * 数据变化监视管理器
     */
    private final WatchManager dataWatches = new WatchManager();
    /**
     * 子节点变化监视管理器
     */
    private final WatchManager childWatches = new WatchManager();

    /**
     * 路径查找树,用于path的快速检索.
     * 内部保存开启配额检查的所有路径
     */
    private final PathTrie pathTrie = new PathTrie();

    /**
     * 当前DataTree中包含List<ACL>的个数
     * 是一个自增id,每新增一个List<ACL>值,aclIndex自增给该值分配唯一id
     */
    private long aclIndex = 0;
    /**
     * 权限索引(持久化)
     * key为aclIndex,value为List<ACL>
     */
    private final Map<Long, List<ACL>> longKeyMap = new HashMap<>();

    /**
     * 权限索引
     * key为List<ACL>,value为aclIndex
     */
    private final Map<List<ACL>, Long> aclKeyMap = new HashMap<>();

    /**
     * 最大事务id,
     * DataTree每次处理器,都会更新此值
     */
    private volatile long lastProcessedZxid = 0;

}

4.2.2.ZKDatabase(简称zkdb)

zkdb比较简单,它负责同时管理DataTree的数据,已经所有会话的生命周期(ConcurrentHashMap<Long, Integer> sessionsWithTimeouts).
zkdb是外部进行数据操作的入口.
ZKDatabase.loadDatabase()可以完成数据从磁盘到内存的加载工作,
其内部通过调用FileTxnSnapLog.restore()完成反序列化和重建.

4.3.持久化及重建机制

4.3.1.概述

zk的持久化数据存储中,包含两种类型的文件,

注意:
zk在持久化快照时,依然会接收客户端请求并对DataTree进行变更操作,因此快照文件保存的数据并不能反应出任意时间点DataTree的准确状态.
不过不用担心,我们的最终目的是通过持久化的文件数据可以在内存中重建最新状态的DataTree.
我们已经记录了开始序列化快照时DataTree的最大zxid,我们只需要基于快照再重播zxid之后的所有事务记录,即可将DataTree恢复至最新状态.

4.3.2.SnapLog

该组件提供快照的序列化和反序列化,同时,我们可以从文件系统中查询当前最新的快照文件.
API如下:

public interface SnapShot extends Closeable {

    long deserialize(DataTree dataTree, ConcurrentMap<Long, Integer> sessions) throws IOException;

    void serialize(DataTree dataTree, Map<Long, Integer> sessions, File name) throws IOException;

    File findMostRecentSnapshot() throws IOException;
} 

4.3.3.TxnLog

该组件提供事务日志的序列化和反序列化,功能包括:

  1. 将事务对象序列化后追加到文件中
  2. 滚动日志,创建新的日志文件
  3. 提交,将当前打开的所有日志文件进行刷新并关闭,确保缓存数据全部落盘
  4. 将指定zxid后面的事务数据全部从文件中删除
  5. 创建一个迭代器,初始指向指定的zxid对应的事务记录,可向后迭代遍历
  6. 获取当前记录的事务中最大的zxid

API如下:

public interface TxnLog {

    boolean append(TxnHeader header, Record record) throws IOException;

    void rollLog() throws IOException;

    void commit() throws IOException;

    boolean truncate(long zxid) throws IOException;

    TxnIterator read(long zxid) throws IOException;
    
    long getLastLoggedZxid() throws IOException;
}

4.3.4.FileSnapTxnLog

该组件是SnapLog和TxnLog两个组件的门面,通过它可以完成根据快照文件和事务文件,重建内存数据最新状态的能力.

4.4.集群模式运行过程

4.4.1.概述

一个zk集群由一个leader节点和若干个follower节点构成.
集群正常运行的前提条件为:leader节点正常,且集群超过半数(包含leader)节点正常时,集群处于服务状态.
部分follower节点宕机,可自行恢复.
但是,如果leader节点宕机,或超过半数节点宕机,整个集群所有节点都会停止服务,进入选举状态.

集群状态

4.4.2.选举(FastLeaderElection)

在集群节点进行选举时,需要节点两两之间建立socket连接.
也就是说,每个节点都要通过ServetSocket进行端口监听,与此同时向其他节点发起socket连接.
但是节点之间没必要建立两条连接,因此zk规定:连接方向为serverId大的向serverId小的节点发起连接.
因此,假设一个5个节点组成的集群(n=5),则总共存在n*(n-1)/2=10条TCP通道.如下图:

节点连接

选举过程中,一个很重要的数据结构,就是投票(Vote),其用于描述某个节点的竞选信息.
节点保存哪个Vote对象,即代表支持哪个节点做leader.

public class Vote {
    /**
     * 节点id
     */
    private final long id;
    /**
     * 节点最大事务id
     */
    private final long zxid;
    /**
     * 节点竞选轮次
     */
    private final long electionEpoch;
    /**
     * 节点的服务轮次,
     * 该值持久化在快照目录的currentEpoch文件中
     */
    private final long peerEpoch;
    /**
     * 节点当前状态:
     * LOOKING:正在选举
     * FOLLOWING:正在追随
     * LEADING:正在领导
     * OBSERVING:正在观察
     */
    private final ServerState state;
}

在进行群首选举时,
首先,所有节点初始时都投自己(Vote=myServerid,myZxid,myPeerEpoch).
其次,需要确保大家都在相同的选举轮次进行投票,如果某个节点小于当前的选举轮次,则它需要重新初始化自己(丢弃自己收到的所有投票,重新投自己,广播自己的投票)
然后,在选举轮次一致的前提下,选举过程中,每个节点都在重复的做两件事,直到最终投票结束:

  1. 接收其他节点的投票,根据其他节点的状态,会走不同的逻辑处理逻辑
  2. 如果自己的票型变化,则广播自己的投票
选举流程图

投票组件的组件架构如下:

投票走向

4.4.2.集群初始化

4.4.2.1.内部通信数据结构

集群节点内部通信,均使用QuorumPacket传递数据,数据结构如下:

public class QuorumPacket implements Record {
    // 包类型
    private int type;
    // 事务id
    private long zxid;
    // 数据包附带的数据
    private byte[] data;
    // 认证信息,包含scheme(认证方案)和id(身份信息)
    private List<Id> authinfo;
}

4.4.2.2.初始化过程

初始化过程分为2个阶段:服务轮次(epoch)协商和数据同步

  1. 服务轮次协商(acceptedEpoch和currentEpoch)
    当选举结束后,QuorumPeer的ServerState会由looking转变为leading/following/observing.
    QuorumPeer根据自身状态,创建一个对应角色(Leader or Follower)的实例,来进行epoch协商.
    leader首先建立端口监听,等待follower向自己发起连接.
    连接建立后,通过内部数据结构QuorumPacket进行epoch协商,具体过程如下:
线程交互
  1. 数据同步
    leader会根据自己与follower之间zxid差异的情况,选择是使用补发事务记录的同步方式,还是快照加事务的同步方式.
数据同步

数据同步的具体过程如下:

数据同步线程交互

4.4.3.对外服务(请求处理器)

4.4.3.1.Server体系概述

在zk内部,由ZooKeeperServer(或它的子类)实例统一对客户端提供服务的,根据集群节点角色不同,会创建不同的子类实例.

不同的的ZooKeeperServer实例之间,最主要的区别在于对于请求处理器的组装不同,因此对于一个请求的处理逻辑也不同.

ZooKeeperServer类体系如下:

类图

4.4.3.2.LeaderZooKeeperServer

请求响应过程

4.4.3.3.FollowerZooKeeperServer:

请求响应过程

5.总结


上一篇下一篇

猜你喜欢

热点阅读