
2019-10-06  本文已影响0人  fooboo




 35 // Select a server from a set of servers (in form of ServerId).
    // Abstract interface: every operation shown here is pure virtual and is
    // implemented by the concrete balancers.
 36 class LoadBalancer : public NonConstDescribable, public Destroyable {
        // Registers `server` with the balancer.
        // NOTE(review): the concrete implementations in this article return
        // false for a duplicate; confirm that is the interface contract.
 78     virtual bool AddServer(const ServerId& server) = 0;
        // Unregisters `server` from the balancer.
 82     virtual bool RemoveServer(const ServerId& server) = 0;
        // Chooses one server for the request described by `in` and reports it
        // through `out`. The implementations shown return 0 on success and an
        // errno-style code (e.g. ENODATA, EHOSTDOWN) otherwise.
 96     virtual int SelectServer(const SelectIn& in, SelectOut* out) = 0;
 97     //more code...
107 protected:
        // Protected destructor: outside code cannot `delete` a LoadBalancer
        // through this type.
108     virtual ~LoadBalancer() { }
109 };


 32 // This LoadBalancer selects server evenly. Selected numbers of servers(added
 33 // at the same time) are very close.
 34 class RoundRobinLoadBalancer : public LoadBalancer {
 35 public:
 36     bool AddServer(const ServerId& id);
 37     bool RemoveServer(const ServerId& id);
 40     int SelectServer(const SelectIn& in, SelectOut* out);
 41     RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
 45 private:
        // Server set stored in each of the two buffers of _db_servers.
 46     struct Servers {
            // All registered servers; SelectServer indexes into this list.
 47         std::vector<ServerId> server_list;
            // Maps a server to its position in server_list so that removal
            // does not need to scan the list.
 48         std::map<ServerId, size_t> server_map;
 49     };  
        // Per-thread cursor used by SelectServer.
 50     struct TLS {
 51         TLS() : stride(0), offset(0) { }
            // Step added to `offset` on each pick; 0 means "not chosen yet".
 52         uint32_t stride;
            // Index of the most recently tried entry of server_list.
 53         uint32_t offset;
 54     };  
 55     //more code...
        // Double-buffered server set with a TLS block attached for each reader.
 61     butil::DoublyBufferedData<Servers, TLS> _db_servers;
 63 };

 81 bool RoundRobinLoadBalancer::AddServer(const ServerId& id) {
 82     return _db_servers.Modify(Add, id);
 83 }
 85 bool RoundRobinLoadBalancer::RemoveServer(const ServerId& id) {
 86     return _db_servers.Modify(Remove, id);
 87 }

 37 bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
 38     if (bg.server_list.capacity() < 128) {
 39         bg.server_list.reserve(128);
 40     }
 41     std::map<ServerId, size_t>::iterator it = bg.server_map.find(id);
 42     if (it != bg.server_map.end()) {
 43         return false;
 44     }
 45     bg.server_map[id] = bg.server_list.size();
 46     bg.server_list.push_back(id);
 47     return true;
 48 }
 50 bool RoundRobinLoadBalancer::Remove(Servers& bg, const ServerId& id) {
 51     std::map<ServerId, size_t>::iterator it = bg.server_map.find(id);
 52     if (it != bg.server_map.end()) {
 53         const size_t index = it->second;
 54         bg.server_list[index] = bg.server_list.back();
 55         bg.server_map[bg.server_list[index]] = index;
 56         bg.server_list.pop_back();
 57         bg.server_map.erase(it);
 58         return true;
 59     }
 60     return false;
 61 }



148     butil::atomic<int64_t> _total;
149     butil::DoublyBufferedData<Servers> _db_servers;
150     std::deque<int64_t> _left_weights;
151     ServerId2SocketIdMapper _id_mapper;

        // Server set held in each buffer of the locality-aware balancer.
116     class Servers {
117     public:
            // Nodes stored in array form of a binary tree: SelectServer
            // descends from index i to 2*i+1 (left) or 2*i+2 (right).
118         std::vector<ServerInfo> weight_tree;
            // Maps a socket id to its index in weight_tree.
119         butil::FlatMap<SocketId, size_t> server_map;
120         //more code...
128     };


    // Picks the next usable server for the calling thread.
    // Returns 0 with out->ptr addressed to the chosen socket, ENOMEM when the
    // server set cannot be read, ENODATA when no server is registered, and
    // EHOSTDOWN when every candidate was rejected.
107 int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
108     butil::DoublyBufferedData<Servers, TLS>::ScopedPtr s;
109     if (_db_servers.Read(&s) != 0) {
110         return ENOMEM;
111     }
112     const size_t n = s->server_list.size();
113     if (n == 0) {
114         return ENODATA;
115     }
116     //more code...
        // Work on a copy of the thread-local cursor; it is written back below.
121     TLS tls = s.tls();
122     if (tls.stride == 0) {
            // First use on this thread: choose a stride and start at offset 0.
123         tls.stride = GenRandomStride();
124         tls.offset = 0;
125     }
        // Try at most n candidates, advancing by `stride` modulo n each time.
127     for (size_t i = 0; i < n; ++i) {
128         tls.offset = (tls.offset + tls.stride) % n;
129         const SocketId id = s->server_list[tls.offset].id;
            // The exclusion list is ignored on the final attempt so that an
            // excluded-but-available server is preferred over failing.
130         if (((i + 1) == n  // always take last chance
131              || !ExcludedServers::IsExcluded(in.excluded, id))
132             && Socket::Address(id, out->ptr) == 0
133             && (*out->ptr)->IsAvailable()) {
134             s.tls() = tls;
135             return 0;
136         }
137     }   
138     //more code...   
        // Persist the advanced cursor even on failure.
141     s.tls() = tls;
142     return EHOSTDOWN;
143 }





    // Adds socket `id` to the background buffer `bg`. `fg` is the other
    // (foreground) buffer and `lb` the owning balancer. Returns false when the
    // id is a duplicate, true otherwise.
    // NOTE(review): two expressions are missing from this excerpt (the
    // duplicate check and the initialiser of `pindex`); they are presumably
    // lookups of `id` in bg.server_map and fg.server_map respectively —
    // restore them from the upstream source.
 53 bool LocalityAwareLoadBalancer::Add(Servers& bg, const Servers& fg,
 54                                     SocketId id,
 55                                     LocalityAwareLoadBalancer* lb) {
 56     if (bg.weight_tree.capacity() < INITIAL_WEIGHT_TREE_SIZE) {
 57         bg.weight_tree.reserve(INITIAL_WEIGHT_TREE_SIZE);
 58     }
 59     if ( != NULL) {
 60         // The id duplicates.
 61         return false;
 62     }
 63     const size_t* pindex =;
 64     if (pindex == NULL) {
            // No existing entry to copy: create a brand-new node.
 69         const size_t index = bg.weight_tree.size();
 74         int64_t initial_weight = WEIGHT_SCALE;
 75         if (!bg.weight_tree.empty()) {
 76             initial_weight = lb->_total.load(butil::memory_order_relaxed)
 77                 / bg.weight_tree.size();// average weight of the existing nodes
 78         }
 82         bg.server_map[id] = index;// register the new node
 87         ServerInfo info = { id, lb->PushLeft(), new Weight(initial_weight) };// build the weight record
 88         bg.weight_tree.push_back(info);
 93         const int64_t diff = info.weight->volatile_value();
 94         if (diff) {
 95             bg.UpdateParentWeights(diff, index);// propagate into the weight tree
 96             lb->_total.fetch_add(diff, butil::memory_order_relaxed);// accumulate into _total
 97         }
 98     } else {
            // Copy the existing ServerInfo from `fg`; the `left` and `weight`
            // pointers are copied, so both buffers share those objects.
101         bg.server_map[id] = bg.weight_tree.size();
102         bg.weight_tree.push_back(fg.weight_tree[*pindex]);
103     }
104     return true;
105 }

        // One node of Servers::weight_tree. Initialised positionally with
        // braces in Add(), so the member order matters.
110     struct ServerInfo {
            // Socket id of the server this node represents.
111         SocketId server_id;
            // Sum of the weights in this node's left subtree (maintained by
            // UpdateParentWeights); the storage comes from PushLeft().
112         butil::atomic<int64_t>* left;
            // Weight of this node; allocated with `new` in Add().
113         Weight* weight;
114     };

    // Registers `id`. The weight tree is only modified when _id_mapper reports
    // that this is the first registration of the server; otherwise the call
    // returns true without touching the tree.
    // NOTE(review): the second argument of ModifyWithForeground is missing in
    // this excerpt (presumably the socket id of `id`) — restore from upstream.
227 bool LocalityAwareLoadBalancer::AddServer(const ServerId& id) {
228     if (_id_mapper.AddServer(id)) {// first time this server is added
230         return _db_servers.ModifyWithForeground(Add,, this);
231     } else {
232         return true;
233     }
234 }

以上实现是新增加一个服务节点信息,此时会更新后台节点及权重信息。如链接中说明,“LALB的查找过程是按权值分流,O(N)方法如下:获得所有权值的和total,产生一个介于[0, total-1]的随机数R,逐个遍历权值,直到当前权值之和不大于R,而下一个权值之和大于R。这个方法可以工作,也好理解,但当N达到几百时性能已经很差,这儿的主要因素是cache一致性:LALB是一个基于反馈的算法,RPC结束时信息会被反馈入LALB,被遍历的数据结构也一直在被修改。这意味着前台的O(N)读必须刷新每一行cacheline。当N达到数百时,一次查找过程可能会耗时百微秒,更别提更大的N了,LALB(将)作为brpc的默认分流算法,这个性能开销是无法接受的。”



411 template <typename T, typename TLS>
412 template <typename Fn, typename Arg1, typename Arg2>
413 size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
414     Fn& fn, const Arg1& arg1, const Arg2& arg2) {
415     WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
416     return Modify(c);
417 }

131     template <typename Fn, typename Arg1, typename Arg2>
132     struct WithFG2 {
133         WithFG2(Fn& fn, T* data, const Arg1& arg1, const Arg2& arg2)
134             : _fn(fn), _data(data), _arg1(arg1), _arg2(arg2) {}
135         size_t operator()(T& bg) {
136             return _fn(bg, (const T&)_data[&bg == _data], _arg1, _arg2);
137         }
138     private:
139         Fn& _fn;
140         T* _data;
141         const Arg1& _arg1;
142         const Arg2& _arg2;
143     };
353     int bg_index = !_index.load(butil::memory_order_relaxed);
356     const size_t ret = fn(_data[bg_index]);//更新后台
365, butil::memory_order_release);//更新最新index
366     bg_index = !bg_index;//旧前台
368     //block writer...  //直到所有读线程不再读旧前台
377     const size_t ret2 = fn(_data[bg_index]);//更新旧前台

一开始对于新增加的节点,前台数据肯定是空的,第一次更新时,bg的index为1,fg的index为0,所以pindex == NULL,然后bg.weight_tree.push_back(info),以所在的index,即vector数组的最新位置作为index,即bg.server_map[id] = index,并以当前所有节点的平均权值作为initial_weight。

        // One node of Servers::weight_tree (repeated here for reference).
110     struct ServerInfo {
            // Socket id of the server this node represents.
111         SocketId server_id;
            // Sum of the weights in this node's left subtree; the storage is
            // an element of _left_weights handed out by PushLeft().
112         butil::atomic<int64_t>* left;
            // Weight of this node; the pointer is copied between buffers.
113         Weight* weight;
114     };

141     // Add a entry to _left_weights.
142     butil::atomic<int64_t>* PushLeft() {
143         _left_weights.push_back(0);
144         return (butil::atomic<int64_t>*)&_left_weights.back();
145     }


154 inline void LocalityAwareLoadBalancer::Servers::UpdateParentWeights(
155     int64_t diff, size_t index) const {
156     while (index != 0) {
157         const size_t parent_index = (index - 1) >> 1;
158         if ((parent_index << 1) + 1 == index) {  // left child
159             weight_tree[parent_index].left->fetch_add(
160                 diff, butil::memory_order_relaxed);
161         }   
162         index = parent_index;
163     }   
164 }

每个节点的left记录其左子树的权重和。举个简单例子,有vector<int> value = {1,4,5,7,8,15,20},因为7/8/15/20为叶子节点,那么left[3~6]=0,left[1]=7,left[2]=15,left[0]=4+7+8=19。

然后再更新旧前台时,此时index为0,bg的index为0,fg的index为1,所以pindex != NULL,直接把新前台的数据拷贝到旧前台,这里会共用Weight内存,后面删除时要小心。


    // Unregisters `id`. The weight tree is only modified when
    // _id_mapper.RemoveServer(id) returns true; otherwise the call returns
    // true without touching the tree.
    // NOTE(review): the second argument of Modify is missing in this excerpt
    // (presumably the socket id of `id`) — restore from upstream.
236 bool LocalityAwareLoadBalancer::RemoveServer(const ServerId& id) {
237     if (_id_mapper.RemoveServer(id)) {
239         return _db_servers.Modify(Remove,, this);
240     } else {
241         return true;
242     }
243 }

    // Removes socket `id` from buffer `bg`. The function runs once per buffer.
    // Weight::Disable() returns the old weight and zeroes it, so `rm_weight`
    // is non-zero only on the first pass (unless the weight was already 0);
    // the `else` branches below are the second pass and release the shared
    // Weight object and its `left` slot.
    // Returns false when `id` is not present in `bg`.
    // NOTE(review): the initialiser of `pindex` is missing in this excerpt
    // (presumably a lookup of `id` in bg.server_map) — restore from upstream.
107 bool LocalityAwareLoadBalancer::Remove(
108     Servers& bg, SocketId id, LocalityAwareLoadBalancer* lb) {
109     size_t* pindex =;
110     if (NULL == pindex) {
111         // The id does not exist.
112         return false;
113     }
114     // Save the index and remove mapping from id to the index.
115     const size_t index = *pindex;
116     bg.server_map.erase(id);
118     Weight* w = bg.weight_tree[index].weight;
122     const int64_t rm_weight = w->Disable();
123     if (index + 1 == bg.weight_tree.size()) {
124         // last node. Removing is easier.
125         bg.weight_tree.pop_back();
126         if (rm_weight) {
                // First pass: subtract the removed weight from the ancestors
                // and from the running total.
132             int64_t diff = -rm_weight;
133             bg.UpdateParentWeights(diff, index);
134             lb->_total.fetch_add(diff, butil::memory_order_relaxed);
135         } else {
136             // the second buffer. clean left stuff.
137             delete w;
138             lb->PopLeft();
139         }
140     } else {
141         // Move last node to position `index' to fill the space.
            // Only server_id and weight are overwritten; the `left` pointer
            // of slot `index` stays in place.
142         bg.weight_tree[index].server_id = bg.weight_tree.back().server_id;
143         bg.weight_tree[index].weight = bg.weight_tree.back().weight;
144         bg.server_map[bg.weight_tree[index].server_id] = index;
145         bg.weight_tree.pop_back();
147         Weight* w2 = bg.weight_tree[index].weight;  // previously back()
148         if (rm_weight) {
                // First pass: record the moved node's weight and its old
                // position (the size after pop_back equals the old last
                // index), then apply the net change at `index`.
159             const int64_t add_weight = w2->MarkOld(bg.weight_tree.size());
164             const int64_t diff = add_weight - rm_weight;
165             if (diff) {
166                 bg.UpdateParentWeights(diff, index);
167                 lb->_total.fetch_add(diff, butil::memory_order_relaxed);
168             }
171         } else {
                // Second pass: p.first is the weight saved by MarkOld(),
                // p.second the diff accumulated since then.
175             const std::pair<int64_t, int64_t> p = w2->ClearOld();
176             // Add the diff to parent nodes of node `index'
177             const int64_t diff = p.second;
178             if (diff) {
179                 bg.UpdateParentWeights(diff, index);
180             }
181             // Remove weight from parent nodes of last node.
182             int64_t old_weight = - p.first - p.second;
183             if (old_weight) {
184                 bg.UpdateParentWeights(old_weight, bg.weight_tree.size());
185             }
186             lb->_total.fetch_add(- p.first, butil::memory_order_relaxed);
187             // Clear resources.
188             delete w;
189             lb->PopLeft();
190         }
191     }
192     return true;
193 }

先更新后台数据,获取对应index上的Weight* w,并进行Disable,这样后面rpc返回时更新权重延时信息时,直接跳过。如果是最后一个节点则删除,即bg.weight_tree.pop_back()。在该节点权重不为0的情况下(第一次删除,此时前台还在使用中),因为后台存在的话前台肯定也存在,所以此时不能直接delete w,先进行调整权重树:

126         if (rm_weight) {
132             int64_t diff = -rm_weight;
133             bg.UpdateParentWeights(diff, index);
134             lb->_total.fetch_add(diff, butil::memory_order_relaxed);
135         } else {
137             delete w;
138             lb->PopLeft();
139         }


当要删除的节点非最后一个时,一切变得有些复杂起来,当然也不难。对于后台数据更新时,这里的实现是把最后一个节点覆盖到index,并删除最后一个节点,此时要调整权重树(add_weight - rm_weight),并记录删除的位置,待更新前台时再特殊处理,中间还要考虑更新权重时的情况:

563 int64_t LocalityAwareLoadBalancer::Weight::MarkOld(size_t index) {
564     BAIDU_SCOPED_LOCK(_mutex);
565     const int64_t saved = _weight;
566     _old_weight = saved;//删除时的权重
567     _old_diff_sum = 0;//重置,用于删除时后面变化的
568     _old_index = index;//删除的位置
569     return saved;
570 }

555 int64_t LocalityAwareLoadBalancer::Weight::Disable() {
556     BAIDU_SCOPED_LOCK(_mutex);
557     const int64_t saved = _weight;
558     _base_weight = -1;
559     _weight = 0;
560     return saved;
561 }


150 // We need to remove the weight of last node from its parent
151 // nodes and add the weight to parent nodes of node `index'.
152 // However this process is not atomic. The foreground buffer still
153 // sees w2 as last node and it may change the weight during the
154 // process. To solve this problem, we atomically reset the weight
155 // and remember the previous index (back()) in _old_index. Later
156 // changes to the weight will add the diff to _old_diff_sum if _old_index
157 // matches the index which SelectServer is from. In this way we
158 // know the weight diff from foreground before we later modify it.

对于前台数据来说,此时还是一棵权重二叉树,符合完全二叉树的权重和,但因后台要删除非最后一个节点,若直接修改与前台共享的w2权重数据则会破坏权重和,导致执行SelectServer时选出错误的节点。此时前台看到的w2还是完整的。这里在原来的基础之上给index的父节点们加上add_weight - rm_weight;而为了不破坏前台的权重和,last node的父节点们上的权重要等到后面更新旧前台时才会删除。

161  // Add the weight diff to parent nodes of node `index'. Notice
162 // that we don't touch parent nodes of last node here because
163 // foreground is still sending traffic to last node.


572 std::pair<int64_t, int64_t> LocalityAwareLoadBalancer::Weight::ClearOld() {
573     BAIDU_SCOPED_LOCK(_mutex);
574     const int64_t old_weight = _old_weight;
575     const int64_t diff = _old_diff_sum;
576     _old_diff_sum = 0;
577     _old_index = (size_t)-1;
578     _old_weight = 0;
579     return std::make_pair(old_weight, diff);
580 }

接着w2->ClearOld,并更新权重树,这里实现的原因是在其他地方比如SelectServer会修改Weight内部数据。因为之前修改后台时,因可能破坏前台的权重树结构,并没有更新last node的父节点们的权重,故这里需要把之前该做的情况处理掉:

173             // Reset _old_* fields and get the weight change by SelectServer()
174             // after MarkOld().
175             const std::pair<int64_t, int64_t> p = w2->ClearOld();
176             // Add the diff to parent nodes of node `index'
177             const int64_t diff = p.second;
178             if (diff) {
179                 bg.UpdateParentWeights(diff, index);
180             }
181             // Remove weight from parent nodes of last node.
182             int64_t old_weight = - p.first - p.second;
183             if (old_weight) {
184                 bg.UpdateParentWeights(old_weight, bg.weight_tree.size());
185             }
186             lb->_total.fetch_add(- p.first, butil::memory_order_relaxed);
187             // Clear resources.
188             delete w;
189             lb->PopLeft();
    // Picks a server by descending the weight tree with a random `dice`.
    // At each node: dice < left goes to the left child; dice >= left + self
    // goes to the right child after subtracting left + self; otherwise the
    // current node is the candidate.
    // Returns ENOMEM when the server set cannot be read, ENODATA when the
    // tree is empty and EHOSTDOWN when no server could be chosen.
266 int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
267     butil::DoublyBufferedData<Servers>::ScopedPtr s;
268     if (_db_servers.Read(&s) != 0) {
269         return ENOMEM;
270     }
271     const size_t n = s->weight_tree.size();
272     if (n == 0) {
273         return ENODATA;
274     }
275     size_t ntry = 0;
276     size_t nloop = 0;
277     int64_t total = _total.load(butil::memory_order_relaxed);
278     int64_t dice = butil::fast_rand_less_than(total);
279     size_t index = 0;
280     int64_t self = 0;
281     while (total > 0) {
285         if (++nloop > 10000) {// guard against looping forever
287             return EHOSTDOWN;
288         }
293         const ServerInfo & info = s->weight_tree[index];
294         const int64_t left = info.left->load(butil::memory_order_relaxed);
295         if (dice < left) {// go left
296             index = index * 2 + 1;
297             if (index < n) {
298                 continue;
299             }
300         } else if (dice >= left + (self = info.weight->volatile_value())) {// go right
301             dice -= left + self;
302             index = index * 2 + 2;
303             if (index < n) {
304                 continue;
305             }
306         } else if (Socket::Address(info.server_id, out->ptr) == 0
307                    && (*out->ptr)->IsAvailable()) {
308             //more code...
328         } else if (in.changable_weights) {
329             //more code...
345         }
            // Reached when the descent ran past the tree or a branch above
            // fell through: reload the total and restart from the root.
346         total = _total.load(butil::memory_order_relaxed);
347         dice = butil::fast_rand_less_than(total);
348         index = 0;
349     }
350     return EHOSTDOWN;
351 }

SelectServer根据随机出来的dice来判断落在哪个节点上,因为weight_tree权重和树的结构如上面解释到,如果dice < left则会在左边,如果dice >= left + (self = info.weight->volatile_value())则在右边,并更新相应的index和dice,否则就是当前节点,即 left <= dice < left + self。


306 if (Socket::Address(info.server_id, out->ptr) == 0 && (*out->ptr)->IsAvailable()) {
308     if ((ntry + 1) == n  || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {


306         } else if (Socket::Address(info.server_id, out->ptr) == 0
307                    && (*out->ptr)->IsAvailable()) {
308             if ((ntry + 1) == n  // Instead of fail with EHOSTDOWN, we prefer
309                                  // choosing the server again.
310                 || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
311                 if (!in.changable_weights) {
312                     return 0;
313                 }
314                 const Weight::AddInflightResult r =
315                     info.weight->AddInflight(in, index, dice - left);
316                 if (r.weight_diff) {
317                     s->UpdateParentWeights(r.weight_diff, index);
318                     _total.fetch_add(r.weight_diff, butil::memory_order_relaxed);
319                 }
320                 if (r.chosen) { 
321                     out->need_feedback = true;
322                     return 0;
323                 } 
324             }
325             if (++ntry >= n) {
326                 break;
327             }

190 inline LocalityAwareLoadBalancer::Weight::AddInflightResult
191 LocalityAwareLoadBalancer::Weight::AddInflight(
192     const SelectIn& in, size_t index, int64_t dice) {
193     BAIDU_SCOPED_LOCK(_mutex);
194     if (Disabled()) {
195         AddInflightResult r = { false, 0 };
196         return r;
197     }
198     const int64_t diff = ResetWeight(index, in.begin_time_us);
199     if (_weight < dice) {
200         // inflight delay makes the weight too small to choose.
201         AddInflightResult r = { false, diff };
202         return r;
203     }
204     _begin_time_sum += in.begin_time_us;
205     ++_begin_time_count;
206     AddInflightResult r = { true, diff };//需要rpc返回时的反馈信息
207     return r;
208 }

166 inline int64_t LocalityAwareLoadBalancer::Weight::ResetWeight(
167     size_t index, int64_t now_us) {
168     int64_t new_weight = _base_weight;
169     if (_begin_time_count > 0) {
170         const int64_t inflight_delay =
171             now_us - _begin_time_sum / _begin_time_count;
172         const int64_t punish_latency = 
173             (int64_t)(_avg_latency * FLAGS_punish_inflight_ratio);
174         if (inflight_delay >= punish_latency && _avg_latency > 0) {
175             new_weight = new_weight * punish_latency / inflight_delay;
176         }
177     }
178     if (new_weight < FLAGS_min_weight) {
179         new_weight = FLAGS_min_weight;
180     }           
181     const int64_t old_weight = _weight;
182     _weight = new_weight;
183     const int64_t diff = new_weight - old_weight;
184     if (_old_index == index && diff != 0) {
185         _old_diff_sum += diff;//累加变化的
186     }
187     return diff;
188 }



328         } else if (in.changable_weights) {
329             const int64_t diff =
330                 info.weight->MarkFailed(index, total / n);
331             if (diff) { 
332                 s->UpdateParentWeights(diff, index);
333                 _total.fetch_add(diff, butil::memory_order_relaxed);
334             }
335             if (dice >= left + self + diff) {
336                 dice -= left + self + diff;
337                 index = index * 2 + 2;
338                 if (index < n) {
339                     continue;
340                 }
341             }
342             if (++ntry >= n) {
343                 break;
344             }
345         }

210 inline int64_t LocalityAwareLoadBalancer::Weight::MarkFailed(
211     size_t index, int64_t avg_weight) {
212     BAIDU_SCOPED_LOCK(_mutex);
213     if (_base_weight <= avg_weight) {
214         return 0;   
215     }           
216     _base_weight = avg_weight;
217     return ResetWeight(index, 0);
218 }


 784     if (need_feedback) {
 785         const LoadBalancer::CallInfo info =
 786             { begin_time_us, peer_id, error_code, c };
 787         c->_lb->Feedback(info);
 788     }

    // Called when an RPC finishes: lets the server's Weight update itself
    // from `info` and propagates any resulting weight change into the tree
    // and the running total. Does nothing when the server set cannot be read
    // or the server is no longer present.
    // NOTE(review): the lookup expression after `s->` is missing in this
    // excerpt (presumably a server_map lookup keyed by the server in `info`)
    // — restore from upstream.
353 void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {
354     butil::DoublyBufferedData<Servers>::ScopedPtr s;
355     if (_db_servers.Read(&s) != 0) {
356         return;
357     }
358     const size_t* pindex = s->;
359     if (NULL == pindex) {// the server node has already been removed
360         return;
361     }
362     const size_t index = *pindex;
363     Weight* w = s->weight_tree[index].weight;
364     const int64_t diff = w->Update(info, index);// try to update the weight
365     if (diff != 0) {
366         s->UpdateParentWeights(diff, index);// update the weight tree
367         _total.fetch_add(diff, butil::memory_order_relaxed);
368     }
369 }
371 int64_t LocalityAwareLoadBalancer::Weight::Update(
372     const CallInfo& ci, size_t index) {
373     const int64_t end_time_us = butil::gettimeofday_us();
374     const int64_t latency = end_time_us - ci.begin_time_us;
375     BAIDU_SCOPED_LOCK(_mutex);
376     if (Disabled()) {
377         // The weight was disabled and will be removed soon, do nothing
378         // and the diff is 0.
379         return 0;
380     }
382     _begin_time_sum -= ci.begin_time_us;
383     --_begin_time_count;
385     if (latency <= 0) {
386         // time skews, ignore the sample.
387         return 0;
388     }
389     if (ci.error_code == 0) {
390         // Add a new entry 
391         TimeInfo tm_info = { latency, end_time_us };
392         if (!_time_q.empty()) { 
393             tm_info.latency_sum += _time_q.bottom()->latency_sum;
394         }
395         _time_q.elim_push(tm_info);

请求回来后,如果该节点即将被删除(Disabled)则什么也不做;对成功的请求,会累加tm_info.latency_sum += _time_q.bottom()->latency_sum,用于后面计算平均时延。


396     } else {
410         int ndone = 1;
411         int nleft = 0;
412         if (ci.controller->max_retry() > 0) {
413             ndone = ci.controller->retried_count();
414             nleft = ci.controller->max_retry() - ndone;
415         }   
416         const int64_t err_latency =
417             (nleft * (int64_t)(latency * FLAGS_punish_error_ratio)
418              + ndone * ci.controller->timeout_ms() * 1000L) / (ndone + nleft);
420         if (!_time_q.empty()) {
421             TimeInfo* ti = _time_q.bottom();
422             ti->latency_sum += err_latency;
423             ti->end_time_us = end_time_us;
424         } else {
425             // If the first response is error, enlarge the latency as timedout
426             // since we know nothing about the normal latency yet.
427             const TimeInfo tm_info = {
428                 std::max(err_latency, ci.controller->timeout_ms() * 1000L),
429                 end_time_us
430             };  
431             _time_q.push(tm_info);
432         }


397         // Accumulate into the last entry so that errors always decrease
398         // the overall QPS and latency.
399         // Note that the latency used is linearly mixed from the real latency
400         // (of an erroneous call) and the timeout, so that errors that are more
401         // unlikely to be solved by later retries are punished more.
402         // Examples:
403         //   max_retry=0: always use timeout
404         //   max_retry=1, retried=0: latency
405         //   max_retry=1, retried=1: timeout
406         //   max_retry=2, retried=0: latency
407         //   max_retry=2, retried=1: (latency + timeout) / 2
408         //   max_retry=2, retried=2: timeout


435     const int64_t top_time_us =>end_time_us;
436     const size_t n = _time_q.size();
437     int64_t scaled_qps = DEFAULT_QPS * WEIGHT_SCALE;
438     if (end_time_us > top_time_us) {
439         // Only calculate scaled_qps when the queue is full or the elapse
440         // between bottom and top is reasonably large(so that error of the
441         // calculated QPS is probably smaller).
442         if (n == _time_q.capacity() ||
443             end_time_us >= top_time_us + 1000000L/*1s*/) {
444             // will not overflow.
445             scaled_qps = (n - 1) * 1000000L * WEIGHT_SCALE / (end_time_us - top_time_us);
446             if (scaled_qps < WEIGHT_SCALE) {
447                 scaled_qps = WEIGHT_SCALE;
448             }
449         }
450         _avg_latency = (_time_q.bottom()->latency_sum -
451               >latency_sum) / (n - 1);
452     } else if (n == 1) {
453         _avg_latency = _time_q.bottom()->latency_sum;
454     } else {
455         // end_time_us <= top_time_us && n > 1: the QPS is so high that
456         // the time elapse between top and bottom is 0(possible in examples),
457         // or time skews, we don't update the weight for safety.
458         return 0;
459     }
460     _base_weight = scaled_qps / _avg_latency;
461     return ResetWeight(index, end_time_us);
462 }

最后重新计算时延和_base_weight。_time_q存储最近128个延迟信息。如果在越短的时间内完成的请求越多,则scaled_qps越大;_avg_latency取这128个的平均值,最后_base_weight = scaled_qps / _avg_latency,并重新调整_weight。




QPS和latency使用一个循环队列统计,默认容量128。我们可以使用这么小的统计窗口,是因为inflight delay能及时纠正过度反应,而128也具备了一定的统计可信度。不过,这么计算latency的缺点是:如果server的性能出现很大的变化,那么我们需要积累一段时间才能看到平均延时的变化。就像上节例子中那样,server反转延时后client需要积累很多秒的数据才能看到平均延时的变化。目前我们并没有处理这个问题,因为真实生产环境中的server不太会像例子中那样跳变延时,大都是缓缓变慢。当集群有几百台机器时,即使我们反应慢点给个别机器少分流点也不会导致什么问题。如果在产品线中确实出现了性能跳变,并且集群规模不大,我们再处理这个问题。

权值的计算方法是base_weight = QPS * WEIGHT_SCALE / latency ^ p。其中WEIGHT_SCALE是一个放大系数,为了能用整数存储权值,又能让权值有足够的精度,类似定点数。p默认为2,延时的收敛速度大约为p=1时的p倍,选项quadratic_latency=false可使p=1。



inflight delay


这样“当前时间 - 发出时间之和 / 未结束次数”便是未结束RPC的平均耗时,我们称之为inflight delay。当inflight delay大于平均延时时,我们就线性地惩罚节点权值,即weight = base_weight * avg_latency / inflight_delay。当发向一个节点的请求没有在平均延时内回来时,它的权值就会很快下降,从而纠正我们的行为,这比等待超时快多了。不过这没有考虑延时的正常抖动,我们还得有方差,方差可以来自统计,也可简单线性于平均延时。不管怎样,有了方差bound后,当inflight delay > avg_latency + max(bound * 3, MIN_BOUND)时才会惩罚权值。3是正态分布中的经验数值。


这里以murmur(multiply and rotate)为哈希函数,关于它的优缺点有:速度快,比安全散列算法快几十倍;相似的字符串如“abc”和“abd”能够均匀散落在哈希环上。

 60 uint32_t MurmurHash32(const void* key, size_t len) {
 61     uint32_t hash;               
 62     butil::MurmurHash3_x86_32(key, (int)len, 0, &hash);
 63     return hash;
 64 }

    // Computes the 32-bit MurmurHash3 of the `len` bytes at `key`, starting
    // from `seed`, and stores the 4-byte result at `out`.
 83 void MurmurHash3_x86_32 ( const void * key, int len,
 84                           uint32_t seed, void * out ) {
 85   const uint8_t * data = (const uint8_t*)key;
 86   const int nblocks = len / 4;
 88   uint32_t h1 = seed;
 90   const uint32_t c1 = 0xcc9e2d51;
 91   const uint32_t c2 = 0x1b873593;
 93   //----------
 94   // body
      // `blocks` points just past the last full 4-byte block; the loop
      // indexes it with negative offsets from -nblocks up to -1.
      // NOTE(review): the input is read as uint32_t through a cast of a byte
      // pointer; this assumes unaligned 32-bit loads are acceptable on the
      // target and yields the host's byte order — confirm for new platforms.
 96   const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
 98   for(int i = -nblocks; i; i++)
 99   {
100     uint32_t k1 = blocks[i];
102     k1 *= c1;
103     k1 = rotl32(k1,15);
104     k1 *= c2;
106     h1 ^= k1;
107     h1 = rotl32(h1,13);
108     h1 = h1*5+0xe6546b64;
109   }
111   //----------
112   // tail
      // Mix in the remaining 1-3 bytes; the cases fall through on purpose so
      // that each case also handles the lower-numbered ones.
114   const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
116   uint32_t k1 = 0;
118   switch(len & 3)
119   {
120   case 3: k1 ^= tail[2] << 16;
121   case 2: k1 ^= tail[1] << 8;
122   case 1: k1 ^= tail[0];
123           k1 *= c1; k1 = rotl32(k1,15); k1 *= c2; h1 ^= k1;
124   };
126   //----------
127   // finalization
      // Fold in the length, then apply the final mixing step.
129   h1 ^= len;
131   h1 = fmix32(h1);
133   *(uint32_t*)out = h1;
134 }

 53 inline uint32_t rotl32 ( uint32_t x, int8_t r )
 54 {
 55   return (x << r) | (x >> (32 - r));
 56 }

 49 MURMURHASH_FORCE_INLINE uint32_t fmix32 (uint32_t h) {
 50   h ^= h >> 16;
 51   h *= 0x85ebca6b;
 52   h ^= h >> 13;
 53   h *= 0xc2b2ae35;
 54   h ^= h >> 16;
 55   return h;
 56 }


    // Hash variants selectable for the consistent-hashing balancer.
 36 enum ConsistentHashingLoadBalancerType {// balancer type
 37     CONS_HASH_LB_MURMUR3 = 0,// the variant used as the example in this article
 38     CONS_HASH_LB_MD5 = 1,
 39     CONS_HASH_LB_KETAMA = 2,
 41     // Identify the last one.    
 42     CONS_HASH_LB_LAST = 3        
 43 };
    // Load balancer that places servers on a hash ring and routes a request
    // to the first node whose hash is not less than the request code.
 45 class ConsistentHashingLoadBalancer : public LoadBalancer {
 46 public:
        // One replica of a server on the ring.
 47     struct Node {
            // Position of this replica on the ring.
 48         uint32_t hash; 
            // The server this replica belongs to.
 49         ServerId server_sock;
 50         butil::EndPoint server_addr;  // To make sorting stable among all clients
            // Orders nodes by hash, breaking ties by server address.
 51         bool operator<(const Node &rhs) const {
 52             if (hash < rhs.hash) { return true; }
 53             if (hash > rhs.hash) { return false; }
 54             return server_addr < rhs.server_addr;
 55         }
            // Allows a Node to be compared against a bare hash code, as done
            // by the std::lower_bound call in SelectServer.
 56         bool operator<(const uint32_t code) const {
 57             return hash < code;
 58         }
 59     };
 70 private:
        // Number of replicas generated for each server.
 79     size_t _num_replicas;
 80     ConsistentHashingLoadBalancerType _type;
        // Double-buffered ring; SelectServer binary-searches it, so it is
        // expected to stay sorted by Node::operator<.
 81     butil::DoublyBufferedData<std::vector<Node> > _db_hash_ring;
 82 };


220 bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
221     std::vector<Node> add_nodes;
222     add_nodes.reserve(_num_replicas);//虚拟节点数
223     if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
224         return false;
225     }
226     std::sort(add_nodes.begin(), add_nodes.end());
227     bool executed = false;
228     const size_t ret = _db_hash_ring.ModifyWithForeground(
229                         AddBatch, add_nodes, &executed);
230     CHECK(ret == 0 || ret == _num_replicas) << ret;
231     return ret != 0;
232 }

    // Fills `replicas` with `num_replicas` ring nodes for `server`. Each node
    // is hashed from the text "<remote endpoint>-<replica index>".
    // Returns false when the server's socket cannot be addressed.
    // NOTE(review): the first argument of Socket::AddressFailedAsWell is
    // missing in this excerpt (presumably the socket id of `server`) —
    // restore from upstream.
 64 bool DefaultReplicaPolicy::Build(ServerId server,
 65                                  size_t num_replicas,
 66                                  std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
 67     SocketUniquePtr ptr;
 68     if (Socket::AddressFailedAsWell(, &ptr) == -1) {
 69         return false;
 70     }
 71     replicas->clear();
 72     for (size_t i = 0; i < num_replicas; ++i) {
            // NOTE(review): snprintf returns the length the full text would
            // need, which can exceed sizeof(host) - 1 if the endpoint string
            // is long; `len` is passed to the hash unclamped. Also "%lu" is
            // used for a size_t argument. Confirm both are safe for the
            // supported endpoint formats and platforms.
 73         char host[32];  
 74         int len = snprintf(host, sizeof(host), "%s-%lu",
 75                            endpoint2str(ptr->remote_side()).c_str(), i);
 76         ConsistentHashingLoadBalancer::Node node;
 77         node.hash = _hash_func(host, len);
 78         node.server_sock = server;
 79         node.server_addr = ptr->remote_side();
 80         replicas->push_back(node);
 81     }
 82     return true;
 83 }


152 size_t ConsistentHashingLoadBalancer::AddBatch(
153         std::vector<Node> &bg, const std::vector<Node> &fg,
154         const std::vector<Node> &servers, bool *executed) {
155     if (*executed) {
156         // Hack DBD
157         return fg.size() - bg.size();
158     }
159     *executed = true;
160     bg.resize(fg.size() + servers.size());
161     bg.resize(std::set_union(fg.begin(), fg.end(),
162                              servers.begin(), servers.end(), bg.begin())
163               - bg.begin());//去重
164     return bg.size() - fg.size();
165 }


257 bool ConsistentHashingLoadBalancer::RemoveServer(const ServerId& server) {
258     bool executed = false;
259     const size_t ret = _db_hash_ring.ModifyWithForeground(Remove, server, &executed);
260     CHECK(ret == 0 || ret == _num_replicas);
261     return ret != 0;
262 }

204 size_t ConsistentHashingLoadBalancer::Remove(
205         std::vector<Node> &bg, const std::vector<Node> &fg,
206         const ServerId& server, bool *executed) {
207     if (*executed) {
208         return bg.size() - fg.size();
209     }
210     *executed = true;
211     bg.clear();
212     for (size_t i = 0; i < fg.size(); ++i) {
213         if (fg[i].server_sock != server) {
214             bg.push_back(fg[i]);
215         }
216     }
217     return fg.size() - bg.size();
218 }



    // Routes a request by its request code: starts at the first ring node
    // whose hash is not less than the code (wrapping to the beginning) and
    // walks forward until a usable server is found.
    // Returns EINVAL when no request code is set or it does not fit in 32
    // bits, ENOMEM when the ring cannot be read, ENODATA when it is empty and
    // EHOSTDOWN when no node was usable.
    // NOTE(review): the member accessed after `choice->` is missing on two
    // lines of this excerpt (presumably the node's server id) — restore from
    // upstream.
290 int ConsistentHashingLoadBalancer::SelectServer(
291     const SelectIn &in, SelectOut *out) {
292     if (!in.has_request_code) {
293         // the caller must supply a hash value as the request code
294         return EINVAL;
295     }
296     if (in.request_code > UINT_MAX) {//request_code must be 32-bit currently
298         return EINVAL;
299     }
300     butil::DoublyBufferedData<std::vector<Node> >::ScopedPtr s;
301     if (_db_hash_ring.Read(&s) != 0) {
302         return ENOMEM;
303     }
304     if (s->empty()) {
305         return ENODATA;
306     }
307     std::vector<Node>::const_iterator choice =
308         std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
309     if (choice == s->end()) {
            // Past the largest hash: wrap around to the start of the ring.
310         choice = s->begin();
311     }
        // Visit each ring node at most once.
312     for (size_t i = 0; i < s->size(); ++i) {
313         if (((i + 1) == s->size() // always take last chance
314              || !ExcludedServers::IsExcluded(in.excluded, choice->
315             && Socket::Address(choice->, out->ptr) == 0
316             && (*out->ptr)->IsAvailable()) {
317             return 0;
318         } else {
319             if (++choice == s->end()) {
320                 choice = s->begin();
321             }
322         }
323     }
324     return EHOSTDOWN;
325 }

murmurhash3 学习笔记

上一篇 下一篇

