brpc之bthread_id源码分析

2019-09-28  本文已影响0人  fooboo

在之前分析rpc时,发现代码中用到了bthread_id_t这个结构。当时看到的时候感觉挺复杂的,单从名字和定义上看,还以为它仅仅是个uint64_t的id:

// Handle type for a bthread id: a plain wrapper around one 64-bit integer.
// make_id() builds the value from a 32-bit resource slot (upper half) and a
// 32-bit version (lower half).
189 typedef struct {
190     uint64_t value;
191 } bthread_id_t;

实际分配是通过Id结构,并make_id生成的:

// Creates a new id backed by an Id object obtained from get_resource().
//   id        - out: receives the handle built by make_id(version, slot).
//   data      - stored in the Id; handed back to the error callbacks and to
//               lockers (through their pdata argument).
//   on_error / on_error2 - error callbacks, stored as given. The functions
//               that later invoke them test on_error first and fall back to
//               on_error2.
// Returns 0 on success, ENOMEM when get_resource() yields no object.
325 static int id_create_impl(
326     bthread_id_t* id, void* data,
327     int (*on_error)(bthread_id_t, void*, int),
328     int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
329     IdResourceId slot;
330     Id* const meta = get_resource(&slot);
331     if (meta) {
332         meta->data = data;
333         meta->on_error = on_error;
334         meta->on_error2 = on_error2;
            // The Id handed out must not carry queued errors.
335         CHECK(meta->pending_q.empty());
336         uint32_t* butex = meta->butex;
            // Restart the version at 1 when the butex is still 0 (the value
            // written by Id's constructor) or when adding ID_MAX_RANGE + 2
            // would wrap around.
337         if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
338             // Skip 0 so that bthread_id_t is never 0
339             // avoid overflow to make comparisons simpler.
340             *butex = 1;
341         }
            // Initial state: unlocked (*butex == first_ver) with exactly one
            // valid version, since has_version() accepts [first_ver, locked_ver).
342         *meta->join_butex = *butex;
343         meta->first_ver = *butex;
344         meta->locked_ver = *butex + 1;
345         *id = make_id(*butex, slot);
346         return 0;
347     }
348     return ENOMEM;
349 }

// Packs a resource slot and a version into one 64-bit handle:
// slot.value goes into bits 63..32, version into bits 31..0.
156 inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
157     const bthread_id_t tmp =
158         { (((uint64_t)slot.value) << 32) | (uint64_t)version };
159     return tmp;
160 }

以上分配一个Id用于同步rpc的各个环节。返回的bthread_id_t其实是用来索引一个Id资源对象的句柄,由资源池中的slot索引(高32位)和版本号(低32位)组成。

// Per-id state object that a bthread_id_t handle refers to.
// The current state of the id is encoded in *butex relative to the version
// fields below (see the value list that follows).
111 struct BAIDU_CACHELINE_ALIGNMENT Id {
112     // first_ver ~ locked_ver - 1: unlocked versions
113     // locked_ver: locked
114     // unlockable_ver: locked and about to be destroyed
115     // contended_ver: locked and contended
        // First valid version; *butex equals this value while the id is unlocked.
116     uint32_t first_ver;
        // One past the last valid version; stored into *butex when locked
        // without contention.
117     uint32_t locked_ver;
        // Held by the id functions while they inspect or change the version
        // fields, *butex and pending_q.
118     internal::FastPthreadMutex mutex;
        // User pointer given at creation; passed to the error callbacks.
119     void* data;
        // Error callbacks; callers test on_error first and fall back to on_error2.
120     int (*on_error)(bthread_id_t, void*, int);
121     int (*on_error2)(bthread_id_t, void*, int, const std::string&);
        // Location string supplied by the most recent locker / error caller.
122     const char *lock_location;
        // Holds the current state value; lockers wait on it.
123     uint32_t* butex;
        // Waited on by bthread_id_join; changed when the id is destroyed.
124     uint32_t* join_butex;
        // Errors reported while the id was locked, replayed at unlock.
125     SmallQueue<PendingError, 2> pending_q; 
126         
127     Id() {
128         // Although value of the butex(as version part of bthread_id_t)
129         // does not matter, we set it to 0 to make program more deterministic.
130         butex = bthread::butex_create_checked<uint32_t>();
131         join_butex = bthread::butex_create_checked<uint32_t>();
132         *butex = 0;
133         *join_butex = 0;
134     }
        // True when id_ver lies in the half-open range [first_ver, locked_ver).
141     inline bool has_version(uint32_t id_ver) const {
142         return id_ver >= first_ver && id_ver < locked_ver;
143     }
135     //more code...
150 };

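has_version判断传入的版本号是否仍然合法(落在[first_ver, locked_ver)区间内),因为一个rpc的响应回来时,对应的上下文可能已经没了:bthread_id_cancel/bthread_id_unlock_and_destroy会把first_ver和locked_ver都重置为end_ver(),使旧的版本号全部失效。SmallQueue类是一个queue,当元素个数小于等于N(这里是2)时使用内嵌数组_c,多出来的元素则放到堆上分配的std::deque(_full)里,不贴相关代码: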

 96     int _begin;
 97     int _size;
 98     T _c[N];
 99     std::deque<T>* _full;

回到上面,为什么id要这么生成,解决什么问题,这里引用相关说明:
“bthread_id是一个特殊的同步结构,它可以互斥RPC过程中的不同环节,也可以O(1)时间内找到RPC上下文(即Controller)。注意,这里我们谈论的是bthread_id_t,不是bthread_t(bthread的tid),这个名字起的确实不太好,容易混淆。

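具体来说,bthread_id解决的问题有:

- 在发送RPC过程中response回来了,处理response的代码和发送代码产生竞争。
- 设置timer后很快触发了,超时处理代码和发送代码产生竞争。
- 重试产生的多个response同时回来产生的竞争。
- 通过correlation_id在O(1)时间内找到对应的RPC上下文,而无需建立从correlation_id到RPC上下文的全局哈希表。
- 取消RPC。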

上文提到的bug在其他rpc框架中广泛存在,下面我们来看下brpc是如何通过bthread_id解决这些问题的。

bthread_id包括两部分,一个是用户可见的64位id,另一个是对应的不可见的bthread::Id结构体。用户接口都是操作id的。从id映射到结构体的方式和brpc中的其他结构类似:32位是内存池的偏移量,32位是version。前者O(1)时间定位,后者防止ABA问题。”

在发起rpc前,会通过call_id返回一个id:

398     const CallId correlation_id = cntl->call_id();
401     const int rc = bthread_id_lock_and_reset_range(
402                     correlation_id, NULL, 2 + cntl->max_retry());

// Returns this controller's call id, creating it on first use.
// Several threads may race here; whichever installs its id first wins and
// every caller returns that same id.
1224 CallId Controller::call_id() {
         // Access _correlation_id.value through an atomic view.
1225     butil::atomic<uint64_t>* target =
1226         (butil::atomic<uint64_t>*)&_correlation_id.value;
1227     uint64_t loaded = target->load(butil::memory_order_relaxed);
         // Non-zero means the id was already created (ids are never 0).
1228     if (loaded) {
1229         const CallId id = { loaded };
1230         return id;           
1231     }
1232     // Optimistic locking.                                                   
1233     CallId cid = { 0 };
1234     // The range of this id will be reset in Channel::CallMethod
1235     CHECK_EQ(0, bthread_id_create2(&cid, this, HandleSocketFailed));
         // Install the new id only if the field is still `loaded` (0 here).
         // On failure another thread got there first: drop our id and use
         // theirs. NOTE(review): relies on compare_exchange_strong writing the
         // observed value back into `loaded`, as std::atomic does - confirm
         // for butil::atomic.
1236     if (!target->compare_exchange_strong(loaded, cid.value,
1237                                          butil::memory_order_relaxed)) {
1238         bthread_id_cancel(cid);
1239         cid.value = loaded;
1240     }   
1241     return cid;
1242 }

// Creates an id whose error callback also receives an error text.
// Forwards to id_create_impl with the 3-argument callback set to NULL; a NULL
// on_error is replaced by bthread::default_bthread_id_on_error2.
// Returns whatever id_create_impl returns (0 or ENOMEM).
695 int bthread_id_create2(
696     bthread_id_t* id, void* data,
697     int (*on_error)(bthread_id_t, void*, int, const std::string&)) {
698     return bthread::id_create_impl(
699         id, data, NULL,
700         (on_error ? on_error : bthread::default_bthread_id_on_error2));
701 }

比如max_retry为3,那么这里传给bthread_id_lock_and_reset_range的range就是2 + 3 = 5,其实现如下:

// Locks `id`, optionally enlarging its range of valid versions.
//   pdata    - if non-NULL, receives meta->data once the lock is taken.
//   range    - 0 keeps the current range; otherwise the wanted number of
//              versions counted from first_ver. It is applied only when it is
//              positive, not above ID_MAX_RANGE, and raises locked_ver.
//   location - recorded in meta->lock_location on success.
// Returns 0 on success; EINVAL if the slot cannot be addressed or the version
// of `id` is not in [first_ver, locked_ver); EPERM if the butex holds
// unlockable_ver(); or the errno left by butex_wait for failures other than
// EWOULDBLOCK / EINTR.
405 int bthread_id_lock_and_reset_range_verbose(
406     bthread_id_t id, void **pdata, int range, const char *location) {
407     bthread::Id* const meta = address_resource(bthread::get_slot(id));
408     if (!meta) {
409         return EINVAL;
410     }
411     const uint32_t id_ver = bthread::get_version(id);
412     uint32_t* butex = meta->butex;
413     bool ever_contended = false;
414     meta->mutex.lock();
        // Retry for as long as the caller's version is still valid.
415     while (meta->has_version(id_ver)) {
            // *butex == first_ver is the unlocked state: take the lock.
416         if (*butex == meta->first_ver) {
417             // contended locker always wakes up the butex at unlock.
418             meta->lock_location = location;
419             if (range == 0) {
420                 // fast path
421             } else if (range < 0 ||
422                        range > bthread::ID_MAX_RANGE ||
423                        range + meta->first_ver <= meta->locked_ver) {
                    // (Upstream lines 424-428 are not part of this excerpt.
                    // As shown, this branch leaves locked_ver untouched.)
429             } else {
430                 meta->locked_ver = meta->first_ver + range;
431             }
                // Publish the locked state. If this caller ever had to wait,
                // keep the contended marker so that the eventual unlock still
                // wakes any remaining waiters.
432             *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
433             meta->mutex.unlock();
434             if (pdata) {
435                 *pdata = meta->data;
436             }
437             return 0;
438         } else if (*butex != meta->unlockable_ver()) {
                // Locked by someone else and not about to be destroyed: mark
                // the lock contended, release the mutex and wait on the butex.
                // NOTE(review): butex_wait presumably blocks only while
                // *butex still equals expected_ver - confirm.
439             *butex = meta->contended_ver();
440             uint32_t expected_ver = *butex;
441             meta->mutex.unlock();
442             ever_contended = true;
443             if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
444                 errno != EWOULDBLOCK && errno != EINTR) {
445                 return errno;
446             }
                // Re-take the mutex before re-checking the state.
447             meta->mutex.lock();
448         } else { // bthread_id_about_to_destroy was called.
449             meta->mutex.unlock();
450             return EPERM;
451         }
452     }
        // The version is not (or no longer) valid.
453     meta->mutex.unlock();
454     return EINVAL;
455 }

// Extracts the version part of a handle: the low 32 bits of id.value.
167 inline uint32_t get_version(bthread_id_t id) {
168     return (uint32_t)(id.value & 0xFFFFFFFFul);
169 }

当rpc的响应回来时会进行bthread_id_lock。如果此时id已经被别处锁住(*butex != meta->first_ver)且不处于unlockable状态,就会走到else if (*butex != meta->unlockable_ver())分支:先执行*butex = meta->contended_ver()标记存在竞争,这样持锁方在unlock时才会进行wake;接着调用butex_wait等待,被唤醒后再重试加锁。

 974     // Make versioned correlation_id.
 975     // call_id         : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
 976     // call_id + 1     : first try.
 977     // call_id + 2     : retry 1
 978     // ...
 979     // call_id + N + 1 : retry N
 980     // All ids except call_id are versioned. Say if we've sent retry 1 and
 981     // a failed response of first try comes back, it will be ignored.
// Id of the current attempt: the base correlation id plus
// (_current_call.nretry + 1), i.e. base + 1 when nretry is 0 and base + N + 1
// for retry N.
595     CallId current_id() const {
596         CallId id = { _correlation_id.value + _current_call.nretry + 1 };
597         return id;
598     }

回到前面max_retry为3(range=5)的例子:发起rpc前加锁时并不会有竞争。假设该Id创建时*butex=1,则first_ver=1保持不变,locked_ver由2变为6(first_ver + range),*butex由1(未加锁,等于first_ver)变为6(已加锁,等于locked_ver);如果有竞争则可能要butex_wait。

接着发起正常的rpc请求(非retry的请求),此时的id在原来的基础上+1,但get_slot获取到的位移量还是同一个,这里假设这个请求超时,此时bthread_id_unlock,但并不需要wake:

// Unlocks `id`.
// Returns EINVAL if the slot cannot be addressed or the version is not valid,
// and EPERM if the id is not locked (*butex == first_ver).
// If an error was queued while the id was locked, the butex is left unchanged
// (the id stays locked), the queued error is passed to the error callback and
// that callback's return value is returned. Otherwise the butex is reset to
// first_ver, waiters are woken when the lock was contended, and 0 is returned.
568 int bthread_id_unlock(bthread_id_t id) {
570     bthread::Id* const meta = address_resource(bthread::get_slot(id));
571     if (!meta) {
572         return EINVAL;
573     }
574     uint32_t* butex = meta->butex;
575     // Release fence makes sure all changes made before signal visible to
576     // woken-up waiters.
577     const uint32_t id_ver = bthread::get_version(id);
578     meta->mutex.lock();
579     if (!meta->has_version(id_ver)) {
580         meta->mutex.unlock();
582         return EINVAL;
583     }
        // Unlocking an id that is not locked is an error.
584     if (*butex == meta->first_ver) {
585         meta->mutex.unlock();
587         return EPERM;
588     }
589     bthread::PendingError front;
        // An error arrived while the id was locked: run its handler now,
        // without touching *butex.
590     if (meta->pending_q.pop(&front)) {
592         meta->lock_location = front.location;
593         meta->mutex.unlock();
594         if (meta->on_error) {
595             return meta->on_error(front.id, meta->data, front.error_code);
596         } else {
597             return meta->on_error2(front.id, meta->data, front.error_code,
598                                    front.error_text);
599         }
600     } else {
            // Nothing queued: go back to the unlocked state and wake waiters
            // only if some locker marked the lock as contended.
602         const bool contended = (*butex == meta->contended_ver());
603         *butex = meta->first_ver;
604         meta->mutex.unlock();
605         if (contended) {
606             // We may wake up already-reused id, but that's OK.
607             bthread::butex_wake(butex);
608         }
609         return 0;
610     }
611 }

bthread_id_unlock中,若pending_q为空,则执行*butex = meta->first_ver(本例中即*butex=1)完成解锁,如果之前有其他bthread在wait(即处于contended状态)则将其唤醒;若pending_q非空,则pop一个PendingError,在不释放该id的情况下直接执行对应的错误处理函数。

若这个正常的请求回来的时候,在baidu_std协议中ProcessRpcResponse执行bthread_id_lock(cid, (void**)&cntl)

// Locks `id` without changing its version range: forwards to
// bthread_id_lock_and_reset_range_verbose with range 0 and returns its result.
563 int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
564                             const char *location) {
566     return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
567 }

如果此时发送的请求已经超时,则会回调HandleTimeout,并调用bthread_id_error,最终进入bthread_id_error2_verbose:

// Reports `error_code` on `id` with an empty error text: forwards to
// bthread_id_error2_verbose and returns its result.
458 int bthread_id_error_verbose(bthread_id_t id, int error_code, 
459                              const char *location) {
460     return bthread_id_error2_verbose(id, error_code, std::string(), location);
461 }

// Reports an error on `id`.
// Returns EINVAL if the slot cannot be addressed or the version is not valid.
// If the id is unlocked, it is locked here and the error callback runs
// immediately; its return value is returned. If the id is already locked, the
// error is queued in pending_q (bthread_id_unlock pops it) and 0 is returned.
714 int bthread_id_error2_verbose(bthread_id_t id, int error_code,
715                               const std::string& error_text,
716                               const char *location) {
717     bthread::Id* const meta = address_resource(bthread::get_slot(id));
718     if (!meta) {             
719         return EINVAL;
720     }
721     const uint32_t id_ver = bthread::get_version(id);
722     uint32_t* butex = meta->butex;
723     meta->mutex.lock();
724     if (!meta->has_version(id_ver)) {
725         meta->mutex.unlock();
726         return EINVAL;
727     }
        // Unlocked: take the lock on behalf of the callback and run it outside
        // the mutex.
728     if (*butex == meta->first_ver) {
729         *butex = meta->locked_ver;
730         meta->lock_location = location;
731         meta->mutex.unlock();
732         if (meta->on_error) {
733             return meta->on_error(id, meta->data, error_code);
734         } else {
735             return meta->on_error2(id, meta->data, error_code, error_text);
736         }
737     } else {
            // Locked by someone else: remember the error for later.
738         bthread::PendingError e;
739         e.id = id;
740         e.error_code = error_code;
741         e.error_text = error_text;
742         e.location = location;
743         meta->pending_q.push(e);
744         meta->mutex.unlock();
745         return 0;
746     }
747 }

若因rpc上下文已经销毁等原因导致版本号失效,则直接返回EINVAL;否则,若此时id没有被其他bthread加锁(*butex == meta->first_ver),则加锁并执行错误处理函数;若已被加锁,则push一个PendingError,由持锁的bthread在unlock时取出并执行。这是因为在执行某个rpc的一些代码时,需要阻止其他bthread同时执行同一个rpc的代码段。

当没有其他rpc互斥时,即*butex == meta->first_ver,加锁*butex = (ever_contended ? meta->contended_ver() : meta->locked_ver)。这里要判断是否有contended,因为在后面解锁时会通过const bool contended = (*butex == meta->contended_ver())决定是否要唤醒其他等待的bthread。除了bthread_id_unlock,把*butex置为unlockable_ver的地方也有同样的判断(下面这段代码应出自bthread_id_about_to_destroy):

482     const bool contended = (*butex == meta->contended_ver());
483     *butex = meta->unlockable_ver();
484     meta->mutex.unlock();
485     if (contended) {
486         // wake up all waiting lockers.
487         bthread::butex_wake_except(butex, 0);
488     }

剩下其他接口:

// Blocks until the version carried by `id` is no longer valid.
// Returns 0 once has_version() is false; EINVAL if the slot cannot be
// addressed; or the errno left by butex_wait for failures other than
// EWOULDBLOCK / EINTR.
516 int bthread_id_join(bthread_id_t id) {
517     const bthread::IdResourceId slot = bthread::get_slot(id);
518     bthread::Id* const meta = address_resource(slot);
519     if (!meta) {
520         // The id is not created yet, this join is definitely wrong.
521         return EINVAL;
522     }
523     const uint32_t id_ver = bthread::get_version(id);
524     uint32_t* join_butex = meta->join_butex;
525     while (1) {
            // Sample validity and the join_butex value under the same mutex
            // hold so the two observations are consistent.
526         meta->mutex.lock();
527         const bool has_ver = meta->has_version(id_ver);
528         const uint32_t expected_ver = *join_butex;
529         meta->mutex.unlock();
530         if (!has_ver) {
531             break;           
532         }
            // Wait on join_butex with the sampled value, then re-check.
533         if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
534             errno != EWOULDBLOCK && errno != EINTR) {
535             return errno;
536         }
537     }
538     return 0;
539 }
// Destroys an id that is currently unlocked.
// Returns EINVAL if the slot cannot be addressed or the version is not valid,
// EPERM if the id is locked (*butex != first_ver), 0 on success.
492 int bthread_id_cancel(bthread_id_t id) {
493     bthread::Id* const meta = address_resource(bthread::get_slot(id));
494     if (!meta) {
495         return EINVAL;
496     }
497     uint32_t* butex = meta->butex; 
498     const uint32_t id_ver = bthread::get_version(id);
499     meta->mutex.lock();
500     if (!meta->has_version(id_ver)) {
501         meta->mutex.unlock();
502         return EINVAL;
503     }
504     if (*butex != meta->first_ver) {
505         meta->mutex.unlock();
506         return EPERM;
507     }       
        // Move butex to end_ver() and collapse the range to empty
        // (first_ver == locked_ver), so has_version() is false for every version.
508     *butex = meta->end_ver();
509     meta->first_ver = *butex;
510     meta->locked_ver = *butex;
511     meta->mutex.unlock();
        // Hand the Id object back for reuse.
512     return_resource(bthread::get_slot(id));
513     return 0;
514 }
// Destroys an id that is currently locked.
// Returns EINVAL if the slot cannot be addressed or the version is not valid,
// EPERM if the id is not locked (*butex == first_ver), 0 on success.
616 int bthread_id_unlock_and_destroy(bthread_id_t id) {
617     bthread::Id* const meta = address_resource(bthread::get_slot(id));
618     if (!meta) { 
619         return EINVAL;
620     }   
622     uint32_t* butex = meta->butex;
623     uint32_t* join_butex = meta->join_butex;
624     const uint32_t id_ver = bthread::get_version(id);
625     meta->mutex.lock();
626     if (!meta->has_version(id_ver)) {
627         meta->mutex.unlock();
629         return EINVAL;
630     }   
631     if (*butex == meta->first_ver) {
632         meta->mutex.unlock();
634         return EPERM; 
635     }
        // Move both butexes to end_ver(), collapse the range to empty so that
        // has_version() is false for every version, and drop queued errors.
636     const uint32_t next_ver = meta->end_ver();
637     *butex = next_ver;
638     *join_butex = next_ver;
639     meta->first_ver = next_ver;
640     meta->locked_ver = next_ver;
641     meta->pending_q.clear();
642     meta->mutex.unlock();
643     // Notice that butex_wake* returns # of woken-up, not successful or not.
        // Wake waiters on both butexes; they re-check has_version() and fail
        // or finish.
644     bthread::butex_wake_except(butex, 0);
645     bthread::butex_wake_all(join_butex);
        // Hand the Id object back for reuse.
646     return_resource(bthread::get_slot(id));
647     return 0;
648 }

参考
bthread_id.md

上一篇下一篇

猜你喜欢

热点阅读