Some Follow-up Thoughts on RAFT (Part 1)

2019-02-11  fooboo

I have recently been consolidating the source code I analyzed earlier and the articles I wrote along the way. To deepen the understanding, it helps to revisit them against concrete requirements and business scenarios and think through more of the possible cases.

I previously analyzed a basic raft implementation in the Floyd&Raft source-code walkthrough, together with the relevant parts of the paper; that code mostly follows the paper, and some other aspects are simply not implemented. Later, while planning to study braft and bfs over the Spring Festival holiday, I came across the raft material on github, braft's RAFT介绍, which describes several situations I had not considered before. So here I revisit them against the source code analyzed earlier, also with reference to tikv's pingcap/raft-rs.

In the source code, a node in the follower state, after a randomized timeout, moves from follower->candidate, increments its own term by 1, and asks the other nodes for votes, sending along its term, ip/port (optional), last_log_term and last_log_index. On receiving the request, each node compares terms: if request_term > my_term, it turns itself into a follower and aligns to the requesting node's term, no matter what state it is currently in:

int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {
  slash::MutexLock l(&context_->global_mu);
  bool granted = false;
  CmdRequest_RequestVote request_vote = request.request_vote();
  /*
   * If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
   */
  if (request_vote.term() > context_->current_term) {
    context_->BecomeFollower(request_vote.term());
    raft_meta_->SetCurrentTerm(context_->current_term);
  }
  // if the caller's term is smaller than mine, tell it my term
  if (request_vote.term() < context_->current_term) {
    BuildRequestVoteResponse(context_->current_term, granted, response);
    return -1;
  }
  // ... (remainder elided: the log up-to-date check and the actual vote grant)

The raft implementation in bfs behaves the same way:

void RaftNodeImpl::Vote(::google::protobuf::RpcController* controller,
                    const ::baidu::bfs::VoteRequest* request,
                    ::baidu::bfs::VoteResponse* response,
                    ::google::protobuf::Closure* done) {
    int64_t term = request->term();
    const std::string& candidate = request->candidate();
    int64_t last_log_index = request->last_log_index();
    int64_t last_log_term = request->last_log_term();
    LOG(INFO, "Recv vote request: %s %ld %ld / (%s %ld %ld)",
        candidate.c_str(), term, last_log_term,
        voted_for_.c_str(), current_term_, log_term_);
    MutexLock lock(&mu_);
    CheckTerm(term);  // aligns to a higher term and steps down if needed (sketched below)
    if (term >= current_term_
        && (voted_for_ == "" || voted_for_ == candidate)
        && (last_log_term > log_term_ ||
        (last_log_term == log_term_ && last_log_index >= log_index_))) {
        voted_for_ = candidate;
        if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
            LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
        } else {
            LOG(INFO, "Granted %s %ld %ld", candidate.c_str(), term, last_log_index);
        }
        response->set_vote_granted(true);
        response->set_term(term);
        done->Run();
        return;
    }

    response->set_vote_granted(false);
    response->set_term(current_term_);
    done->Run();
}
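
The CheckTerm call above is where the "align to a higher term" rule from the beginning of this post lives in bfs. The function itself is not quoted here, so the following is only a reconstruction of what it must do: current_term_, voted_for_ and StoreContext appear in the code above, while node_state_ and kFollower are assumed names.

void RaftNodeImpl::CheckTerm(int64_t term) {
    // mu_ is already held by the caller (see Vote above)
    if (term > current_term_) {
        current_term_ = term;     // align to the higher term
        voted_for_ = "";          // a new term invalidates any earlier vote
        node_state_ = kFollower;  // assumed: fall back to follower whatever the state was
        if (!StoreContext("current_term", current_term_) ||
            !StoreContext("voted_for", voted_for_)) {
            LOG(FATAL, "Store term & voted_for fail %s %ld",
                voted_for_.c_str(), current_term_);
        }
    }
}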

Quoting braft's RAFT介绍: "The original RAFT paper does not handle asymmetric network partitions well. Suppose S1, S2 and S3 sit in three IDCs, the link between S1 and S2 is broken while all other links work. Once S1 or S2 grabs the leadership, the other side keeps timing out and triggering elections; for example with S1 as Leader, S2 keeps timing out and starting elections, S3 raises its Term, interrupting the current Lease and thereby rejecting the Leader's updates." And: "For symmetric network partitions, the original RAFT paper's handling is that after a node comes back online, the Leader StepDowns as soon as it receives a RequestVote carrying a term above currentTerm. So even a node that has already been removed via RemovePeer can still interrupt the current Lease and leave the replication group unavailable."

Symmetric network partitioning
For this case the cited document also gives the corresponding fix (see the sketch after this list):
  1. For a node that belongs to the PeerSet, the Leader StepDowns when a retried AppendEntries runs into the higher term;
  2. For a node that does not belong to the PeerSet, the Leader ignores it forever;
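
A minimal sketch of the Leader side of these two rules (all names here are hypothetical, not braft's):

#include <cstdint>
#include <set>
#include <string>

struct AppendEntriesResponse { int64_t term; };

class Leader {
public:
    void on_append_entries_response(const std::string& peer,
                                    const AppendEntriesResponse& resp) {
        if (peer_set_.count(peer) == 0) {
            return;                // rule 2: a node outside the PeerSet is ignored forever
        }
        if (resp.term > current_term_) {
            step_down(resp.term);  // rule 1: a higher term from a PeerSet member forces StepDown
        }
    }

private:
    void step_down(int64_t new_term) {
        current_term_ = new_term;  // become follower, stop replicating, etc.
    }

    std::set<std::string> peer_set_;  // members of the current configuration
    int64_t current_term_ = 0;
};

This way a node already removed with RemovePeer can retry RequestVote forever without ever breaking the Lease.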

StepDown
In the original RAFT protocol the Leader StepDowns on any request whose term is higher than currentTerm. In a practical implementation, StepDown should happen at the following moments (the second one is sketched after the list):

  1. The Leader receives a failed AppendEntries response whose Term is larger than currentTerm;
  2. The Leader has not written to a majority within an ElectionTimeout, detected through a logic-clock check (one ElectionTimeout holds 10 HeartBeats);
  3. When the LogEntry of a RemovePeer is committed and the Leader itself is no longer in the node list, it StepDowns, and usually also Shuts down;
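
Of the three, the second is the least obvious; here is a minimal sketch of such a logic-clock check, assuming one heartbeat tick per 1/10 of an ElectionTimeout (names hypothetical):

#include <cstdint>

class LeaderLease {
public:
    static const int kHeartbeatsPerElectionTimeout = 10;

    void on_heartbeat_tick() { ++tick_; }  // called every ElectionTimeout / 10

    void on_majority_ack() { last_majority_tick_ = tick_; }  // an entry reached a quorum

    // True once a whole ElectionTimeout passed without writing to a majority.
    bool should_step_down() const {
        return tick_ - last_majority_tick_ >= kHeartbeatsPerElectionTimeout;
    }

private:
    int64_t tick_ = 0;
    int64_t last_majority_tick_ = 0;
};

The Leader evaluates should_step_down() on every tick and StepDowns as soon as it returns true.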

Now let's look at the implementation in braft.
A node initiates a pre_vote; the key is line 1377:

1372         OnPreVoteRPCDone* done = new OnPreVoteRPCDone(*iter, _current_term, this);
1373         done->cntl.set_timeout_ms(_options.election_timeout_ms);
1374         done->request.set_group_id(_group_id);
1375         done->request.set_server_id(_server_id.to_string());
1376         done->request.set_peer_id(iter->to_string());
1377         done->request.set_term(_current_term + 1); // next term
1378         done->request.set_last_log_index(last_log_id.index);
1379         done->request.set_last_log_term(last_log_id.term);

When a node receives the `pre_vote` request:

void RaftServiceImpl::pre_vote(google::protobuf::RpcController* cntl_base,
                          const RequestVoteRequest* request,
                          RequestVoteResponse* response,
                          google::protobuf::Closure* done) {
    brpc::ClosureGuard done_guard(done);
    brpc::Controller* cntl =
        static_cast<brpc::Controller*>(cntl_base);

    PeerId peer_id;
    if (0 != peer_id.parse(request->peer_id())) {
        cntl->SetFailed(EINVAL, "peer_id invalid");
        return;
    }

    scoped_refptr<NodeImpl> node_ptr = NodeManager::GetInstance()->get(request->group_id(),
                                                                       peer_id);
    NodeImpl* node = node_ptr.get(); // check whether the node exists
    if (!node) {
        cntl->SetFailed(ENOENT, "peer_id not exist");
        return;
    }

    // TODO: should return butil::Status
    int rc = node->handle_pre_vote_request(request, response);
    if (rc != 0) {
        cntl->SetFailed(rc, "%s", berror(rc));
        return;
    }
}

inline bool is_active_state(State s) {
    // This should be as fast as possible
    return s < STATE_ERROR;
}

enum State { // the possible node states
    STATE_LEADER = 1,
    STATE_TRANSFERRING = 2,
    STATE_CANDIDATE = 3,
    STATE_FOLLOWER = 4,
    STATE_ERROR = 5,
    STATE_UNINITIALIZED = 6,
    STATE_SHUTTING = 7,
    STATE_SHUTDOWN = 8,
    STATE_END,
};

int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
                                      RequestVoteResponse* response) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    if (!is_active_state(_state)) { // check this node's state
        lck.unlock();
        return EINVAL;
    }

    PeerId candidate_id;
    if (0 != candidate_id.parse(request->server_id())) { // parse failed
        return EINVAL;
    }
    bool granted = false;
    do {
        if (request->term() < _current_term) { // the request's term is older than ours
            // ignore older term
            break;
        }

        // get last_log_id out of node mutex
        lck.unlock();
        LogId last_log_id = _log_manager->last_log_id(true);
        lck.lock();
        // pre_vote does not need an ABA check after unlock & lock

        // compare last_log_index and last_log_term; see the earlier floyd analysis
        granted = (LogId(request->last_log_index(), request->last_log_term())
                        >= last_log_id);
    } while (0);
    response->set_term(_current_term);
    response->set_granted(granted);
    return 0;
}

Before really voting for itself, a node first runs pre_vote; once the initiating node has received the responses, it decides whether to step_down or to push on with a real election. Note that handle_pre_vote_request above modifies neither _current_term nor _voted_id, which is exactly what keeps pre_vote harmless to the cluster:

struct OnPreVoteRPCDone : public google::protobuf::Closure {
    OnPreVoteRPCDone(const PeerId& peer_id_, const int64_t term_, NodeImpl* node_)
        : peer(peer_id_), term(term_), node(node_) {
            node->AddRef();
    }
    virtual ~OnPreVoteRPCDone() {
        node->Release();
    }

    void Run() {
        do {
            if (cntl.ErrorCode() != 0) {
                break;
            }
            node->handle_pre_vote_response(peer, term, response);
        } while (0);
        delete this;
    }

    PeerId peer;
    int64_t term;
    RequestVoteRequest request;
    RequestVoteResponse response;
    brpc::Controller cntl;
    NodeImpl* node;
};

void NodeImpl::handle_pre_vote_response(const PeerId& peer_id, const int64_t term,
                                            const RequestVoteResponse& response) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    // check state
    if (_state != STATE_FOLLOWER) {
        return;
    }
    // check stale response
    if (term != _current_term) {
        return;
    }
    // check response term
    if (response.term() > _current_term) {
        butil::Status status;
        status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
                "pre_vote_response.");
        step_down(response.term(), false, status);
        return;
    }
    // check granted quorum?
    if (response.granted()) {
        _pre_vote_ctx.grant(peer_id);
        if (_pre_vote_ctx.granted()) {
            elect_self(&lck);
        }
    }
}

Once pre_vote has gathered a quorum, elect_self starts the real election:

void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
    if (!_conf.contains(_server_id)) {
        return;
    }
    // cancel follower election timer
    if (_state == STATE_FOLLOWER) {
        _election_timer.stop();
    }

    _state = STATE_CANDIDATE;
    _current_term++;
    _voted_id = _server_id;

    // (elided: _vote_timer is started and last_log_id is fetched from _log_manager)

    std::set<PeerId> peers;
    _conf.list_peers(&peers);

    for (std::set<PeerId>::const_iterator
        iter = peers.begin(); iter != peers.end(); ++iter) {
        if (*iter == _server_id) {
            continue;
        }
        brpc::ChannelOptions options;
        options.connection_type = brpc::CONNECTION_TYPE_SINGLE;
        options.max_retry = 0;
        brpc::Channel channel;
        if (0 != channel.Init(iter->addr, &options)) {
            continue;
        }
        // ask every other peer for a vote
        OnRequestVoteRPCDone* done = new OnRequestVoteRPCDone(*iter, _current_term, this);
        done->cntl.set_timeout_ms(_options.election_timeout_ms);
        done->request.set_group_id(_group_id);
        done->request.set_server_id(_server_id.to_string());
        done->request.set_peer_id(iter->to_string());
        done->request.set_term(_current_term);
        done->request.set_last_log_index(last_log_id.index);
        done->request.set_last_log_term(last_log_id.term);

        RaftService_Stub stub(&channel);
        stub.request_vote(&done->cntl, &done->request, &done->response, done);
    }

    // TODO: out of lock
    _meta_storage->set_term_and_votedfor(_current_term, _server_id);
    _vote_ctx.grant(_server_id);   // vote for itself
    if (_vote_ctx.granted()) {
        become_leader();           // a single-node group wins immediately
    }
}

Handling the real vote request:

void RaftServiceImpl::request_vote(google::protobuf::RpcController* cntl_base,
                          const RequestVoteRequest* request,
                          RequestVoteResponse* response,
                          google::protobuf::Closure* done) {
    brpc::ClosureGuard done_guard(done);
    brpc::Controller* cntl =
        static_cast<brpc::Controller*>(cntl_base);

    PeerId peer_id;
    if (0 != peer_id.parse(request->peer_id())) {
        cntl->SetFailed(EINVAL, "peer_id invalid");
        return;
    }

    scoped_refptr<NodeImpl> node_ptr = NodeManager::GetInstance()->get(request->group_id(),
                                                                       peer_id);
    NodeImpl* node = node_ptr.get();
    if (!node) {
        cntl->SetFailed(ENOENT, "peer_id not exist");
        return;
    }

    int rc = node->handle_request_vote_request(request, response);
    if (rc != 0) {
        cntl->SetFailed(rc, "%s", berror(rc));
        return;
    }
}

int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request,
                                          RequestVoteResponse* response) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    if (!is_active_state(_state)) {
        lck.unlock();
        return EINVAL;
    }

    PeerId candidate_id;
    if (0 != candidate_id.parse(request->server_id())) {
        return EINVAL;
    }
    do {
        // check term
        if (request->term() >= _current_term) {
            // increase current term, change state to follower
            if (request->term() > _current_term) {
                butil::Status status;
                status.set_error(EHIGHERTERMREQUEST, "Raft node receives higher term "
                        "request_vote_request.");
                step_down(request->term(), false, status);
            }
        } else {
            // ignore older term
            break;
        }

        // get last_log_id out of node mutex
        lck.unlock();
        LogId last_log_id = _log_manager->last_log_id(true);
        lck.lock();
        // a real vote needs an ABA check after unlock & lock
        if (request->term() != _current_term) {
            break;
        }
        bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())
                          >= last_log_id);
        // save
        if (log_is_ok && _voted_id.is_empty()) {
            butil::Status status;
            status.set_error(EVOTEFORCANDIDATE, "Raft node votes for some candidate, "
                    "step down to restart election_timer.");
            step_down(request->term(), false, status);
            _voted_id = candidate_id;
            // TODO: out of lock
            _meta_storage->set_votedfor(candidate_id);
        }
    } while (0);

    response->set_term(_current_term);
    response->set_granted(request->term() == _current_term && _voted_id == candidate_id);
    return 0;
}

After the candidate has collected enough granted vote responses, it becomes the leader (the vote-counting step itself is sketched right after this function):

void NodeImpl::become_leader() {
    CHECK(_state == STATE_CANDIDATE);
    // cancel candidate vote timer
    _vote_timer.stop();

    _state = STATE_LEADER;
    _leader_id = _server_id;

    _replicator_group.reset_term(_current_term);

    std::set<PeerId> peers;
    _conf.list_peers(&peers);
    for (std::set<PeerId>::const_iterator
            iter = peers.begin(); iter != peers.end(); ++iter) {
        if (*iter == _server_id) {
            continue;
        }
        // TODO: check return code
        _replicator_group.add_replicator(*iter);
    }
    // init commit manager
    _ballot_box->reset_pending_index(_log_manager->last_log_index() + 1);

    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    CHECK(!_conf_ctx.is_busy());
    _conf_ctx.flush(_conf.conf, _conf.old_conf);
    _stepdown_timer.start();
}
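
The vote-counting step that leads into become_leader, handle_request_vote_response, is not quoted above; its shape closely mirrors handle_pre_vote_response, so the following is a reconstructed sketch rather than a verbatim copy of the braft source:

void NodeImpl::handle_request_vote_response(const PeerId& peer_id, const int64_t term,
                                            const RequestVoteResponse& response) {
    std::unique_lock<raft_mutex_t> lck(_mutex);

    if (_state != STATE_CANDIDATE) { // stale response: already leader, follower, or shut down
        return;
    }
    if (term != _current_term) {     // response from one of our older elections
        return;
    }
    if (response.term() > _current_term) { // a newer term exists somewhere: step down
        butil::Status status;
        status.set_error(EHIGHERTERMRESPONSE, "Raft node receives higher term "
                "request_vote_response.");
        step_down(response.term(), false, status);
        return;
    }
    if (response.granted()) {
        _vote_ctx.grant(peer_id);    // count the vote
        if (_vote_ctx.granted()) {   // majority reached
            become_leader();
        }
    }
}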

Quoting from Raft的PreVote实现机制:
"Section 9.6, 'Preventing disruptions when a server rejoins the cluster', of the Raft author's PhD thesis CONSENSUS: BRIDGING THEORY AND PRACTICE gives the rough idea of the PreVote algorithm.
In PreVote, a Candidate first has to confirm that it can win the votes of a majority of the cluster; only then does it increment its term and launch the real election. The other nodes agree to an election only if both of the following conditions hold:

  1. They have not received a heartbeat from a valid leader for at least one election timeout.
  2. The Candidate's log is fresh enough (a larger Term, or the same Term with a larger raft index).

PreVote solves the problem of a partitioned node disrupting the cluster when it rejoins. Under PreVote, the partitioned node cannot win a majority's permission, so it cannot increment its Term; after it rejoins the cluster it still cannot increment its Term, because the other servers keep receiving periodic heartbeats from the Leader. As soon as this server receives a heartbeat from the leader, it returns to the Follower state, with its Term aligned with the Leader's."

Most of this matches the paper and the other raft implementations, but braft does not implement condition 1 from the quote above, and adding it yourself is fairly simple (a sketch follows below). There is plenty more design in there worth learning from; next I will look at the snapshot implementation.
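
For condition 1, it is enough for a follower to remember when it last heard from a valid Leader and to keep rejecting pre_vote while that memory is fresh. A sketch of how the do { ... } while (0) block of handle_pre_vote_request could be extended (_last_leader_timestamp is assumed to be a member refreshed on every accepted AppendEntries/heartbeat):

bool granted = false;
do {
    if (request->term() < _current_term) {
        break;  // ignore older term, as before
    }
    // condition 1: we heard from a Leader less than one election timeout ago,
    // so refuse to help anyone start an election (granted stays false)
    if (butil::monotonic_time_ms() - _last_leader_timestamp
            < _options.election_timeout_ms) {
        break;
    }
    // condition 2: the candidate's log must be at least as up-to-date (unchanged)
    lck.unlock();
    LogId last_log_id = _log_manager->last_log_id(true);
    lck.lock();
    granted = (LogId(request->last_log_index(), request->last_log_term())
                    >= last_log_id);
} while (0);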

The braft code as a whole is written in a fairly complex style, and I have not yet studied all of it in detail.
