互联网架构程序员

Raft算法分析以及在软负载中的实现研究

2017-11-15  本文已影响99人  zqrferrari

分布式系统下数据的一致性和系统可用性一直是个难题,工程上也有多种解决方案,如:mysql/RocketMQ的主备复制方案,本文将借助软负载服务的研究对Raft分布式一致性协议做分析和讨论。

本文主要分为以下几个部分:

1、分布式下数据一致性问题解决方案


根据CAP理论,分布式系统中,一致性、可用性和分区容错性三者无法同时满足,并且最多只能满足其中两个。

对于分布式系统的高可用方案,业界有一些通用的解决方案:


分布式高可用方案对比

其中横轴代表了分布式系统中通用的高可用解决方案,如:冷备、Master/Slave、Master/Master、两阶段提交以及基于Paxos算法的解决方案;纵轴代表了分布式系统所关心的各项指标,包括数据一致性、事务支持的程度、数据延迟、系统吞吐量、数据丢失可能性、故障自动恢复时间。

Master/Slave解决方案的典型实现有mysql、RocketMQ。

Paxos解决方案的实现有阿里的OceanBase。

Raft协议产生的背景在于解决分布式系统下数据的一致性问题,本文将主要集中在介绍Raft算法实现的研究上。

2、ConfigServer/VipServer/Diamond


ConfigServer(服务发现)、VipServer(软负载)、Diamond(配置管理)三个产品容易引起疑惑,因为三者有一定的相似性,功能上都有配置管理的能力,设计上都基于配置的pub/sub模式,也都需要解决数据的一致性和应用的稳定性问题,因此难免会让人产生困惑,为什么需要三套不同的产品来实现近似一样的功能,对此我们先明确下三者在应用结构中所处的位置,如下图:

ConfigServer/Diamond/VipServer的关系

一次请求的过程如下:

  1. 用户通过域名www.taobao.com访问,经过DNS解析获得域名地址,请求到达淘宝服务器机房
  2. http请求进过lvs进行一次负载均衡,到达ngnix服务器
  3. ngnix服务器通过内置的vipService客户端,按照一定算法选择一个ip
  4. 根据ip被ngnix转发到实际处理的机器,对应的应用服务开始处理请求
  5. 如果需要进行rpc调用,则用户configServer获取对应服务的机器ip,发起rpc调用
  6. 请求处理过程中,如果需要获取配置则通过diamond来获取

由此可见:

在上图的架构中VipServer集群对于数据的持久化和一致性的保障是基于Diamond的,而Diamond则是基于mysql做数据的持久化。过重的依赖无疑会极大的限制VipServer在负载均衡上的能力,阿里巴巴负载均衡技术之LVS和VIPServer的发展介绍中,未来VipServer将下沉作为基础的负载均衡设施用于替换lvs,上图中3替换掉1,所以去依赖是VipServer必须要完成的,去掉对Diamond的依赖之后,对于数据的持久化和一致性则需要自己来完成。

去Diamond之后,VipServer集群中的每台server都会存储全量数据,提高了系统容灾能力,其中集群中机器上的数据一致性使用Raft协议来保证。

3、Raft的原理


在介绍Raft算法原理之前,可以借助Raft的动画演示先直观的看看Raft在解决什么问题以及解决的过程。

相比于Paxos的晦涩难懂,Raft被设计的更易于理解和实现:

Raft算法状态转换如下图,主要涉及到leader选举、日志复制(数据同步)、安全性三个部分:

Raft状态转换图

3.1 leader选举

集群选举的过程如下:

3.2 日志复制(数据同步)

leader被选举出来以后,所有的读写请求都会经过leader来处理,leader负责对follower进行数据同步,即日志复制。

日志复制包括历史数据同步和数据更新请求:

3.3 安全性

集群中机器在可能遇到的问题:

4、Raft在软负载集群中的实现


软负载集群中的机器都保存全量的机器信息(host:ip+port),机器之间的数据同步由Raft协议保证。

根据Raft原理的介绍,在实际的实现过程中,首要解决leader的选举过程。

4.1 选举

机器启动之后,每台机器都会注册选举和心跳的定时任务。

public static void init() throws Exception {
    peers.add(VIPServerProxy.getVIPServers());

    RaftStore.load();

    while (true) {
        if (notifier.tasks.size() <= 0) {
            break;
        }
    }

    Timer.register(new MasterElection());
    Timer.register1(new HeartBeat());
    Timer.register(new AddressServerUpdater(), Timer.ADDRESS_SERVER_UPDATE_INTVERAL_MS);

    if (peers.size() > 0) {
        if (lock.tryLock(3, TimeUnit.SECONDS) ) {
            initialized = true;
            lock.unlock();
        }
    } else {
        throw new Exception("peers is empty.");
    }
}

选举的等待时长是不定的,用于避免选举的无主问题

public void resetLeaderDue() {
    leaderDueMs = Timer.LEADER_TIMEOUT_MS + RandomUtils.nextLong() % Timer.RAMDOM_MS;
}

等待时长过期则开始选举

public static class MasterElection implements Runnable {
    @Override
    public void run() {
        try {
            RaftPeer local = peers.local();
            local.leaderDueMs -= Timer.TICK_PERIOD_MS;
            if (local.leaderDueMs > 0) {
                return;
            }

            local.resetLeaderDue();
            local.resetHeartbeatDue();

            //等待过期开始发起选举
            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("VIPSERVER-RAFT", "error while master election", e);
        }
    }
    
    public static void sendVote() {
        RaftPeer local = peers.get(NetUtils.localIP());
        local.term++;
        local.voteFor = local.ip;  //自己作为CANDIDATE,发送给其他follower
        local.state   = RaftPeer.State.CANDIDATE;
        
          //向其他机器发送选举请求
          voteRets = sendVoteToAllFollowers(local);
          
          //根据选举结果确定自己是否成为了leader,得到超过一半的选票
          decideLeader(voteRets)
    }
    
    //follower接收CANDIDATE发送的选举请求
    public static RaftPeer receivedVote(RaftPeer remote) {
        RaftPeer local = peers.get(NetUtils.localIP());
          //CANDIDATE的选举号小于自己的,不参与投票
        if (remote.term <= local.term) {
            throw new IllegalArgumentException(msg);
        }

        //开始投票,重设自己的选举等待周期
        local.resetLeaderDue();

        //自己作为投票FOLLOWER给remote
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term = remote.term;
    }
}

如果选举成功,则CANDIDATE成为leader,leader开始同follower进行心跳和日志复制(数据同步)。
如果选举失败,则等待下一个follower的选举周期到期,进行新一轮选举。

4.2 心跳

leader负责对向所有的follower发送心跳和数据同步。

 public static class HeartBeat implements Runnable {
    @Override
    public void run() {
        try {
            RaftPeer local = peers.local();
            local.heartbeatDueMs -= Timer.TICK_PERIOD_MS;
            if (local.heartbeatDueMs > 0) {
                return;
            }

            local.resetHeartbeatDue();

               //定时心跳
            sendBeat();
        } catch (Exception e) {
            Loggers.RAFT.warn("VIPSERVER-RAFT", "error while sending beat", e);
        }

    }
 
     public static void sendBeat() throws IOException, InterruptedException {
        //自己不是leader不能发生心跳
        if (local.state != RaftPeer.State.LEADER) {return;}

        local.resetLeaderDue();

        //构建leader上的数据信息key、timestamp,不包括具体的value
        data.raftPeer  = local;
        data.key       = key;
        data.timestamp = timestamp;
        data.value     = null;
        
        sendBeatToAllFolowers(data.key);
     
        //...
     }
     
     public static RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        final RaftPeer remote = beat.getObject("peer", RaftPeer.class);

        //收到的心跳不是leader发来的,丢弃
        if (remote.state != RaftPeer.State.LEADER) {
            throw new IllegalArgumentException("xxx");
        }

        //收到心跳的选举号小于自己的,丢弃
        if (local.term > remote.term) {
            throw new IllegalArgumentException("xxx");
        }

        if (local.state != RaftPeer.State.FOLLOWER) {
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }

        final JSONArray beatDatums = beat.getJSONArray("datums");
        
        //重置自己的选举周期和心跳周期,用于follower长时间没有收到leader发来的心跳
        //leader挂掉时,自己可以重新发起选举流程
        local.resetLeaderDue();
        local.resetHeartbeatDue();

        peers.makeLeader(remote);
        
        //根据leader data.timestamp和本机的timestamp,确定是否需要从leader更新数据
        updateDataFromLeader(data);
        
        //...
     }
     
     public static void updateDataFromLeader(remoteData){
        if(remoteData.timestamp > local.timestamp){
            //从leader获取数据并更新
        }
     }
 }

4.3 数据更新

整个集群中,只有leader能对外提供数据更新服务。

 public static void signalPublish(String key, String value) throws Exception {
    operateLock.lock();

    try {
        if (!RaftCore.isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);

            //如果自己不是leader,则将请求转发给leader去处理
            RaftProxy.proxyPostLarge(API_PUB, params.toJSONString());
            return;
        }

          //请求到达leader,处理数据更新
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        datum.timestamp = System.currentTimeMillis();

        JSONObject json = new JSONObject();
        json.put("datum", datum);
        json.put("source", peers.local());

        //leader数据更新
        onPublish(json);
        
        //发送数据到所有的follower进行更新
        successOverHalf = sendPublisToAllFollowers(datum);
  
        //数据是否有一半机器更新成功,则通知保存数据
        if(successOverHalf){
             sendStoreToAllFollowers()
        }
  
          //...
  }

follower接收到leader发送来的数据更新请求

public static void onPublish(JSONObject params){
    RaftPeer source = params.getObject("source", RaftPeer.class);
    Datum datum = params.getObject("datum", Datum.class);
    
    //如果不是leader发送过来的则丢弃
    if (!PeerSet.isLeader(source.ip)) {
       throw new IllegalStateException("xxx");
    }

     //如果接收到的选举版本号低于自己的则丢弃
    if (source.term < local.term) {
        throw new IllegalStateException("xxx");
    }
    
    //更新自己的选举时间
    local.resetLeaderDue();
    
    //更新数据到文件
    RaftStore.write(datum);
}      

5、后记

本文专注于软负载系统中数据的一致性保证以及raft协议的分析,对于软负载系统中涉及到其他的问题,如:负载均衡的策略、活性检测、集群部署方案等没有过多的深入。

上一篇 下一篇

猜你喜欢

热点阅读