396 if (!server_accessor.AddConcurrency(cntl.get())) {
397 cntl->SetFailed(
398 ELIMIT, "Reached server's max_concurrency=%d",
399 server->options().max_concurrency);
400 break;
401 }
403 if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
404 cntl->SetFailed(ELIMIT, "Too many user code to run when"
405 " -usercode_in_pthread is on");
406 break;
407 }
43 // Returns true if the `max_concurrency' limit is not reached.
44 bool AddConcurrency(Controller* c) {
45 if (_server->options().max_concurrency <= 0) {
46 return true;
47 }
48 c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
49 return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
50 <= _server->options().max_concurrency);
51 }
440 method_status = mp->status;
441 if (method_status) {
442 int rejected_cc = 0;
443 if (!method_status->OnRequested(&rejected_cc)) {
444 cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
445 mp->method->full_name().c_str(), rejected_cc);
446 break;
447 }
448 }
93 inline bool MethodStatus::OnRequested(int* rejected_cc) {
94 const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
95 if (NULL == _cl || _cl->OnRequested(cc)) {
96 return true;
97 }
98 if (rejected_cc) {
99 *rejected_cc = cc;
100 }
101 return false;
102 }
90 bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
91 return current_concurrency <= _max_concurrency;
92 }
152 ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
153 ConcurrencyRemover::~ConcurrencyRemover() {
154 if (_status) {
155 _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
156 _status = NULL;
157 }
158 ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
159 }
104 inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
105 _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
106 if (0 == error_code) {
107 _latency_rec << latency;
108 } else {
109 _nerror_bvar << 1;
110 }
111 if (NULL != _cl) {
112 _cl->OnResponded(error_code, latency);
113 }
114 }
53 void RemoveConcurrency(const Controller* c) {
54 if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
55 butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
56 }
57 }
以上实现只是简单地判断当前并发数是否超过限制(current_concurrency <= _max_concurrency),并没有说明限制值 _max_concurrency 是如何确定的。
32 // This method should be called each time a request comes in. It returns
33 // false when the concurrency reaches the upper limit, otherwise it
34 // returns true. Normally, when OnRequested returns false, you should
35 // return an ELIMIT error directly.
// `current_concurrency' : Concurrency observed by the caller.
// MethodStatus::OnRequested passes the count that already includes the
// incoming request.
36 virtual bool OnRequested(int current_concurrency) = 0;
38 // Each request should call this method before responding.
39 // `error_code' : Error code obtained from the controller, 0 means success.
40 // `latency_us' : Microseconds taken by RPC.
41 // NOTE: Even if OnRequested returns false, after sending ELIMIT, you
42 // still need to call OnResponded.
43 virtual void OnResponded(int error_code, int64_t latency_us) = 0;
// Feeds the outcome of one finished request into the limiter.
// `error_code' : 0 on success. ELIMIT responses are ignored entirely.
// `latency_us' : Forwarded to AddSample() when this call is chosen as a sample.
// NOTE(review): this excerpt elides code ("//more code...") and the
// function's closing brace is not shown.
94 void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
95 if (0 == error_code) {
// Every success is counted, whether or not this call is sampled below.
96 _total_succ_req.fetch_add(1, butil::memory_order_relaxed);
97 } else if (ELIMIT == error_code) {
// Requests rejected with ELIMIT are never sampled.
98 return;
99 }
101 const int64_t now_time_us = butil::gettimeofday_us();
102 int64_t last_sampling_time_us =
103 _last_sampling_time_us.load(butil::memory_order_relaxed);
// Sample only the very first call, or a call arriving at least
// FLAGS_auto_cl_sampling_interval_ms after the previous sample.
105 if (last_sampling_time_us == 0 ||
106 now_time_us - last_sampling_time_us >=
107 FLAGS_auto_cl_sampling_interval_ms * 1000) {
// Only the caller whose compare-and-swap succeeds takes this sampling slot.
108 bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
109 last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
110 if (sample_this_call) {
111 bool sample_window_submitted = AddSample(error_code, latency_us,
112 now_time_us);
113 if (sample_window_submitted) {
114 //more code...
123 }
124 }
125 }
138 bool AutoConcurrencyLimiter::AddSample(int error_code,
139 int64_t latency_us,
140 int64_t sampling_time_us) {
141 std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
142 if (_reset_latency_us != 0) {
143 // min_latency is about to be reset soon.
144 if (_reset_latency_us > sampling_time_us) {
145 // ignoring samples during waiting for the deadline.
146 return false;
147 }
148 // Remeasure min_latency when concurrency has dropped to low load
149 _min_latency_us = -1;
150 _reset_latency_us = 0;
151 _remeasure_start_us = NextResetTime(sampling_time_us);
152 ResetSampleWindow(sampling_time_us);
153 }
155 if (_sw.start_time_us == 0) {
156 _sw.start_time_us = sampling_time_us;
157 }
159 if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
160 ++_sw.failed_count;
161 _sw.total_failed_us += latency_us;
162 } else if (error_code == 0) {
163 ++_sw.succ_count;
164 _sw.total_succ_us += latency_us;
165 }
167 if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
168 if (sampling_time_us - _sw.start_time_us >=
169 FLAGS_auto_cl_sample_window_size_ms * 1000) {
170 // If the sample size is insufficient at the end of the sampling
171 // window, discard the entire sampling window
172 ResetSampleWindow(sampling_time_us);
173 }
174 return false;
175 }
176 if (sampling_time_us - _sw.start_time_us <
177 FLAGS_auto_cl_sample_window_size_ms * 1000 &&
178 _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
179 return false;
180 }
182 if(_sw.succ_count > 0) {
183 UpdateMaxConcurrency(sampling_time_us);
184 } else {
185 // All request failed
186 _max_concurrency /= 2;
187 }
188 ResetSampleWindow(sampling_time_us);
189 return true;
190 }
219 void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
220 int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
221 double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
222 int64_t avg_latency =
223 std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
224 double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
225 UpdateMinLatency(avg_latency);
226 UpdateQps(qps);
228 int next_max_concurrency = 0;
229 // Remeasure min_latency at regular intervals
230 if (_remeasure_start_us <= sampling_time_us) {
231 const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
232 _reset_latency_us = sampling_time_us + avg_latency * 2;
233 next_max_concurrency =
234 std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
235 } else {
236 const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
237 const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
238 const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
239 const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
240 if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
241 qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
242 _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
243 } else {
244 _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
245 }
246 next_max_concurrency =
247 _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
248 }
250 if (next_max_concurrency != _max_concurrency) {
251 _max_concurrency = next_max_concurrency;
252 }
253 }
201 void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
202 const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
203 if (_min_latency_us <= 0) {
204 _min_latency_us = latency_us;
205 } else if (latency_us < _min_latency_us) {
206 _min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
207 }
208 }
210 void AutoConcurrencyLimiter::UpdateQps(double qps) {
211 const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
212 if (qps >= _ema_max_qps) {
213 _ema_max_qps = qps;
214 } else {
215 _ema_max_qps = qps * ema_factor + _ema_max_qps * (1 - ema_factor);
216 }
217 }
2. 计算公式方面,当 current_qps >= 保存的 max_qps 时,直接进行更新,不进行平滑处理;否则按 EMA 公式做平滑处理。
233 next_max_concurrency = std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
240 if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
241 qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
242 _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
243 } else {
244 _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
245 }
246 next_max_concurrency =
247 _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
max_concurrency = max_qps * ((2+alpha) * min_latency - latency)