程序员

ZooKeeper源码分析之Session

2020-10-27  本文已影响0人  兽怪海北

session在zookeeper中是一个不能避开的概念,临时节点(包括临时普通节点和临时顺序节点)都是与session关联的,临时节点将在session超期后被删除。本篇我们来看一下session的创建与销毁,过期session的处理等内容。

session创建

客户端的连接请求到达服务器后最早由processConnectRequest处理,我们分段来看一下processConnectRequest的处理流程。

ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");

反序列化connect请求,connect请求参数如下:

private int protocolVersion;
private long lastZxidSeen;
private int timeOut;
private long sessionId;
private byte[] passwd;
字段名 含义
protocolVersion 协议版本号
lastZxidSeen 客户端过去看到的最大的zxid
timeOut session的超时时间
sessionId session的id,新连接传0,重连传之前服务器返回的sessionid
passwd session对应的密码,由服务器返回给客户端,重连时需要传递该字段

判断是否是readonly请求,ReadOnlyZooKeeperServer只处理readonly的连接,ReadOnlyZooKeeperServer在前面的集群启动中我们讲过,就是使得服务器在选举过程中或者发生分区时依旧可以读数据。

boolean readOnly = false;
try {
    readOnly = bia.readBool("readOnly");
    cnxn.isOldClient = false;
} catch (IOException e) {
}
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
    String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
    throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}

判断客户端最后看到的zxid是否大于我们的最后处理zxid,如果是则关闭连接,让客户端尝试连接有最新数据的其它服务器。

if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
    String msg = "Refusing session request for client "
                 + cnxn.getRemoteSocketAddress()
                 + " as it has seen zxid 0x"
                 + Long.toHexString(connReq.getLastZxidSeen())
                 + " our last zxid is 0x"
                 + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                 + " client must try another server";
    throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}

读取session的超时值,校验并规格化:

int sessionTimeout = connReq.getTimeOut();
byte[] passwd = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
    sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
    sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);

创建或重连session:

if (sessionId == 0) {
    long id = createSession(cnxn, passwd, sessionTimeout);
} else {
    long clientSessionId = connReq.getSessionId();
    if (serverCnxnFactory != null) {
        serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
    }
    if (secureServerCnxnFactory != null) {
        secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
    }
    cnxn.setSessionId(sessionId);
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}

重连session我们放到下一节分析,我们来看一下createSession方法的逻辑,代码不长,我们就直接一次性放出来。

long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    submitRequest(si);
    return sessionId;
}
  1. sessionTracker.createSession(timeout),创建session并拿到sessionid。SessionTracker是一个接口,有4个实现类。

    类名 用途
    SessionTrackerImpl 单机使用。
    LocalSessionTracker 继承了SessionTrackerImpl,给UpgradeableSessionTracker用,UpgradeableSessionTracker包含一个LocalSessionTracker。
    UpgradeableSessionTracker LeaderSessionTracker和LearnerSessionTracker的抽象基类。
    LeaderSessionTracker leader使用。
    LearnerSessionTracker follower和observer使用。
  2. 生成session密码。

  3. 提交session创建请求。

我们来看一下单机模式下sessionTracker.createSession(timeout)做了些什么:

public long createSession(int sessionTimeout) {
    long sessionId = nextSessionId.getAndIncrement();
    trackSession(sessionId, sessionTimeout);
    return sessionId;
}
public synchronized boolean trackSession(long id, int sessionTimeout) {
    boolean added = false;
    SessionImpl session = sessionsById.get(id);
    if (session == null) {
        session = new SessionImpl(id, sessionTimeout);
    }
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
    }
    updateSessionExpiry(session, sessionTimeout);
    return added;
}

首先得到下一个sessionid,然后将这个sessionid纳入管理。

来看一下leader模式下sessionTracker.createSession(timeout)做了些什么:

public long createSession(int sessionTimeout) {
    if (localSessionsEnabled) {
        return localSessionTracker.createSession(sessionTimeout);
    }
    return globalSessionTracker.createSession(sessionTimeout);
}

localSessionsEnabled由配置文件决定(zoo.cfg),这个选项决定了UpgradeableSessionTracker是使用LocalSessionTracker还是SessionTrackerImpl来实现。使用LocalSessionTracker的情况下,创建session时不会扩散到整个集群,而在需要升级时,根据localSessionsUpgradingEnabled的值确定是否需要将本地session升级为全局session扩散到整个集群。

再来看一下follower或者observer模式下sessionTracker.createSession(timeout)做了些什么:

public long createSession(int sessionTimeout) {
    if (localSessionsEnabled) {
        return localSessionTracker.createSession(sessionTimeout);
    }
    return nextSessionId.getAndIncrement();
}

判断localSessionsEnabled是否开启,如果开启了,创建一个本地session,如果没有,创建一个全局session。

接下来我们看session创建请求在processor链的处理过程。

先看单机:

  1. 首先是PrepRequestProcessor:

    int to = request.request.getInt();
    request.setTxn(new CreateSessionTxn(to));
    request.request.rewind();
    zks.sessionTracker.trackSession(request.sessionId, to);
    zks.setOwner(request.sessionId, request.getOwner());
    

    很简单,调用SessionTracker的trackSession方法。

  2. 再看FinalRequestProcessor:

    applyRequest

    ProcessTxnResult rc = zks.processTxn(request);
    

    processTxn

    processTxnForSessionEvents(request, hdr, request.getTxn());
    

    processTxnForSessionEvents

    private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
        int opCode = (request == null) ? hdr.getType() : request.type;
        long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
        if (opCode == OpCode.createSession) {
            if (hdr != null && txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.commitSession(sessionId, cst.getTimeOut());
            } else if (request == null || !request.isLocalSession()) {
                LOG.warn("*****>>>>> Got {} {}",  txn.getClass(), txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
    }
    

很简单,调用SessionTracker的commitSession方法。

case OpCode.createSession: {
    lastOp = "SESS";
    updateStats(request, lastOp, lastZxid);
    zks.finishSessionInit(request.cnxn, true);
    return;
}

zks.finishSessionInit给客户端返回正确的响应,包括session超时值,sessionid和session对应的密码。

再看leader:

分两种情况,开启了localSessionsEnabled和没开启localSessionsEnabled。

先看开启了localSessionsEnabled的情况:

  1. 首先是PrepRequestProcessor:

    pRequestHelper

    case OpCode.createSession:
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        }
        break;
    

    如果是客户端发来的请求,不做任何处理。如果是follower或者observer发来的请求,则生成hdr头和txn记录,调用SessionTracker的trackSession方法。

  2. 然后是ProposalRequestProcessor:

    processRequest

    if (shouldForwardToNextProcessor(request)) {
        nextProcessor.processRequest(request);
    }
    if (request.getHdr() != null) {
        try {
            zks.getLeader().propose(request);
        } catch (XidRolloverException e) {
            throw new RequestProcessorException(e.getMessage(), e);
        }
        syncProcessor.processRequest(request);
    }
    

对于客户端发来的请求,hdr头为空,什么也不做,直接交给下一个processor处理。如果是follower或者observer发来的请求则提交提案,等待半数节点完成session创建。

  1. 最后是FinalRequestProcessor:

    与单机模式下的情况完全相同。

再看没开启localSessionsEnabled的情况:

  1. 首先是PrepRequestProcessor:

    pRequestHelper

    case OpCode.createSession:
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        }
        break;
    

    pRequest2Txn

    int to = request.request.getInt();
    request.setTxn(new CreateSessionTxn(to));
    request.request.rewind();
    zks.sessionTracker.trackSession(request.sessionId, to);
    zks.setOwner(request.sessionId, request.getOwner());
    

    生成hdr头和txn记录,调用SessionTracker的trackSession方法。

  2. 然后是ProposalRequestProcessor:

    processRequest

    if (shouldForwardToNextProcessor(request)) {
        nextProcessor.processRequest(request);
    }
    if (request.getHdr() != null) {
        try {
            zks.getLeader().propose(request);
        } catch (XidRolloverException e) {
            throw new RequestProcessorException(e.getMessage(), e);
        }
        syncProcessor.processRequest(request);
    }
    

提交提案,等待半数节点完成session创建。

  1. 最后是FinalRequestProcessor:

    与单机模式下的情况完全相同。

然后看follower:

分两种情况,开启了localSessionsEnabled和没开启localSessionsEnabled。

先看开启了localSessionsEnabled的情况:

  1. 首先是FollowerRequestProcessor:

    processRequest

    upgradeRequest = zks.checkUpgradeSession(request);
    if (upgradeRequest != null) {
        queuedRequests.add(upgradeRequest);
    }
    

    如果需要升级session,发送createSession请求来升级session。

    run

    case OpCode.createSession:
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            zks.getFollower().request(request);
        }
        break;
    

    如果是升级session的请求,则将请求转给leader处理。如果是客户端发来的请求,不做任何处理。

  2. 最后是FinalRequestProcessor:

    与单机模式下的情况完全相同。

再看没开启localSessionsEnabled的情况:

  1. 首先是FollowerRequestProcessor:

    run

    case OpCode.createSession:
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            zks.getFollower().request(request);
        }
        break;
    

    将请求转给leader处理。

  2. 最后是FinalRequestProcessor:

    与单机模式下的情况完全相同。

observer的过程跟follower差不多,就不分析了。

大家可能会疑惑,本地session的id是不是会跟全局session的id冲突,比如客户端连接了服务器1创建了一个session,另一个客户端连接服务器2创建了一个session,这两个session的id是不是会冲突。实际上zookeeper解决这个问题的方法是每个server根据server的id不同,创建的session的起始值也不一样,所以不会冲突。这个初始值还与当前时间有关,这样来避免重启后的sessionid(重启后会从数据库快照和事物日志中重建session,也就是说session实际上也是持久化的)与重启前的sessionid冲突,代码如下:

public static long initializeNextSessionId(long id) {
    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}
session重连

入口是:

public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
    if (checkPasswd(sessionId, passwd)) {
        revalidateSession(cnxn, sessionId, sessionTimeout);
    } else {
        LOG.warn(
            "Incorrect password from {} for session 0x{}",
            cnxn.getRemoteSocketAddress(),
            Long.toHexString(sessionId));
        finishSessionInit(cnxn, false);
    }
}

逻辑很简单,检查session对应的密码是否正确,若不正确,给客户端返回错误。若正确,调用revalidateSession。

protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
    boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
    finishSessionInit(cnxn, rc);
}

调用sessionTracker.touchSession,根据结果给客户端返回正确或错误。touchSession返回session是否没过期,如果没过期还会更新过期时间。

session关闭:

客户端正常关闭时发送closeSession包,我们来看一下closeSession包的处理过程。

先看单机:

  1. 首先是PrepRequestProcessor:

    if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
        request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
    }
    zks.sessionTracker.setSessionClosing(request.sessionId);
    

    如果开启了zookeeper.closeSessionTxn.enabled,则设置tnx为所有需要删除的临时节点的路径,默认开启。设置该session的状态为正在关闭。

  2. 最后是FinalRequestProcessor

    applyRequest

    ProcessTxnResult rc = zks.processTxn(request);
    if (request.type == OpCode.closeSession && connClosedByClient(request)) {
        if (closeSession(zks.serverCnxnFactory, request.sessionId)
            || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
            return rc;
        }
    }
    

    ZooKeeperServer的processTnx方法

    sessionTracker.removeSession(sessionId);
    if (txn != null) {
        killSession(sessionId, header.getZxid(),
                ephemerals.remove(sessionId),
                ((CloseSessionTxn) txn).getPaths2Delete());
    } else {
        killSession(sessionId, header.getZxid());
    }
    

    从sessionTracker中移除session,并调用killSession方法

    void killSession(long session, long zxid, Set<String> paths2DeleteLocal,
            List<String> paths2DeleteInTxn) {
        if (paths2DeleteInTxn != null) {
            deleteNodes(session, zxid, paths2DeleteInTxn);
        }
        if (paths2DeleteLocal == null) {
            return;
        }
        if (paths2DeleteInTxn != null) {
            for (String path: paths2DeleteInTxn) {
                paths2DeleteLocal.remove(path);
            }
            if (!paths2DeleteLocal.isEmpty()) {
                LOG.warn(
                    "Unexpected extra paths under session {} which are not in txn 0x{}",
                    paths2DeleteLocal,
                    Long.toHexString(zxid));
            }
        }
        deleteNodes(session, zxid, paths2DeleteLocal);
    }
    

    删除session关联的临时节点

    processRequest

    if (request.type == OpCode.closeSession) {
        cnxn.sendCloseSession();
    }
    

    关闭与客户端的连接。

再看其它:

其它情况与单机差不多,不同的是如果是一个本地session,closeSession的请求只需要在本地做,且不需要清理临时节点,不需要扩散到整个集群,因为如果session关联了临时节点,该session会被升级为全局session。如果是全局session,closeSession的请求需要扩散到整个集群。

session清理

入口是SessionTrackerImpl的run方法:

public void run() {
    try {
        while (running) {
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            for (SessionImpl s : sessionExpiryQueue.poll()) {
                ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                setSessionClosing(s.sessionId);
                expirer.expire(s);
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
}

sessionExpiryQueue是一个ExpiryQueue,expire的间隔由zoo.cfg中的tickTime决定,默认值是3000。

ExpiryQueue的实现比较精妙,主要有三个接口方法update,getWaitTime和poll,update往队列中放,getWaitTime获取到下次执行需要等待的时间,poll获取本次需要过期的session的集合。服务器每次收到一个请求(包括心跳请求)都会调用SessionTracker的touchSession方法,touchSession会调用update方法。

public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);
    if (newExpiryTime.equals(prevExpiryTime)) {
        return null;
    }
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(elem);
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}

首先判断新过期的时间是否与之前计算的时间相同,如果相同直接返回。从expiryMap中获取需要在newExpiryTime过期的集合,如果没有则新建一个,将元素放入该集合中,然后将元素从旧集合中移除。roundToNextInterval以expirationInterval(tickTime)向上取整,比如传入时间是1500,expirationInterval是3000,则计算出的结果是3000。

public long getWaitTime() {
    long now = Time.currentElapsedTime();
    long expirationTime = nextExpirationTime.get();
    return now < expirationTime ? (expirationTime - now) : 0L;
}

获取到下次过期时间的间隔。

public Set<E> poll() {
    long now = Time.currentElapsedTime();
    long expirationTime = nextExpirationTime.get();
    if (now < expirationTime) {
        return Collections.emptySet();
    }
    Set<E> set = null;
    long newExpirationTime = expirationTime + expirationInterval;
    if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
        set = expiryMap.remove(expirationTime);
    }
    if (set == null) {
        return Collections.emptySet();
    }
    return set;
}

拉去下一个过期的集合,并更新nextExpirationTime。

继续看session清理,setSessionClosing(s.sessionId)将session状态置为关闭中,expirer.expire(s)将session过期的真正逻辑。我们来看expirer.expire(s)做了什么:

public void expire(Session session) {
    long sessionId = session.getSessionId();
    close(sessionId);
}

然后就是跟正常客户端发来的关闭session的请求一样处理。

值得注意的是,follower收到客户端的非写请求后不会转发到leader,而全局session的超时清理是由leader处理的,如果不做任何处理,客户端的非写操作(包括ping)leader都感知不到,就会导致全局session被意外清理。ZooKeeper的解决方式是LearnerSessionTracker维护了一个touchTable,在收到leader发来的PING请求时(leader发送PING请求的间隔大概是expirationInterval的一半),为touchTable的所有元素发送ping请求,然后清空touchTable。

上一篇下一篇

猜你喜欢

热点阅读