大数据Paxos分布式技术

6. PhxPaxos源码分析之归档机制

2017-11-17  本文已影响139人  随安居士

目录
1. PhxPaxos源码分析之关于PhxPaxos
2. PhxPaxos分析之网络基础部件
3. PhxPaxos源码分析之Proposer、Acceptor
4. PhxPaxos源码分析之Learner
5. PhxPaxos源码分析之状态机
6. PhxPaxos源码分析之归档机制
7. PhxPaxos源码分析之整体架构


6.1 基本概念

上一节我们讲解了PhxPaxos的状态机,各种场景下可以定制不同的状态机,每个状态机独立消费一个paxos log以驱动业务状态变更。

状态机消费完paxos log之后,paxos log是否可以删除呢?答案是不行。有如下几个原因:

  1. 某些paxos节点可能由于网络等原因落后于其他节点,需要学习现有的paxos log。
  2. 业务消费完paxos log之后,可能由于重启等原因出现数据丢失,需要通过paxos log做重演(replay)。

那什么时候可以删除paxos log呢?答案是不知道。因为某个节点可能永远处于离线状态,这时候必须保留从最初到现在所有的paxos log。但另一方面,如果数据不删除将无限增长,这是无法做到的。

PhxPaxos因此引入了Checkpoint机制,关于该机制的详细描述请参见《状态机Checkpoint详解》。这里简要说明如下:

  1. 一个Checkpoint代表着一份某一时刻被固化下来的状态机数据,它通过sm.h下的StateMachine::GetCheckpointInstanceID()函数反馈它的精确时刻。
  2. 每次启动replay时,只需要从GetCheckpointInstanceID所指向PaxosLog位置开始而不是从0开始。
  3. Node::SetHoldPaxosLogCount()控制需要保留多少在StateMachine::GetCheckpointInstanceID()之前的PaxosLog。
  4. 保留一定数量的Paxos log的目的在于,如果其他节点数据不对齐,可以通过保留的这部分paxos log完成对齐,而不需要Checkpoint数据介入。
  5. 如果对齐数据已经被删除,这时需要Checkpoint数据传输。

6.2 代码设计

Checkpoint机制相关类图如下:


Checkpoint机制

参照上一节讲的功能及类图补充说明如下:

6.3 状态机(State Machine)

再来看StateMachine接口,这次我们重点关注Checkpoint相关接口。

    class StateMachine
    {
    public:

        virtual const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const;

        virtual int LockCheckpointState();
        virtual int GetCheckpointState(const int iGroupIdx, std::string& sDirPath,
                                       std::vector<std::string>& vecFileList);
        virtual void UnLockCheckpointState();
        virtual int LoadCheckpointState(const int iGroupIdx, const std::string& sCheckpointTmpFileDirPath,
                                        const std::vector<std::string>& vecFileList, const uint64_t llCheckpointInstanceID);

        virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
                                          const std::string& sPaxosValue);
    };

接口说明如下:

  • GetCheckpointInstanceID(const int iGroupIdx)
    Checkpoint所指向的最大的Instance ID,在此之前的paxos log数据状态机已经不需要了。
  • StateMachine::LockCheckpointState()
    用于开发者锁定状态机Checkpoint数据,这个锁定的意思是指这份数据文件不能被修改,移动和删除,因为接下来PhxPaxos就要将这些文件发送给其他节点,而如果这个过程中出现了修改,则发送的数据可能乱掉。
  • StateMachine::GetCheckpointState()
    Phxpaxos获取Checkpoint的文件列表,从而将文件发送给其他节点。
  • StateMachine::UnLockCheckpointState()
    当PhxPaxos发送Checkpoint到其他节点完成后,会调用的解锁函数,解除对开发者的状态机Checkpoint数据的锁定。
  • StateMachine::LoadCheckpointState()
    当一个节点获得来自其他节点的Checkpoint数据时,会调用这个函数,将这份数据交由开发者进行处理(开发者往往要做的事情就是将这份Checkpoint数据覆盖当前节点的数据),当调用此函数完成后,PhxPaxos将会进行进程自杀操作,通过重启来完成一个新Checkpoint数据的启动。
  • ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, const std::string& sPaxosValue)
    paxos log归档的replay接口。

6.4 Replayer

Replayer是一个独立的线程,负责将选中的提案值做Checkpoint操作。实现逻辑非常简单,读取本机的Checkpoint Instance ID信息,定时和Max Chosen Instance ID比较,如果Checkpoint落后于Max Chosen Instance ID,通过调用状态机的ExecuteForCheckpoint进行重演。

    void Replayer :: run()
    {
        PLGHead("Checkpoint.Replayer [START]");
        uint64_t llInstanceID = m_poSMFac->GetCheckpointInstanceID(m_poConfig->GetMyGroupIdx()) + 1;

        while (true)
        {
            if (m_bIsEnd)
            {
                PLGHead("Checkpoint.Replayer [END]");
                return;
            }

            if (!m_bCanrun)
            {
                //PLGImp("Pausing, sleep");
                m_bIsPaused = true;
                Time::MsSleep(1000);
                continue;
            }
            //本节点的所有提案值已全部重演
            if (llInstanceID >= m_poCheckpointMgr->GetMaxChosenInstanceID())
            {
                //PLGImp("now maxchosen instanceid %lu small than excute instanceid %lu, wait",
                //m_poCheckpointMgr->GetMaxChosenInstanceID(), llInstanceID);
                Time::MsSleep(1000);
                continue;
            }
            //重演
            bool bPlayRet = PlayOne(llInstanceID);

            if (bPlayRet)
            {
                PLGImp("Play one done, instanceid %lu", llInstanceID);
                llInstanceID++;
            }
            else
            {
                PLGErr("Play one fail, instanceid %lu", llInstanceID);
                Time::MsSleep(500);
            }
        }
    }

PlayOne实现逻辑如下:

    bool Replayer :: PlayOne(const uint64_t llInstanceID)
    {
        //读取数据库中的paxos log
        AcceptorStateData oState;
        int ret = m_oPaxosLog.ReadState(m_poConfig->GetMyGroupIdx(), llInstanceID, oState);

        if (ret != 0)
        {
            return false;
        }
        //调用状态机的ExecuteForCheckpoint
        bool bExecuteRet = m_poSMFac->ExecuteForCheckpoint(
                               m_poConfig->GetMyGroupIdx(), llInstanceID, oState.acceptedvalue());

        if (!bExecuteRet)
        {
            PLGErr("Checkpoint sm excute fail, instanceid %lu", llInstanceID);
        }

        return bExecuteRet;
    }

6.5 Paxos log清理(Clearner)

在PhxPaxos中,每个Group启动一个Clearner线程清理本Group的paxos log。在Group中,instance id是不断递增的,每个instance id对应一个paxos log,当加入Checkpoint之后,我们有三个关键的instance id。

注意:这里强调的是本节点,不同节点数据可能不同。

除此之外,还有一个配置:Node::SetHoldPaxosLogCount()控制需要保留多少在StateMachine::GetCheckpointInstanceID()之前的PaxosLog。保留一定数量的Paxos log的目的在于,如果其他节点数据不对齐,可以通过保留的这部分paxos log完成对齐,而不需要Checkpoint数据介入。

正常情况下,三者关系如下:


Checkpoint概念关系图

清理动作由Clearner线程的run动作触发,触发频率、清理数据量等通过配置参数指定。为了减少清理动作对系统产生的副作用,PhxPaxos将清理动作拆解到毫秒级别分批次运行。假设我们设定每秒清理2000条记录(Cleaner_DELETE_QPS=2000),那么执行间隔为iSleepMs=1ms,每次删除数据量为iDeleteInterval=2条。代码逻辑如下:

        //control delete speed to avoid affecting the io too much.
        int iDeleteQps = Cleaner_DELETE_QPS;
        int iSleepMs = iDeleteQps > 1000 ? 1 : 1000 / iDeleteQps;
        int iDeleteInterval = iDeleteQps > 1000 ? iDeleteQps / 1000 + 1 : 1;

根据此频率,触发清理动作。

        while (true)
        {
            if (m_bIsEnd)
            {
                PLGHead("Checkpoint.Cleaner [END]");
                return;
            }

            if (!m_bCanrun)
            {
                PLGImp("Pausing, sleep");
                m_bIsPaused = true;
                Time::MsSleep(1000);
                continue;
            }

            uint64_t llInstanceID = m_poCheckpointMgr->GetMinChosenInstanceID();
            uint64_t llCPInstanceID = m_poSMFac->GetCheckpointInstanceID(m_poConfig->GetMyGroupIdx()) + 1;
            uint64_t llMaxChosenInstanceID = m_poCheckpointMgr->GetMaxChosenInstanceID();

            int iDeleteCount = 0;
            //最小提案Instance ID + 保留条数 < Checkpoint Instance ID
            while ((llInstanceID + m_llHoldCount < llCPInstanceID)
                   && (llInstanceID + m_llHoldCount < llMaxChosenInstanceID))
            {
                //删除单条记录
                bool bDeleteRet = DeleteOne(llInstanceID);

                if (bDeleteRet)
                {
                    //PLGImp("delete one done, instanceid %lu", llInstanceID);
                    llInstanceID++;
                    iDeleteCount++;

                    if (iDeleteCount >= iDeleteInterval)
                    {
                        iDeleteCount = 0;
                        Time::MsSleep(iSleepMs);
                    }
                }
                else
                {
                    PLGDebug("delete system fail, instanceid %lu", llInstanceID);
                    break;
                }
            }

            if (llCPInstanceID == 0)
            {
                PLGStatus("sleep a while, max deleted instanceid %lu checkpoint instanceid (no checkpoint) now instanceid %lu",
                          llInstanceID, m_poCheckpointMgr->GetMaxChosenInstanceID());
            }
            else
            {
                PLGStatus("sleep a while, max deleted instanceid %lu checkpoint instanceid %lu now instanceid %lu",
                          llInstanceID, llCPInstanceID, m_poCheckpointMgr->GetMaxChosenInstanceID());
            }

            Time::MsSleep(OtherUtils::FastRand() % 500 + 500);
        }

触发清理paxos log的条件包括Checkpoint Instance ID和Min Chosen Instance ID之间的记录超过需要保留的条数,或者Max Chosen Instance ID和Min Chosen Instance ID之间记录超过保留条数。后面一个判定只是一种保险的附加逻辑,因为实际上Max Chosen Instance ID >= Checkpoint Instance ID永远成立。

    bool Cleaner :: DeleteOne(const uint64_t llInstanceID)
    {
        WriteOptions oWriteOptions;
        oWriteOptions.bSync = false;

        //删除日志数据
        int ret = m_poLogStorage->Del(oWriteOptions, m_poConfig->GetMyGroupIdx(), llInstanceID);
        if (ret != 0)
        {
            return false;
        }

        //数据已经删除,更新Min Chosen Instance ID
        m_poCheckpointMgr->SetMinChosenInstanceIDCache(llInstanceID);
        
        if (llInstanceID >= m_llLastSave + DELETE_SAVE_INTERVAL)
        {
            int ret = m_poCheckpointMgr->SetMinChosenInstanceID(llInstanceID + 1);

            if (ret != 0)
            {
                PLGErr("SetMinChosenInstanceID fail, now delete instanceid %lu", llInstanceID);
                return false;
            }

            m_llLastSave = llInstanceID;

            PLGImp("delete %d instance done, now minchosen instanceid %lu",
                   DELETE_SAVE_INTERVAL, llInstanceID + 1);
        }

        return true;
    }

删除数据包括两个动作:删除paxos log,更新本节点的Min Chosen Instance ID信息。但如果每删除一条

6.6 CheckpointMgr

CheckpointMgr真正管理的只有Cleaner、Replayer两个对象,负责管理这两个线程的初始化、启动、停止等典型动作。关于Cleaner和Replayer前面已经讲过了,此处不提。来看CheckpointMgr另外一组功能:

    class CheckpointMgr
    {
    public:
        //准备发起Checkpoint请求
        int PrepareForAskforCheckpoint(const nodeid_t iSendNodeID);
        //当前是否处于Checkpoint模式,退出Checkpoint模式。后面两个函数只是设置和查看标志位(m_
        const bool InAskforcheckpointMode() const;
        //退出Checkpoint模式
        void ExitCheckpointMode();
      ...
     }

后面两个函数只是设置和查看标志位(m_bInAskforCheckpointMode ),来看PrepareForAskforCheckpoint实现。

    int CheckpointMgr :: PrepareForAskforCheckpoint(const nodeid_t iSendNodeID)
    {
        if (m_setNeedAsk.find(iSendNodeID) == m_setNeedAsk.end())
        {
            m_setNeedAsk.insert(iSendNodeID);
        }

        if (m_llLastAskforCheckpointTime == 0)
        {
            m_llLastAskforCheckpointTime = Time::GetSteadyClockMS();
        }

        uint64_t llNowTime = Time::GetSteadyClockMS();
        //发起请求已经超过1分钟,强制进入Checkpoint Mode
        if (llNowTime > m_llLastAskforCheckpointTime + 60000)
        {
            PLGImp("no majority reply, just ask for checkpoint");
        }
        else
        {
            //未超过半数节点响应并同意Checkpoint请求
            if ((int)m_setNeedAsk.size() < m_poConfig->GetMajorityCount())
            {
                PLGImp("Need more other tell us need to askforcheckpoint");
                return -2;
            }
        }

        m_llLastAskforCheckpointTime = 0;
        m_bInAskforCheckpointMode = true;

        return 0;
    }

何时触发进入Checkpoint模式呢?当本节点向其他节点发起learn请求,其他节点保存的Min Chosen Instance ID比本地的Instance ID还要大。这说明本节点无法再向该节点通过learn完成追赶,这时调用PrepareForAskforCheckpoint请求进入Checkpoint模式。而真正进入Checkpoint模式需要满足两个条件之一:

一旦决定进入Checkpoint模式,向最好告知的节点发起Checkpoint请求。

    void Learner :: AskforCheckpoint(const nodeid_t iSendNodeID)
    {
        PLGHead("START");

        int ret = m_poCheckpointMgr->PrepareForAskforCheckpoint(iSendNodeID);

        if (ret != 0)
        {
            return;
        }

        PaxosMsg oPaxosMsg;

        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_msgtype(MsgType_PaxosLearner_AskforCheckpoint);

        PLGHead("END InstanceID %lu MyNodeID %lu", GetInstanceID(), oPaxosMsg.nodeid());

        SendMessage(iSendNodeID, oPaxosMsg);
    }

该节点接收该消息后,启动CheckpointSender线程,发送Checkpoint数据。

6.7 CheckpointSender/CheckpointReceiver

CheckpointSender的定位很明确,一旦线程启动,立即执行Checkpoint数据发送。线程核心函数如下:

    void CheckpointSender :: run()
    {
        m_bIsStarted = true;
        m_llAbsLastAckTime = Time::GetSteadyClockMS();

        //pause checkpoint replayer
        bool bNeedContinue = false;
        //暂停replayer
        while (!m_poCheckpointMgr->GetReplayer()->IsPaused())
        {
            if (m_bIsEnd)
            {
                m_bIsEnded = true;
                return;
            }

            bNeedContinue = true;

            m_poCheckpointMgr->GetReplayer()->Pause();
            PLGDebug("wait replayer paused.");
            Time::MsSleep(100);
        }
        //锁定所有的状态机
        int ret = LockCheckpoint();

        if (ret == 0)
        {
            //发送Checkpoint数据
            SendCheckpoint();
            //解锁状态机
            UnLockCheckpoint();
        }

        //恢复replayer
        if (bNeedContinue)
        {
            m_poCheckpointMgr->GetReplayer()->Continue();
        }

        PLGHead("Checkpoint.Sender [END]");
        m_bIsEnded = true;
    }

SendCheckpoint内部又分为三个阶段,开始、数据传输、结束,通过flag标识。

6.7.1 Checkpoint开始

开始Checkpoint。Checkpoint Sender发送开始Checkpoint消息(CheckpointSendFileFlag_BEGIN)到Checkpoint Receiver。Receiver收到该消息后,做如下几件事:

  1. 删除本地的、当前Group的所有Checkpoint文件。
  2. 删除数据库中的全部paxos log。
  3. 重置Min Chosen Instance ID,以Checkpoint Sender发送过来的数据做为基线。

值得注意的是,整个过程中没有提及本机的Cleaner和Replayer,难道不应该把这些一并停掉?再有,先删除本地数据库中的paxos log再重置Min Chosen Instance ID,过程中可能又有新的数据写入了,并发问题怎么解决呢?

记得CheckpointMgr的中有一个InAskforcheckpointMode方法,判定当前是否处于Checkpoint模式,搜索其使用者,我们看到这样一段逻辑:

        if (iCmd == MsgCmd_PaxosMsg)
        {
            //注意这里
            if (m_oCheckpointMgr.InAskforcheckpointMode())
            {
                PLGImp("in ask for checkpoint mode, ignord paxosmsg");
                return;
            }

            PaxosMsg oPaxosMsg;
            bool bSucc = oPaxosMsg.ParseFromArray(sBuffer.data() + iBodyStartPos, iBodyLen);
            if (!bSucc)
            {
                BP->GetInstanceBP()->OnReceiveParseError();
                PLGErr("PaxosMsg.ParseFromArray fail, skip this msg");
                return;
            }

            if (!ReceiveMsgHeaderCheck(oHeader, oPaxosMsg.nodeid()))
            {
                return;
            }

            OnReceivePaxosMsg(oPaxosMsg);
        }

当本节点接收到来自其他节点的数据时,如果当前处于Checkpoint模式,非Checkpoint相关的数据包全部丢弃。因此,即便Cleaner和Replayer未停止,即便删除数据和重置之间存在时间间隔也不会有问题。因为整个Checkpoint过程中,并没有新的消息被处理,一切处于“停滞”状态。

6.7.2 Checkpoint数据传输

Checkpoint执行过程代码虽然很多,但是逻辑并不复杂,这里通过伪码加以说明:

for each state_machine in state_machine_list
        call state_machine.GetCheckpointState get checkpoint_file_list

        for each checkpoint_file in checkpoint_file_list
                open checkpoint_file
                while(true)
                        read 1M buffer
                        send buffer to CheckpointReceiver
                        break if read to end
                end
        end
end

Checkpoint数据传输、处理过程中数据包标识为CheckpointSendFileFlag_ING。遍历本节点的所有状态机对象,依次做每个状态机的Checkpoint对账。通过调用状态机的GetCheckpointState获取本状态机的全部Checkpoint文件列表,并依次发送。单次发送大小不超过1M。
CheckpointReceiver的伪码如下:

create file if not exist
open file
append file
end

对应的源码入口分别为CheckpointSender :: SendCheckpoint,CheckpointReceiver :: ReceiveCheckpoint。

6.7.3 Checkpoint结束

一旦所有的Checkpoint数据发送完成,需要发送CheckpointSendFileFlag_END消息到Checkpointer发起者。处理逻辑如下:

    int Learner :: OnSendCheckpoint_End(const CheckpointMsg& oCheckpointMsg)
    {
        if (!m_oCheckpointReceiver.IsReceiverFinish(oCheckpointMsg.nodeid(),
                oCheckpointMsg.uuid(), oCheckpointMsg.sequence()))
        {
            PLGErr("receive end msg but receiver not finish");
            return -1;
        }

        BP->GetCheckpointBP()->ReceiveCheckpointDone();

        std::vector<StateMachine*> vecSMList = m_poSMFac->GetSMList();
        //遍历所有的状态机
        for (auto & poSM : vecSMList)
        {
            if (poSM->SMID() == SYSTEM_V_SMID
                || poSM->SMID() == MASTER_V_SMID)
            {
                //system variables sm no checkpoint
                //master variables sm no checkpoint
                continue;
            }
            //获取Checkpoint路径和文件信息
            string sTmpDirPath = m_oCheckpointReceiver.GetTmpDirPath(poSM->SMID());
            std::vector<std::string> vecFilePathList;

            int ret = FileUtils :: IterDir(sTmpDirPath, vecFilePathList);

            if (ret != 0)
            {
                PLGErr("IterDir fail, dirpath %s", sTmpDirPath.c_str());
            }

            if (vecFilePathList.size() == 0)
            {
                PLGImp("this sm %d have no checkpoint", poSM->SMID());
                continue;
            }
            //通知状态机Checkpoint文件变更
            ret = poSM->LoadCheckpointState(
                      m_poConfig->GetMyGroupIdx(),
                      sTmpDirPath,
                      vecFilePathList,
                      oCheckpointMsg.checkpointinstanceid());

            if (ret != 0)
            {
                BP->GetCheckpointBP()->ReceiveCheckpointAndLoadFail();
                return ret;
            }

        }

        BP->GetCheckpointBP()->ReceiveCheckpointAndLoadSucc();
        PLGImp("All sm load state ok, start to exit process");
         //退出PhxPaxos
        exit(-1);

        return 0;
    }

引用《状态机Checkpoint详解》中的文字:

StateMachine::LoadCheckpointState()当一个节点获得来自其他节点的Checkpoint数据时,会调用这个函数,将这份数据交由开发者进行处理(开发者往往要做的事情就是将这份Checkpoint数据覆盖当前节点的数据),当调用此函数完成后,PhxPaxos将会进行进程自杀操作,通过重启来完成一个新Checkpoint数据的启动。

也就是说,整个Checkpoint的对账过程,业务侧并没有感知,PhxPaxos内部只是将这些文件存放到了一个临时路径,只在最后一刻才通过LoadCheckpointState通知业务处理。

6.8 总结

Checkpoint机制在每个节点默认启动两个线程:Cleaner、Replayer。Cleaner负责定期清理过时的paxos log,Replayer则负责追赶状态机的Execute数据到ExecuteForCheckpoint。另外,如果某个节点落后太多,通过learner无法完成追赶时,将触发启动Checkpoint的另外一个线程Checkpoint Sender。Checkpoint Sender负责发送本节点的Checkpoint文件到Checkpoint Receiver。一旦数据发送完成,Checkpoint Receiver将进行进程自杀操作,通过重启完成Checkpoint数据同步。

Checkpoint机制并不是PhxPaxos算法的一部分,而是真正工程化的产物。它尝试解决的工程问题包括:

OK,至此了解了PhxPaxos的基础网络部件、paxos角色以及工程化的Checkpoint机制,终于可以浮到水面窥视PhxPaxos的全貌了。


【转载请注明】随安居士. 6. PhxPaxos源码分析之归档机制. 2017.11.17

上一篇下一篇

猜你喜欢

热点阅读