大数据,机器学习,人工智能技术干货

zk源码阅读37:ZooKeeperServer源码分析

2017-08-14  本文已影响1702人  赤子心_d709

摘要

前面针对server启动到选举leader进行了一个小结,现在进入leader和follower的启动交互过程,需要先讲ZooKeeperServer,
在之前源码阅读的25节里面带过了一部分,这里详细讲解ZooKeeperServer的源码

继承关系如下

ZooKeeperServer继承关系

本节主要讲解内容如下

父接口
  ServerStats.Provider
  SessionTracker.SessionExpirer
属性
内部类
  DataTreeBuilder && BasicDataTreeBuilder
  MissingSessionException
  ChangeRecord
  State
函数
  构造函数
  加载数据相关函数
  启动相关函数
  实现SessionTracker.SessionExpirer接口的函数
  client重连相关函数
  其他函数
思考

父接口

ServerStats.Provider

在源码阅读第24节讲解了,这里不赘述

SessionTracker.SessionExpirer

是SessionTracker的内部接口

    public static interface SessionExpirer {
        void expire(Session session);//过期某个session

        long getServerId();//获取serverId
    }

属性

如下图

ZooKeeperServer属性

除去log,jmx相关部分,源码如下

    public static final int DEFAULT_TICK_TIME = 3000;//默认最短周期
    protected int tickTime = DEFAULT_TICK_TIME;//周期时长
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;//默认最短会话超时时间
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;//默认最长会话超时时间
    protected SessionTracker sessionTracker;//会化跟踪器
    private FileTxnSnapLog txnLogFactory = null;//事务,快照日志
    private ZKDatabase zkDb;//数据库
    private final AtomicLong hzxid = new AtomicLong(0);//即zxid
    public final static Exception ok = new Exception("No prob");
    protected RequestProcessor firstProcessor;//首个请求处理器
    protected volatile State state = State.INITIAL;//初始状态

    /**
     * This is the secret that we use to generate passwords, for the moment it
     * is more of a sanity check.
     */
    static final private long superSecret = 0XB3415C00L;//密码相关

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);//正在处理的请求个数
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();//变更列表
    // this data structure must be accessed under the outstandingChanges lock
    final HashMap<String, ChangeRecord> outstandingChangesForPath =//变更的path到变更记录的map
        new HashMap<String, ChangeRecord>();
    
    private ServerCnxnFactory serverCnxnFactory;//连接工厂

    private final ServerStats serverStats;//服务器统计器
    private final ZooKeeperServerListener listener;//监听,处理服务器stop状态
    private ZooKeeperServerShutdownHandler zkShutdownHandler;//server的关闭处理器

ChangeRecord是ZooKeeperServer的内部类,下面会介绍
ServerStats,ZooKeeperServerListener都在25节的源码介绍过

内部类

内部类

DataTreeBuilder && BasicDataTreeBuilder

这个类并没有调用,不用管

    /**
     * The server delegates loading of the tree to an instance of the interface
     */
    public interface DataTreeBuilder {
        public DataTree build();
    }

    static public class BasicDataTreeBuilder implements DataTreeBuilder {
        public DataTree build() {
            return new DataTree();
        }
    }

MissingSessionException

定义异常

    public static class MissingSessionException extends IOException {
        private static final long serialVersionUID = 7467414635467261007L;

        public MissingSessionException(String msg) {
            super(msg);
        }
    }

ChangeRecord

这个数据结构为了促进PrepRequestProcessor以及FinalRequestProcessor的信息共享,讲到调用链的时候再讲。

    static class ChangeRecord {
        ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
                List<ACL> acl) {
            this.zxid = zxid;
            this.path = path;
            this.stat = stat;
            this.childCount = childCount;
            this.acl = acl;
        }

        long zxid;

        String path;//路径
        //持久化状态
        StatPersisted stat; /* Make sure to create a new object when changing */

        int childCount;

        List<ACL> acl; /* Make sure to create a new object when changing */

        @SuppressWarnings("unchecked")
        ChangeRecord duplicate(long zxid) {//复制一份
            StatPersisted stat = new StatPersisted();
            if (this.stat != null) {
                DataTree.copyStatPersisted(this.stat, stat);
            }
            return new ChangeRecord(zxid, path, stat, childCount,
                    acl == null ? new ArrayList<ACL>() : new ArrayList(acl));
        }
    }

其中,StatPersisted在源码阅读7中讲DataNode的时候讲过了

State

描述当前server所处的状态

    protected enum State {
        INITIAL, RUNNING, SHUTDOWN, ERROR;
    }

函数

构造函数

这里列举处两个底层调用的构造函数

    /**
     * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
     * methods to prepare the instance (eg datadir, datalogdir, ticktime, 
     * builder, etc...)
     * 
     * @throws IOException
     */
    public ZooKeeperServer() {//利用set方法设置其他属性
        serverStats = new ServerStats(this);
        listener = new ZooKeeperServerListenerImpl(this);
    }
    
    /**
     * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
     * actually start listening for clients until run() is invoked.
     * 
     * @param dataDir the directory to put the data
     */
    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
            int minSessionTimeout, int maxSessionTimeout,
            DataTreeBuilder treeBuilder, ZKDatabase zkDb) {//配置各种属性,等run方法被调用才真正开始接收clients
        serverStats = new ServerStats(this);
        this.txnLogFactory = txnLogFactory;
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        this.minSessionTimeout = minSessionTimeout;
        this.maxSessionTimeout = maxSessionTimeout;

        listener = new ZooKeeperServerListenerImpl(this);

        LOG.info("Created server with tickTime " + tickTime
                + " minSessionTimeout " + getMinSessionTimeout()
                + " maxSessionTimeout " + getMaxSessionTimeout()
                + " datadir " + txnLogFactory.getDataDir()
                + " snapdir " + txnLogFactory.getSnapDir());
    }

加载数据相关

启动涉及到db的数据加载,这里也有集群和单机两种,调用顺序为

集群调用顺序:
  Leader#lead
  ZooKeeperServer#loadData
单机调用顺序:
  ServerCnxnFactory#startup
  ZooKeeperServer#startdata
  ZooKeeperServer#loadData

主要是集群的时候,server选完了leader,由leader才能调用数据加载loadData

下面按照单机版startdata函数展开

startdata

初始化zkDb完成数据加载

    public void startdata() 
    throws IOException, InterruptedException {
        //check to see if zkDb is not null
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);//没有就构造
        }  
        if (!zkDb.isInitialized()) {//没有初始化就加载数据,完成初始化
            loadData();
        }
    }

loadData

恢复session和数据,单机版启动或者集群版leader选举之后调用lead方法时,会调用该方法。
主要完成设置zxid以及把无效的session给kill掉的工作

    /**
     *  Restore sessions and data
     */
    public void loadData() throws IOException, InterruptedException {//设置zxid,再从db中把旧的session给kill掉
        /*
         * When a new leader starts executing Leader#lead, it 
         * invokes this method. The database, however, has been
         * initialized before running leader election so that
         * the server could pick its zxid for its initial vote.
         * It does it by invoking QuorumPeer#getLastLoggedZxid.
         * Consequently, we don't need to initialize it once more
         * and avoid the penalty of loading it a second time. Not 
         * reloading it is particularly important for applications
         * that host a large database.
         * 
         * The following if block checks whether the database has
         * been initialized or not. Note that this method is
         * invoked by at least one other method: 
         * ZooKeeperServer#startdata.
         *  
         * See ZOOKEEPER-1642 for more detail.
         */
        if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());//设置当前server的zxid
        }
        else {
            setZxid(zkDb.loadDataBase());//设置当前server的zxid
        }
        
        // Clean up dead sessions
        LinkedList<Long> deadSessions = new LinkedList<Long>();
        for (Long session : zkDb.getSessions()) {//获取临时会话记录
            if (zkDb.getSessionWithTimeOuts().get(session) == null) {//如果会话已经被tracker超时检测给清除掉了,应该是已经处理检测过期,但是异步发送请求还未完成的情况
                deadSessions.add(session);
            }
        }
        zkDb.setDataTreeInit(true);//设置标志位
        for (long session : deadSessions) {
            // XXX: Is lastProcessedZxid really the best thing to use?
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }
    }

这里注意,为什么需要干这件事情,在下面思考中会说

里面调用了setZxid(不展开)以及killSession函数

killSession

清除db中临时会话记录,会话跟踪器也清除记录

    protected void killSession(long sessionId, long zxid) {
        zkDb.killSession(sessionId, zxid);//清除db中相关临时会话记录
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                         "ZooKeeperServer --- killSession: 0x"
                    + Long.toHexString(sessionId));
        }
        if (sessionTracker != null) {
            sessionTracker.removeSession(sessionId);//会化跟踪器清除记录
        }
    }

启动相关

入口是ZooKeeperServer#startup,zkServer都是在上述加载了db的数据之后,调用startup来完成启动

startup

启动的入口函数

    public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();//创建会化跟踪器
        }
        startSessionTracker();//启动会话管理
        setupRequestProcessors();//初始化请求处理链路

        registerJMX();//注册jmx

        setState(State.RUNNING);//提供服务
        notifyAll();
    }

调用了createSessionTracker等函数,介绍如下

createSessionTracker & startSessionTracker

createSessionTracker 完成会话跟踪器的创建

    protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                tickTime, 1, getZooKeeperServerListener());//1是默认单机版实现的sid
    }

这里是默认的单机版实现,在集群版不同的角色有不同的实现,主要是参数sid不会传1,而是配置中的sid

startSessionTracker 启动会话跟踪器

    protected void startSessionTracker() {
        ((SessionTrackerImpl)sessionTracker).start();
    }

setState

设置服务器运行状态,对于ERROR和SHUTDOWN的state,进行对应的操作

    protected void setState(State state) {//根据zkShutdownHandler处理state
        this.state = state;
        // Notify server state changes to the registered shutdown handler, if any.
        if (zkShutdownHandler != null) {
            zkShutdownHandler.handle(state);//如果是错误的state就进行关闭操作
        } else {
            LOG.error("ZKShutdownHandler is not registered, so ZooKeeper server "
                    + "won't take any action on ERROR or SHUTDOWN server state changes");
        }
    }

源码阅读25:服务器异常报警,关闭机制讲过,这里不赘述

setupRequestProcessors

安装请求处理链路,是PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor顺序
具体在后面请求处理链路再讲

    protected void setupRequestProcessors() {//安装请求处理链路
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        ((SyncRequestProcessor)syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);//链路是PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor顺序
        ((PrepRequestProcessor)firstProcessor).start();//启动链路
    }

实现SessionTracker.SessionExpirer接口的函数

两个函数getServerId和expire

    public long getServerId() {//默认实现,不同角色的实现返回值不同
        return 0;
    }

    public void expire(Session session) {//过期某个session
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        close(sessionId);
    }

close函数在下面会讲

client重连相关

processConnectRequest用于处理client的连接请求,不展开
值得注意的地方是重连的调用

处理client重连

展开如下

reopenSession

重连的核心函数

    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
            int sessionTimeout) throws IOException {
        if (!checkPasswd(sessionId, passwd)) {//如果密码不对
            finishSessionInit(cnxn, false);//完成初始化,传入valid值为false
        } else {
            revalidateSession(cnxn, sessionId, sessionTimeout);//如果密码正确,判断会话跟踪器的记录
        }
    }

checkPasswd

验证sessionId和传递来的密码的正确性

    protected boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0
                && Arrays.equals(passwd, generatePasswd(sessionId));
    }

generatePasswd

根据sessionId生成密码

    byte[] generatePasswd(long id) {
        Random r = new Random(id ^ superSecret);
        byte p[] = new byte[16];
        r.nextBytes(p);
        return p;
    }

revalidateSession

在会话跟踪器SessionTracker中判断会话是否还有小

    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
            int sessionTimeout) throws IOException {//验证会话跟踪器是否有对应记录,获取结果,再调用finishSessionInit
        boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                     "Session 0x" + Long.toHexString(sessionId) +
                    " is valid: " + rc);
        }
        finishSessionInit(cnxn, rc);
    }

finishSessionInit

完成会话初始化,根据参数valid代表认证通过与否,用来判断server是接收连接请求,还是发出closeConn的请求,不展开,重要部分如下

valid不同的处理方式

其他函数

除去的get,set,jmx,shutdown相关函数,剩下重要函数如下

部分函数列举如下

getNextZxid

获取下一个server的zxid,调用方需要确保控制并发顺序

    long getNextZxid() {
        return hzxid.incrementAndGet();
    }

close

上面ZooKeeperServer#expire调用了close函数,介绍如下
该函数用于提交一个 关闭某个sessionId 的请求

    private void close(long sessionId) {
        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
    }

submitRequest

这里有两个函数

    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);//生成request
        submitRequest(si);//提交请求
    }
    
    public void submitRequest(Request si) {
        if (firstProcessor == null) {//等待第一个处理器构造完成
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);//更新SessionTracker相关统计
            boolean validpacket = Request.isValid(si.type);//验证Request的OpCode
            if (validpacket) {
                firstProcessor.processRequest(si);//支持的type就调用第一个处理链来处理
                if (si.cnxn != null) {
                    incInProcess();//正在处理的请求个数 +1
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

剩下的

函数 备注
public void dumpConf(PrintWriter pwriter) ServerCnxn接收"conf"的cmd时,输出配置到PrintWriter
void removeCnxn(ServerCnxn cnxn) db中删除这个ServerCnxn(清除Watcher记录)
public void takeSnapshot() 记录快照,把sessions和dataTree信息保存至快照
void touch(ServerCnxn cnxn) touch一个ServerCnxn的sessionId,更新SessionTracker对应统计,在submitRequest即提交请求时调用
processPacket 处理普通请求的函数,除了sasl以及auth以外,大部分进入submitRequest
dumpEphemerals ServerCnxn接收"dump"的cmd时,把临时节点信息输出到PrintWriter
processSasl 处理请求中是sasl的部分
processTxn 调用链处理完之后,最终处理事务请求

思考

ZooKeeperServer#loadData为什么会出现数据不一致而需要清理session

之前在源码21节 会话管理中讲解了会话清除,在sessionTracker的记录是马上清除的,而DateTree中临时会话的清除是通过调用链一步步来的,也就是说两个步骤不是同步的,所以如果中间服务器状态改变了,会出现不一致的情况

属性requestsInProcess什么时候变化

requestsInProcess代表正在处理的请求个数

增加时,调用链
  ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request)调用
  ZooKeeperServer#incInProcess
减少时,调用链
  FinalRequestProcessor#processRequest
  ZooKeeperServer#decInProcess

就是说发出请求时,requestsInProcess+1,最后完成请求时,requestsInProcess-1.涉及到请求处理链。

每个连接的密码是怎么生成的

ZooKeeperServer#checkPasswd调用
ZooKeeperServer#generatePasswd

就是sessionId要和sessionId^superSecret生成的第一个随机数相匹配即可
密码不是client端设置的,是根据sessionId生成的

client重连的机制是怎样的

ZooKeeperServer#processConnectRequest 里面调用reopenSession中
在上面已经讲了,核心就是

1.先看密码对不对
2.再看SessionTracker中是否还有效
3.都通过了才允许重连

问题

ChangeRecord如何促进PrepRequestProcessor以及FinalRequestProcessor的信息共享

这里还没有深入看,先存疑

吐槽

需要思考的细节太多了

比如思考中提到的loadData为什么会出现数据不一致,属于某种异常情况的处理

client重连的代码

为什么不放到另外一个类里面去

上一篇下一篇

猜你喜欢

热点阅读