6. PhxPaxos源码分析之归档机制
目录
1. PhxPaxos源码分析之关于PhxPaxos
2. PhxPaxos分析之网络基础部件
3. PhxPaxos源码分析之Proposer、Acceptor
4. PhxPaxos源码分析之Learner
5. PhxPaxos源码分析之状态机
6. PhxPaxos源码分析之归档机制
7. PhxPaxos源码分析之整体架构
6.1 基本概念
上一节我们讲解了PhxPaxos的状态机,各种场景下可以定制不同的状态机,每个状态机独立消费一个paxos log以驱动业务状态变更。
状态机消费完paxos log之后,paxos log是否可以删除呢?答案是不行。有如下几个原因:
- 某些paxos节点可能由于网络等原因落后于其他节点,需要学习现有的paxos log。
- 业务消费完paxos log之后,可能由于重启等原因出现数据丢失,需要通过paxos log做重演(replay)。
那什么时候可以删除paxos log呢?答案是不知道。因为某个节点可能永远处于离线状态,这时候必须保留从最初到现在所有的paxos log。但另一方面,如果数据不删除将无限增长,这是无法做到的。
PhxPaxos因此引入了Checkpoint机制,关于该机制的详细描述请参见《状态机Checkpoint详解》。这里简要说明如下:
- 一个Checkpoint代表着一份某一时刻被固化下来的状态机数据,它通过sm.h下的StateMachine::GetCheckpointInstanceID()函数反馈它的精确时刻。
- 每次启动replay时,只需要从GetCheckpointInstanceID所指向PaxosLog位置开始而不是从0开始。
- Node::SetHoldPaxosLogCount()控制需要保留多少在StateMachine::GetCheckpointInstanceID()之前的PaxosLog。
- 保留一定数量的Paxos log的目的在于,如果其他节点数据不对齐,可以通过保留的这部分paxos log完成对齐,而不需要Checkpoint数据介入。
- 如果对齐数据已经被删除,这时需要Checkpoint数据传输。
6.2 代码设计
Checkpoint机制相关类图如下:
Checkpoint机制
参照上一节讲的功能及类图补充说明如下:
-
Replayer
replay线程。当业务状态机已经消费指定paxos log后,交由Checkpoint重新执行。由Checkpoint replay的数据允许被删除。 -
Cleaner
paxos log清理线程。根据配置的保留条数等清理paxos log。 -
CheckpointSender
归档数据发送线程。将归档数据发往其他节点,用于归档数据同步。 -
CheckpointReceiver
归档数据接收器。接收其他节点发送过来的归档数据。 -
CheckpointMgr
Checkpoint机制管理器。统一管理整个归档机制。
6.3 状态机(State Machine)
再来看StateMachine接口,这次我们重点关注Checkpoint相关接口。
class StateMachine
{
public:
virtual const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const;
virtual int LockCheckpointState();
virtual int GetCheckpointState(const int iGroupIdx, std::string& sDirPath,
std::vector<std::string>& vecFileList);
virtual void UnLockCheckpointState();
virtual int LoadCheckpointState(const int iGroupIdx, const std::string& sCheckpointTmpFileDirPath,
const std::vector<std::string>& vecFileList, const uint64_t llCheckpointInstanceID);
virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string& sPaxosValue);
};
接口说明如下:
- GetCheckpointInstanceID(const int iGroupIdx)
Checkpoint所指向的最大的Instance ID,在此之前的paxos log数据状态机已经不需要了。- StateMachine::LockCheckpointState()
用于开发者锁定状态机Checkpoint数据,这个锁定的意思是指这份数据文件不能被修改,移动和删除,因为接下来PhxPaxos就要将这些文件发送给其他节点,而如果这个过程中出现了修改,则发送的数据可能乱掉。- StateMachine::GetCheckpointState()
Phxpaxos获取Checkpoint的文件列表,从而将文件发送给其他节点。- StateMachine::UnLockCheckpointState()
当PhxPaxos发送Checkpoint到其他节点完成后,会调用的解锁函数,解除对开发者的状态机Checkpoint数据的锁定。- StateMachine::LoadCheckpointState()
当一个节点获得来自其他节点的Checkpoint数据时,会调用这个函数,将这份数据交由开发者进行处理(开发者往往要做的事情就是将这份Checkpoint数据覆盖当前节点的数据),当调用此函数完成后,PhxPaxos将会进行进程自杀操作,通过重启来完成一个新Checkpoint数据的启动。- ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, const std::string& sPaxosValue)
paxos log归档的replay接口。
6.4 Replayer
Replayer是一个独立的线程,负责将选中的提案值做Checkpoint操作。实现逻辑非常简单,读取本机的Checkpoint Instance ID信息,定时和Max Chosen Instance ID比较,如果Checkpoint落后于Max Chosen Instance ID,通过调用状态机的ExecuteForCheckpoint进行重演。
void Replayer :: run()
{
PLGHead("Checkpoint.Replayer [START]");
uint64_t llInstanceID = m_poSMFac->GetCheckpointInstanceID(m_poConfig->GetMyGroupIdx()) + 1;
while (true)
{
if (m_bIsEnd)
{
PLGHead("Checkpoint.Replayer [END]");
return;
}
if (!m_bCanrun)
{
//PLGImp("Pausing, sleep");
m_bIsPaused = true;
Time::MsSleep(1000);
continue;
}
//本节点的所有提案值已全部重演
if (llInstanceID >= m_poCheckpointMgr->GetMaxChosenInstanceID())
{
//PLGImp("now maxchosen instanceid %lu small than excute instanceid %lu, wait",
//m_poCheckpointMgr->GetMaxChosenInstanceID(), llInstanceID);
Time::MsSleep(1000);
continue;
}
//重演
bool bPlayRet = PlayOne(llInstanceID);
if (bPlayRet)
{
PLGImp("Play one done, instanceid %lu", llInstanceID);
llInstanceID++;
}
else
{
PLGErr("Play one fail, instanceid %lu", llInstanceID);
Time::MsSleep(500);
}
}
}
PlayOne实现逻辑如下:
bool Replayer :: PlayOne(const uint64_t llInstanceID)
{
//读取数据库中的paxos log
AcceptorStateData oState;
int ret = m_oPaxosLog.ReadState(m_poConfig->GetMyGroupIdx(), llInstanceID, oState);
if (ret != 0)
{
return false;
}
//调用状态机的ExecuteForCheckpoint
bool bExecuteRet = m_poSMFac->ExecuteForCheckpoint(
m_poConfig->GetMyGroupIdx(), llInstanceID, oState.acceptedvalue());
if (!bExecuteRet)
{
PLGErr("Checkpoint sm excute fail, instanceid %lu", llInstanceID);
}
return bExecuteRet;
}
6.5 Paxos log清理(Clearner)
在PhxPaxos中,每个Group启动一个Clearner线程清理本Group的paxos log。在Group中,instance id是不断递增的,每个instance id对应一个paxos log,当加入Checkpoint之后,我们有三个关键的instance id。
-
Min Chosen Instnace ID
本节点,选定提案的最小Instance ID,即最老的paxos log所对应的Instance ID。 -
Checkpoint Instance ID
本节点,Checkpoint确定的最大Instance ID,即在这之前的paxos log数据Checkpoint都已经不需要了。 -
Max Chosen Instance ID
本节点,选定提案的最大Instnace ID,即最新的paxos log所对应的Instnace ID。
注意:这里强调的是本节点,不同节点数据可能不同。
除此之外,还有一个配置:Node::SetHoldPaxosLogCount()控制需要保留多少在StateMachine::GetCheckpointInstanceID()之前的PaxosLog。保留一定数量的Paxos log的目的在于,如果其他节点数据不对齐,可以通过保留的这部分paxos log完成对齐,而不需要Checkpoint数据介入。
正常情况下,三者关系如下:
Checkpoint概念关系图
清理动作由Clearner线程的run动作触发,触发频率、清理数据量等通过配置参数指定。为了减少清理动作对系统产生的副作用,PhxPaxos将清理动作拆解到毫秒级别分批次运行。假设我们设定每秒清理2000条记录(Cleaner_DELETE_QPS=2000),那么执行间隔为iSleepMs=1ms,每次删除数据量为iDeleteInterval=2条。代码逻辑如下:
//control delete speed to avoid affecting the io too much.
int iDeleteQps = Cleaner_DELETE_QPS;
int iSleepMs = iDeleteQps > 1000 ? 1 : 1000 / iDeleteQps;
int iDeleteInterval = iDeleteQps > 1000 ? iDeleteQps / 1000 + 1 : 1;
根据此频率,触发清理动作。
while (true)
{
if (m_bIsEnd)
{
PLGHead("Checkpoint.Cleaner [END]");
return;
}
if (!m_bCanrun)
{
PLGImp("Pausing, sleep");
m_bIsPaused = true;
Time::MsSleep(1000);
continue;
}
uint64_t llInstanceID = m_poCheckpointMgr->GetMinChosenInstanceID();
uint64_t llCPInstanceID = m_poSMFac->GetCheckpointInstanceID(m_poConfig->GetMyGroupIdx()) + 1;
uint64_t llMaxChosenInstanceID = m_poCheckpointMgr->GetMaxChosenInstanceID();
int iDeleteCount = 0;
//最小提案Instance ID + 保留条数 < Checkpoint Instance ID
while ((llInstanceID + m_llHoldCount < llCPInstanceID)
&& (llInstanceID + m_llHoldCount < llMaxChosenInstanceID))
{
//删除单条记录
bool bDeleteRet = DeleteOne(llInstanceID);
if (bDeleteRet)
{
//PLGImp("delete one done, instanceid %lu", llInstanceID);
llInstanceID++;
iDeleteCount++;
if (iDeleteCount >= iDeleteInterval)
{
iDeleteCount = 0;
Time::MsSleep(iSleepMs);
}
}
else
{
PLGDebug("delete system fail, instanceid %lu", llInstanceID);
break;
}
}
if (llCPInstanceID == 0)
{
PLGStatus("sleep a while, max deleted instanceid %lu checkpoint instanceid (no checkpoint) now instanceid %lu",
llInstanceID, m_poCheckpointMgr->GetMaxChosenInstanceID());
}
else
{
PLGStatus("sleep a while, max deleted instanceid %lu checkpoint instanceid %lu now instanceid %lu",
llInstanceID, llCPInstanceID, m_poCheckpointMgr->GetMaxChosenInstanceID());
}
Time::MsSleep(OtherUtils::FastRand() % 500 + 500);
}
触发清理paxos log的条件包括Checkpoint Instance ID和Min Chosen Instance ID之间的记录超过需要保留的条数,或者Max Chosen Instance ID和Min Chosen Instance ID之间记录超过保留条数。后面一个判定只是一种保险的附加逻辑,因为实际上Max Chosen Instance ID >= Checkpoint Instance ID永远成立。
bool Cleaner :: DeleteOne(const uint64_t llInstanceID)
{
WriteOptions oWriteOptions;
oWriteOptions.bSync = false;
//删除日志数据
int ret = m_poLogStorage->Del(oWriteOptions, m_poConfig->GetMyGroupIdx(), llInstanceID);
if (ret != 0)
{
return false;
}
//数据已经删除,更新Min Chosen Instance ID
m_poCheckpointMgr->SetMinChosenInstanceIDCache(llInstanceID);
if (llInstanceID >= m_llLastSave + DELETE_SAVE_INTERVAL)
{
int ret = m_poCheckpointMgr->SetMinChosenInstanceID(llInstanceID + 1);
if (ret != 0)
{
PLGErr("SetMinChosenInstanceID fail, now delete instanceid %lu", llInstanceID);
return false;
}
m_llLastSave = llInstanceID;
PLGImp("delete %d instance done, now minchosen instanceid %lu",
DELETE_SAVE_INTERVAL, llInstanceID + 1);
}
return true;
}
删除数据包括两个动作:删除paxos log,更新本节点的Min Chosen Instance ID信息。但如果每删除一条
6.6 CheckpointMgr
CheckpointMgr真正管理的只有Cleaner、Replayer两个对象,负责管理这两个线程的初始化、启动、停止等典型动作。关于Cleaner和Replayer前面已经讲过了,此处不提。来看CheckpointMgr另外一组功能:
class CheckpointMgr
{
public:
//准备发起Checkpoint请求
int PrepareForAskforCheckpoint(const nodeid_t iSendNodeID);
//当前是否处于Checkpoint模式,退出Checkpoint模式。后面两个函数只是设置和查看标志位(m_
const bool InAskforcheckpointMode() const;
//退出Checkpoint模式
void ExitCheckpointMode();
...
}
后面两个函数只是设置和查看标志位(m_bInAskforCheckpointMode ),来看PrepareForAskforCheckpoint实现。
int CheckpointMgr :: PrepareForAskforCheckpoint(const nodeid_t iSendNodeID)
{
if (m_setNeedAsk.find(iSendNodeID) == m_setNeedAsk.end())
{
m_setNeedAsk.insert(iSendNodeID);
}
if (m_llLastAskforCheckpointTime == 0)
{
m_llLastAskforCheckpointTime = Time::GetSteadyClockMS();
}
uint64_t llNowTime = Time::GetSteadyClockMS();
//发起请求已经超过1分钟,强制进入Checkpoint Mode
if (llNowTime > m_llLastAskforCheckpointTime + 60000)
{
PLGImp("no majority reply, just ask for checkpoint");
}
else
{
//未超过半数节点响应并同意Checkpoint请求
if ((int)m_setNeedAsk.size() < m_poConfig->GetMajorityCount())
{
PLGImp("Need more other tell us need to askforcheckpoint");
return -2;
}
}
m_llLastAskforCheckpointTime = 0;
m_bInAskforCheckpointMode = true;
return 0;
}
何时触发进入Checkpoint模式呢?当本节点向其他节点发起learn请求,其他节点保存的Min Chosen Instance ID比本地的Instance ID还要大。这说明本节点无法再向该节点通过learn完成追赶,这时调用PrepareForAskforCheckpoint请求进入Checkpoint模式。而真正进入Checkpoint模式需要满足两个条件之一:
- 已有超过半数节点无法通过Learn方式完成追赶,进入Checkpoint模式。
- 距离首个告知无法支持Learn追赶的节点距今已超过一分钟,仍有其他节点告知追赶无法完成,未避免差距进一步拉大,进入Checkpoint模式。
一旦决定进入Checkpoint模式,向最好告知的节点发起Checkpoint请求。
void Learner :: AskforCheckpoint(const nodeid_t iSendNodeID)
{
PLGHead("START");
int ret = m_poCheckpointMgr->PrepareForAskforCheckpoint(iSendNodeID);
if (ret != 0)
{
return;
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_msgtype(MsgType_PaxosLearner_AskforCheckpoint);
PLGHead("END InstanceID %lu MyNodeID %lu", GetInstanceID(), oPaxosMsg.nodeid());
SendMessage(iSendNodeID, oPaxosMsg);
}
该节点接收该消息后,启动CheckpointSender线程,发送Checkpoint数据。
6.7 CheckpointSender/CheckpointReceiver
CheckpointSender的定位很明确,一旦线程启动,立即执行Checkpoint数据发送。线程核心函数如下:
void CheckpointSender :: run()
{
m_bIsStarted = true;
m_llAbsLastAckTime = Time::GetSteadyClockMS();
//pause checkpoint replayer
bool bNeedContinue = false;
//暂停replayer
while (!m_poCheckpointMgr->GetReplayer()->IsPaused())
{
if (m_bIsEnd)
{
m_bIsEnded = true;
return;
}
bNeedContinue = true;
m_poCheckpointMgr->GetReplayer()->Pause();
PLGDebug("wait replayer paused.");
Time::MsSleep(100);
}
//锁定所有的状态机
int ret = LockCheckpoint();
if (ret == 0)
{
//发送Checkpoint数据
SendCheckpoint();
//解锁状态机
UnLockCheckpoint();
}
//恢复replayer
if (bNeedContinue)
{
m_poCheckpointMgr->GetReplayer()->Continue();
}
PLGHead("Checkpoint.Sender [END]");
m_bIsEnded = true;
}
- Replayer负责本地追赶learner数据,当需要发起Checkpoint操作时,要保证Checkpoint文件不变,因此PhxPaxos机制需要首先暂停Replayer。
- Checkpoint之前,调用所有状态机的LockCheckpointState锁定Checkpoint文件。
- Checkpoint之后,调用所有状态机的UnLockCheckpointState解除锁定。
- 恢复Replayer,追赶落后的learner数据。
SendCheckpoint内部又分为三个阶段,开始、数据传输、结束,通过flag标识。
6.7.1 Checkpoint开始
开始Checkpoint。Checkpoint Sender发送开始Checkpoint消息(CheckpointSendFileFlag_BEGIN)到Checkpoint Receiver。Receiver收到该消息后,做如下几件事:
- 删除本地的、当前Group的所有Checkpoint文件。
- 删除数据库中的全部paxos log。
- 重置Min Chosen Instance ID,以Checkpoint Sender发送过来的数据做为基线。
值得注意的是,整个过程中没有提及本机的Cleaner和Replayer,难道不应该把这些一并停掉?再有,先删除本地数据库中的paxos log再重置Min Chosen Instance ID,过程中可能又有新的数据写入了,并发问题怎么解决呢?
记得CheckpointMgr的中有一个InAskforcheckpointMode方法,判定当前是否处于Checkpoint模式,搜索其使用者,我们看到这样一段逻辑:
if (iCmd == MsgCmd_PaxosMsg)
{
//注意这里
if (m_oCheckpointMgr.InAskforcheckpointMode())
{
PLGImp("in ask for checkpoint mode, ignord paxosmsg");
return;
}
PaxosMsg oPaxosMsg;
bool bSucc = oPaxosMsg.ParseFromArray(sBuffer.data() + iBodyStartPos, iBodyLen);
if (!bSucc)
{
BP->GetInstanceBP()->OnReceiveParseError();
PLGErr("PaxosMsg.ParseFromArray fail, skip this msg");
return;
}
if (!ReceiveMsgHeaderCheck(oHeader, oPaxosMsg.nodeid()))
{
return;
}
OnReceivePaxosMsg(oPaxosMsg);
}
当本节点接收到来自其他节点的数据时,如果当前处于Checkpoint模式,非Checkpoint相关的数据包全部丢弃。因此,即便Cleaner和Replayer未停止,即便删除数据和重置之间存在时间间隔也不会有问题。因为整个Checkpoint过程中,并没有新的消息被处理,一切处于“停滞”状态。
6.7.2 Checkpoint数据传输
Checkpoint执行过程代码虽然很多,但是逻辑并不复杂,这里通过伪码加以说明:
for each state_machine in state_machine_list
call state_machine.GetCheckpointState get checkpoint_file_list
for each checkpoint_file in checkpoint_file_list
open checkpoint_file
while(true)
read 1M buffer
send buffer to CheckpointReceiver
break if read to end
end
end
end
Checkpoint数据传输、处理过程中数据包标识为CheckpointSendFileFlag_ING。遍历本节点的所有状态机对象,依次做每个状态机的Checkpoint对账。通过调用状态机的GetCheckpointState获取本状态机的全部Checkpoint文件列表,并依次发送。单次发送大小不超过1M。
CheckpointReceiver的伪码如下:
create file if not exist
open file
append file
end
对应的源码入口分别为CheckpointSender :: SendCheckpoint,CheckpointReceiver :: ReceiveCheckpoint。
6.7.3 Checkpoint结束
一旦所有的Checkpoint数据发送完成,需要发送CheckpointSendFileFlag_END消息到Checkpointer发起者。处理逻辑如下:
int Learner :: OnSendCheckpoint_End(const CheckpointMsg& oCheckpointMsg)
{
if (!m_oCheckpointReceiver.IsReceiverFinish(oCheckpointMsg.nodeid(),
oCheckpointMsg.uuid(), oCheckpointMsg.sequence()))
{
PLGErr("receive end msg but receiver not finish");
return -1;
}
BP->GetCheckpointBP()->ReceiveCheckpointDone();
std::vector<StateMachine*> vecSMList = m_poSMFac->GetSMList();
//遍历所有的状态机
for (auto & poSM : vecSMList)
{
if (poSM->SMID() == SYSTEM_V_SMID
|| poSM->SMID() == MASTER_V_SMID)
{
//system variables sm no checkpoint
//master variables sm no checkpoint
continue;
}
//获取Checkpoint路径和文件信息
string sTmpDirPath = m_oCheckpointReceiver.GetTmpDirPath(poSM->SMID());
std::vector<std::string> vecFilePathList;
int ret = FileUtils :: IterDir(sTmpDirPath, vecFilePathList);
if (ret != 0)
{
PLGErr("IterDir fail, dirpath %s", sTmpDirPath.c_str());
}
if (vecFilePathList.size() == 0)
{
PLGImp("this sm %d have no checkpoint", poSM->SMID());
continue;
}
//通知状态机Checkpoint文件变更
ret = poSM->LoadCheckpointState(
m_poConfig->GetMyGroupIdx(),
sTmpDirPath,
vecFilePathList,
oCheckpointMsg.checkpointinstanceid());
if (ret != 0)
{
BP->GetCheckpointBP()->ReceiveCheckpointAndLoadFail();
return ret;
}
}
BP->GetCheckpointBP()->ReceiveCheckpointAndLoadSucc();
PLGImp("All sm load state ok, start to exit process");
//退出PhxPaxos
exit(-1);
return 0;
}
引用《状态机Checkpoint详解》中的文字:
StateMachine::LoadCheckpointState()当一个节点获得来自其他节点的Checkpoint数据时,会调用这个函数,将这份数据交由开发者进行处理(开发者往往要做的事情就是将这份Checkpoint数据覆盖当前节点的数据),当调用此函数完成后,PhxPaxos将会进行进程自杀操作,通过重启来完成一个新Checkpoint数据的启动。
也就是说,整个Checkpoint的对账过程,业务侧并没有感知,PhxPaxos内部只是将这些文件存放到了一个临时路径,只在最后一刻才通过LoadCheckpointState通知业务处理。
6.8 总结
Checkpoint机制在每个节点默认启动两个线程:Cleaner、Replayer。Cleaner负责定期清理过时的paxos log,Replayer则负责追赶状态机的Execute数据到ExecuteForCheckpoint。另外,如果某个节点落后太多,通过learner无法完成追赶时,将触发启动Checkpoint的另外一个线程Checkpoint Sender。Checkpoint Sender负责发送本节点的Checkpoint文件到Checkpoint Receiver。一旦数据发送完成,Checkpoint Receiver将进行进程自杀操作,通过重启完成Checkpoint数据同步。
Checkpoint机制并不是PhxPaxos算法的一部分,而是真正工程化的产物。它尝试解决的工程问题包括:
- 某个节点落后太多场景下,如何实现快速追赶。
- 系统长时间运行场景下,如何避免海量日志。
- 系统异常等故障场景下,如何保证系统的可靠性。
OK,至此了解了PhxPaxos的基础网络部件、paxos角色以及工程化的Checkpoint机制,终于可以浮到水面窥视PhxPaxos的全貌了。
【转载请注明】随安居士. 6. PhxPaxos源码分析之归档机制. 2017.11.17