大数据Paxos分布式技术

7. PhxPaxos源码分析之整体架构

2017-11-18  本文已影响147人  随安居士

目录
1. PhxPaxos源码分析之关于PhxPaxos
2. PhxPaxos分析之网络基础部件
3. PhxPaxos源码分析之Proposer、Acceptor
4. PhxPaxos源码分析之Learner
5. PhxPaxos源码分析之状态机
6. PhxPaxos源码分析之归档机制
7. PhxPaxos源码分析之整体架构


7.1 Instance

前面几章我们依次介绍了网络基础部件、paxos三个核心角色(Proposer\Acceptor\Learner)、状态机、Checkpoint机制。在PhxPaxos中,Instance对象将上述角色组合到一起。多节点上Instance互相通信共同构成了一个paxos集群,有序的确定多个值。架构关系图如下:


Instance

Instance的实现类大约有900行,主要做了一些和网络数据处理相关的逻辑,这部分并不复杂。这里我想谈的是Instance初始化逻辑。

int Instance :: Init()
{
    //Must init acceptor first, because the max instanceid is record in acceptor state.
    int ret = m_oAcceptor.Init();
    if (ret != 0)
    {
        PLGErr("Acceptor.Init fail, ret %d", ret);
        return ret;
    }

    ret = m_oCheckpointMgr.Init();
    if (ret != 0)
    {
        PLGErr("CheckpointMgr.Init fail, ret %d", ret);
        return ret;
    }

    uint64_t llCPInstanceID = m_oCheckpointMgr.GetCheckpointInstanceID() + 1;

    PLGImp("Acceptor.OK, Log.InstanceID %lu Checkpoint.InstanceID %lu", 
            m_oAcceptor.GetInstanceID(), llCPInstanceID);

    uint64_t llNowInstanceID = llCPInstanceID;
    if (llNowInstanceID < m_oAcceptor.GetInstanceID())
    {
        ret = PlayLog(llNowInstanceID, m_oAcceptor.GetInstanceID());
        if (ret != 0)
        {
            return ret;
        }

        PLGImp("PlayLog OK, begin instanceid %lu end instanceid %lu", llNowInstanceID, m_oAcceptor.GetInstanceID());

        llNowInstanceID = m_oAcceptor.GetInstanceID();
    }
    else
    {
        if (llNowInstanceID > m_oAcceptor.GetInstanceID())
        {
            ret = ProtectionLogic_IsCheckpointInstanceIDCorrect(llNowInstanceID, m_oAcceptor.GetInstanceID());
            if (ret != 0)
            {
                return ret;
            }
            m_oAcceptor.InitForNewPaxosInstance();
        }
        
        m_oAcceptor.SetInstanceID(llNowInstanceID);
    }

    PLGImp("NowInstanceID %lu", llNowInstanceID);

    m_oLearner.SetInstanceID(llNowInstanceID);
    m_oProposer.SetInstanceID(llNowInstanceID);
    m_oProposer.SetStartProposalID(m_oAcceptor.GetAcceptorState()->GetPromiseBallot().m_llProposalID + 1);

    m_oCheckpointMgr.SetMaxChosenInstanceID(llNowInstanceID);

    ret = InitLastCheckSum();
    if (ret != 0)
    {
        return ret;
    }

    m_oLearner.Reset_AskforLearn_Noop();

    PLGImp("OK");

    return 0;
}

Instance的初始化在进程启动时,进程有可能时首次启动,也可能是异常退出后的重启。重启要将Instance恢复到启动前的状态,需要把磁盘中的数据恢复到内存中。对于PhxPaxos来说,磁盘中的数据主要集中在Acceptor、CheckpointMgr中。这里先读取Acceptor中的数据,随后读取CheckpointMgr中的数据,并将Acceptor中未体现在CheckpointMgr中的数据进行重演。最终,将paxos的instance恢复到启动前的状态:

    m_oLearner.SetInstanceID(llNowInstanceID);
    m_oProposer.SetInstanceID(llNowInstanceID);
    m_oProposer.SetStartProposalID(m_oAcceptor.GetAcceptorState()->GetPromiseBallot().m_llProposalID + 1);

随后将当前的instance id标记为Max Chosen Instance ID。

    m_oCheckpointMgr.SetMaxChosenInstanceID(llNowInstanceID);

严格讲llNowInstanceID不一定是Max Chosen Instance ID。因为Acceptor落盘的最后一条记录可能是尚未选中(Chosen)的“半成品”,即提案尚未完成。所以,这里将llNowInstanceID做为Max Chosen Instance ID是不准确的。只不过当前Max Chosen Instance ID仅在确定需要保留的paxos log数量时使用,而且最小的paxos log保留数量为300,因此并没有出什么问题。

7.2 Group

Instance之上是Group,每个Group包含一个Instance,Group负责Instance初始化:

    void Group :: Init()
    {
        m_iInitRet = m_oConfig.Init();

        if (m_iInitRet != 0)
        {
            return;
        }

        //inside sm
        AddStateMachine(m_oConfig.GetSystemVSM());
        AddStateMachine(m_oConfig.GetMasterSM());

        m_iInitRet = m_oInstance.Init();
    }

Group的价值在于概念抽象,不同节点的Instance的运行状态可能不同,即Instance ID不同,但这些Instnace仍旧属于一个Group。另外,PhxPaxos允许同时存在多个Group,每个Group下运行多个Instance(和节点个数相同)。如果不抽象Group概念,本Group内的Instance和Group间的Instance容易产生混淆。

Group
注:PhxPaxos将MasterMgr置于Group下节提到的Node中,但笔者认为放到Group中更便于理解。

7.3 Node

既然允许运行多个Group,我们需要一个Group Manager,在PhxPaxos中这个角色是Node抽象类。Node是整个PhxPaxos库对外提供的服务类,应用通过Node :: RunNode启动本节点的PhxPaxos服务。

    int Node :: RunNode(const Options& oOptions, Node*& poNode)
    {
        if (oOptions.bIsLargeValueMode)
        {
            InsideOptions::Instance()->SetAsLargeBufferMode();
        }

        InsideOptions::Instance()->SetGroupCount(oOptions.iGroupCount);

        poNode = nullptr;
        NetWork* poNetWork = nullptr;

        Breakpoint::m_poBreakpoint = nullptr;
        BP->SetInstance(oOptions.poBreakpoint);
        //节点初始化
        PNode* poRealNode = new PNode();
        int ret = poRealNode->Init(oOptions, poNetWork);

        if (ret != 0)
        {
            delete poRealNode;
            return ret;
        }

        //step1 set node to network
        //very important, let network on recieve callback can work.
        poNetWork->m_poNode = poRealNode;

        //step2 run network.
        //start recieve message from network, so all must init before this step.
        //must be the last step.
        //网络初始化
        poNetWork->RunNetWork();

        poNode = poRealNode;

        return 0;
    }

至此,PhxPaxos的整体架构已经浮现:


PhxPaxos整体架构

7.4 Follower

PhxPaxos中的节点除了做为Paxos协议的参与者,还运行另外一类成为follower的节点。Follower指定一个运行Paxos协议的节点用于数据同步,它节点不参与Paxos协议,也不参与Paxos选主。Follower更像传统意义上的同步备,当Paxos协议节点确定一个值后,将数据同步到Follower节点。但有一点不同的是:Follower节点运行Learner,当某个值缺失时,可以通过Learner主动发起AskForLearn习得。


屏幕快照 2017-11-13 下午5.00.24.png

7.5 总结

到这里,PhxPaxos源码分析就结束了。

整体上看,微信开源的PhxPaxos无论从架构设计、编码质量、细节优化上绝对可以配得上“生产级”这个称号(编码质量尤其值得称赞)。

更多PhxPaxos相关文章请参见官方WIKI


【转载请注明】随安居士. 7. PhxPaxos源码分析之整体架构. 2017.11.18

上一篇下一篇

猜你喜欢

热点阅读