Source Code Analysis of brpc's ResourcePool

2019-09-27  fooboo

This class is used by several modules; it implements acquisition and recycling of pre-allocated resources, where value, a uint64_t, generally serves as an index. The underlying principles and caveats are described in memory_management.md. Although it is not recommended for general use, the goal here is just to analyze how it is implemented and which ideas are worth borrowing; it only fits certain specific workloads.

Basic data types:

 47 template <typename T>
 48 struct ResourceId {
 49     uint64_t value;
 50 
 51     operator uint64_t() const {
 52         return value;
 53     }
 55     //more code...
 60 };
 61 
 62 template <typename T, size_t NITEM>
 63 struct ResourcePoolFreeChunk {
 64     size_t nfree;
 65     ResourceId<T> ids[NITEM];
 66 };
 67 // for gcc 3.4.5
 68 template <typename T>
 69 struct ResourcePoolFreeChunk<T, 0> {
 70     size_t nfree;
 71     ResourceId<T> ids[0];
 72 };
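The specialization with NITEM = 0 uses a zero-length trailing array (a GCC extension), so a chunk holding a variable number of ids can be carved out of a single malloc; push_free_chunk below does exactly that with offsetof. A minimal standalone illustration of the idiom (hypothetical names, not brpc code):

#include <cstddef>
#include <cstdint>
#include <cstdlib>

struct DynChunk {
    size_t nfree;
    uint64_t ids[0];  // zero-length array: id storage follows the header
};

DynChunk* make_chunk(size_t n) {
    // One allocation covers the header plus n trailing ids.
    DynChunk* p = (DynChunk*)malloc(offsetof(DynChunk, ids) + sizeof(uint64_t) * n);
    if (p != NULL) {
        p->nfree = n;
    }
    return p;
}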

Element count and size:


105     static const size_t BLOCK_NITEM = ResourcePoolBlockItemNum<T>::value;
106     static const size_t FREE_CHUNK_NITEM = BLOCK_NITEM;
107 
108     // Free identifiers are batched in a FreeChunk before they're added to
109     // global list(_free_chunks).
110     typedef ResourcePoolFreeChunk<T, FREE_CHUNK_NITEM>      FreeChunk;
111     typedef ResourcePoolFreeChunk<T, 0> DynamicFreeChunk;

 92 template <typename T>
 93 class ResourcePoolBlockItemNum {
 94     static const size_t N1 = ResourcePoolBlockMaxSize<T>::value / sizeof(T);
 95     static const size_t N2 = (N1 < 1 ? 1 : N1);
 96 public:
 97     static const size_t value = (N2 > ResourcePoolBlockMaxItem<T>::value ?
 98                                  ResourcePoolBlockMaxItem<T>::value : N2);
 99 };

 64 template <typename T> struct ResourcePoolBlockMaxSize {
 65     static const size_t value = 64 * 1024; // bytes
 66 };
 67 template <typename T> struct ResourcePoolBlockMaxItem {
 68     static const size_t value = 256;
 69 };
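So a Block holds at most 64KB worth of items, capped at 256 items, and always at least one. A quick standalone check of the arithmetic (not brpc code):

#include <cstddef>
#include <cstdio>

// Mirrors ResourcePoolBlockItemNum: items per Block for a given sizeof(T).
size_t block_nitem(size_t sizeof_T) {
    const size_t kBlockMaxSize = 64 * 1024;  // ResourcePoolBlockMaxSize
    const size_t kBlockMaxItem = 256;        // ResourcePoolBlockMaxItem
    const size_t n1 = kBlockMaxSize / sizeof_T;
    const size_t n2 = (n1 < 1 ? 1 : n1);     // at least one item per Block
    return n2 > kBlockMaxItem ? kBlockMaxItem : n2;
}

int main() {
    printf("%zu\n", block_nitem(16));      // 256: 65536/16 = 4096, capped at 256
    printf("%zu\n", block_nitem(1024));    // 64:  65536/1024
    printf("%zu\n", block_nitem(100000));  // 1:   a T larger than a Block still gets one slot
    return 0;
}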

Let's start from where it is used. The two commonly used interfaces:

 97 template <typename T> inline T* get_resource(ResourceId<T>* id) {
 98     return ResourcePool<T>::singleton()->get_resource(id);
 99 }

118 template <typename T> inline int return_resource(ResourceId<T> id) {
119     return ResourcePool<T>::singleton()->return_resource(id);
120 }
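Before diving into the internals, here is a minimal usage sketch. Foo is a hypothetical user type, and the header path and butil namespace are assumptions based on brpc's layout:

#include "butil/resource_pool.h"  // assumed brpc header providing butil::get_resource etc.

struct Foo {
    int payload;
    void reset() { payload = 0; }  // recycled objects come back dirty; reset by hand
};

void example() {
    butil::ResourceId<Foo> id;
    Foo* f = butil::get_resource(&id);  // O(1), lock-free on the fast path
    if (f != NULL) {
        f->reset();      // may be a recycled, never-destructed object
        f->payload = 42;
        // id.value is a small uint64_t and can be embedded in a 64-bit handle;
        // address_resource maps it back to the same pointer in O(1).
        Foo* same = butil::address_resource(id);
        (void)same;
        butil::return_resource(id);  // no destructor runs here
    }
}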
102 template <typename T>
103 class BAIDU_CACHELINE_ALIGNMENT ResourcePool {
104 public:
361     static inline ResourcePool* singleton() {
362         ResourcePool* p = _singleton.load(butil::memory_order_consume);
363         if (p) {
364             return p;
365         }
366         pthread_mutex_lock(&_singleton_mutex);
367         p = _singleton.load(butil::memory_order_consume);
368         if (!p) {
369             p = new ResourcePool();
370             _singleton.store(p, butil::memory_order_release);
371         }
372         pthread_mutex_unlock(&_singleton_mutex);
373         return p;
374     }
376 private:
377     ResourcePool() {
378         _free_chunks.reserve(RP_INITIAL_FREE_LIST_SIZE);
379         pthread_mutex_init(&_free_chunks_mutex, NULL);
380     }

545     static BAIDU_THREAD_LOCAL LocalPool* _local_pool;
543     static butil::static_atomic<ResourcePool*> _singleton;
544     static pthread_mutex_t _singleton_mutex;
546     static butil::static_atomic<long> _nlocal; // number of bthreads/pthreads currently using a LocalPool
552     std::vector<DynamicFreeChunk*> _free_chunks;

The above is a thread-safe singleton implementation for multi-threaded use. Before C++11, apart from the pthread_once-based approach, the various hand-rolled implementations of this pattern were all problematic; the version here is safe because it uses atomics with explicit memory ordering. For what exactly can go wrong and why pthread_once works, see the article on understanding Double-Checked Locking and my earlier notes on pitfalls of multi-threaded programming.
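For reference, since C++11 the same lazy singleton can be written without hand-rolled double-checked locking; a function-local static is guaranteed to be initialized exactly once even under concurrency (a sketch, not how brpc does it):

template <typename T>
class Pool {
public:
    static Pool* singleton() {
        // C++11 guarantees thread-safe initialization of local statics,
        // so no explicit lock or atomic is needed here.
        static Pool instance;
        return &instance;
    }
private:
    Pool() {}
};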

The ResourcePool constructor above is very simple: it reserves some initial space and initializes _free_chunks_mutex, the lock guarding _free_chunks. _local_pool is a thread-local (TLS) variable:

565 template <typename T>
566 BAIDU_THREAD_LOCAL typename ResourcePool<T>::LocalPool* ResourcePool<T>::_local_pool = NULL;

When get_resource is called to acquire a resource:

281     inline T* get_resource(ResourceId<T>* id) {
282         LocalPool* lp = get_or_new_local_pool();
283         if (__builtin_expect(lp != NULL, 1)) {
284             return lp->get(id);
285         }
286         return NULL;
287     }

439     inline LocalPool* get_or_new_local_pool() {
440         LocalPool* lp = _local_pool;
441         if (lp != NULL) {
442             return lp;
443         }
444         lp = new(std::nothrow) LocalPool(this);
445         if (NULL == lp) {
446             return NULL;
447         }
448         BAIDU_SCOPED_LOCK(_change_thread_mutex); //avoid race with clear()
449         _local_pool = lp;
450         butil::thread_atexit(LocalPool::delete_local_pool, lp);
451         _nlocal.fetch_add(1, butil::memory_order_relaxed);
452         return lp;
453     }
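butil::thread_atexit registers a callback to run when the calling thread exits, which is how each LocalPool gets cleaned up. A rough pthread-based equivalent of that mechanism (a sketch; brpc's actual thread_local implementation differs):

#include <pthread.h>

// Run a destructor for a per-thread value at thread exit, the same effect
// LocalPool obtains via butil::thread_atexit(LocalPool::delete_local_pool, lp).
static pthread_key_t g_key;
static pthread_once_t g_once = PTHREAD_ONCE_INIT;

static void destroy_value(void* arg) {
    // delete (LocalPool*)arg;  // whatever cleanup the thread registered
    (void)arg;
}

static void make_key() { pthread_key_create(&g_key, destroy_value); }

void register_thread_exit(void* value) {
    pthread_once(&g_once, make_key);
    pthread_setspecific(g_key, value);  // destroy_value(value) runs at thread exit
}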

_local_pool is NULL at first, so a LocalPool gets constructed:

139     // Each thread has an instance of this class.
140     class BAIDU_CACHELINE_ALIGNMENT LocalPool {
141     public:
142         explicit LocalPool(ResourcePool* pool)
143             : _pool(pool)
144             , _cur_block(NULL)
145             , _cur_block_index(0) {
146             _cur_free.nfree = 0;
147         }
148         
149         ~LocalPool() {
150             // Add to global _free_chunks if there're some free resources
151             if (_cur_free.nfree) {
152                 _pool->push_free_chunk(_cur_free); // return remaining free ids to the global pool
153             }
154         
155             _pool->clear_from_destructor_of_local_pool();
156         }
157 
158         static void delete_local_pool(void* arg) {
159             delete(LocalPool*)arg;
160         }

210         inline T* get(ResourceId<T>* id) {
211             BAIDU_RESOURCE_POOL_GET();
212         }
244     private:
245         ResourcePool* _pool;
246         Block* _cur_block;
247         size_t _cur_block_index;
248         FreeChunk _cur_free;
249     };

116     struct BAIDU_CACHELINE_ALIGNMENT Block {
117         char items[sizeof(T) * BLOCK_NITEM];
118         size_t nitem;
119 
120         Block() : nitem(0) {}
121     };

Expansion of the BAIDU_RESOURCE_POOL_GET macro:

166 #define BAIDU_RESOURCE_POOL_GET(CTOR_ARGS)                              \
167         /* Fetch local free id */                                       \
168         if (_cur_free.nfree) {                                          \
169             const ResourceId<T> free_id = _cur_free.ids[--_cur_free.nfree]; \
170             *id = free_id;                                              \
172             return unsafe_address_resource(free_id);                    \
173         }                                                               \
174         /* Fetch a FreeChunk from global.                               \
175            TODO: Popping from _free needs to copy a FreeChunk which is  \
176            costly, but hardly impacts amortized performance. */         \
177         if (_pool->pop_free_chunk(_cur_free)) {                         \
178             //more code...  
183         }                                                               \
184         /* Fetch memory from local block */                             \
185         if (_cur_block && _cur_block->nitem < BLOCK_NITEM) {            \
186             //more code...                                        \
194         }                                                               \
195         /* Fetch a Block from global */                                 \
196         _cur_block = add_block(&_cur_block_index);                      \
197         if (_cur_block != NULL) {                                       \
198             //more code...
206         }                                                               \
207         return NULL;                                                    \

First it tries to take a resource from the local _cur_free; at this point there are no free ids yet, so this branch is not taken. Next it tries to pop an available FreeChunk from the global list:

509     bool pop_free_chunk(FreeChunk& c) {
510         // Critical for the case that most return_object are called in
511         // different threads of get_object.
512         if (_free_chunks.empty()) {
513             return false;
514         }
515         pthread_mutex_lock(&_free_chunks_mutex);
516         if (_free_chunks.empty()) {
517             pthread_mutex_unlock(&_free_chunks_mutex);
518             return false;
519         }
520         DynamicFreeChunk* p = _free_chunks.back();
521         _free_chunks.pop_back();
522         pthread_mutex_unlock(&_free_chunks_mutex);
523         c.nfree = p->nfree;
524         memcpy(c.ids, p->ids, sizeof(*p->ids) * p->nfree);
525         free(p);
526         return true;
527     }

This fails here as well. It then tries to allocate from the local block, but _cur_block is also NULL at the beginning, so:

195         /* Fetch a Block from global */                                 \
196         _cur_block = add_block(&_cur_block_index);                      \
197         if (_cur_block != NULL) {                                       \
198             id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
199             T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
200             if (!ResourcePoolValidator<T>::validate(p)) {               \
201                 p->~T();                                                \
202                 return NULL;                                            \
203             }                                                           \
204             ++_cur_block->nitem;                                        \
205             return p;                                                   \
206         }                                                               \
207         return NULL;                                                    \

386     // Create a Block and append it to right-most BlockGroup.
387     static Block* add_block(size_t* index) {
388         Block* const new_block = new(std::nothrow) Block;
389         if (NULL == new_block) {
390             return NULL;
391         }
392 
393         size_t ngroup;
394         do {
395             ngroup = _ngroup.load(butil::memory_order_acquire);
396             if (ngroup >= 1) {
397                 BlockGroup* const g =
398                     _block_groups[ngroup - 1].load(butil::memory_order_consume);
399                 const size_t block_index =
400                     g->nblock.fetch_add(1, butil::memory_order_relaxed);
401                 if (block_index < RP_GROUP_NBLOCK) {
402                     g->blocks[block_index].store(
403                         new_block, butil::memory_order_release);
404                     *index = (ngroup - 1) * RP_GROUP_NBLOCK + block_index;
405                     return new_block;
406                 } 
407                 g->nblock.fetch_sub(1, butil::memory_order_relaxed);
408             }
409         } while (add_block_group(ngroup));
410 
411         // Fail to add_block_group.
412         delete new_block;
413         return NULL;
414     }

416     // Create a BlockGroup and append it to _block_groups.
417     // Shall be called infrequently because a BlockGroup is pretty big.
418     static bool add_block_group(size_t old_ngroup) {
419         BlockGroup* bg = NULL;
420         BAIDU_SCOPED_LOCK(_block_group_mutex);
421         const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
422         if (ngroup != old_ngroup) {
423             // Other thread got lock and added group before this thread.
424             return true;
425         }
426         if (ngroup < RP_MAX_BLOCK_NGROUP) {
427             bg = new(std::nothrow) BlockGroup;
428             if (NULL != bg) {
429                 // Release fence is paired with consume fence in address() and
430                 // add_block() to avoid un-constructed bg to be seen by other
431                 // threads.
432                 _block_groups[ngroup].store(bg, butil::memory_order_release);
433                 _ngroup.store(ngroup + 1, butil::memory_order_release);
434             }
435         }
436         return bg != NULL;
437     }

 87 static const size_t RP_MAX_BLOCK_NGROUP = 65536;
 88 static const size_t RP_GROUP_NBLOCK_NBIT = 16;
 89 static const size_t RP_GROUP_NBLOCK = (1UL << RP_GROUP_NBLOCK_NBIT);
 90 static const size_t RP_INITIAL_FREE_LIST_SIZE = 1024;

123     // A Resource addresses at most RP_MAX_BLOCK_NGROUP BlockGroups,
124     // each BlockGroup addresses at most RP_GROUP_NBLOCK blocks. So a
125     // resource addresses at most RP_MAX_BLOCK_NGROUP * RP_GROUP_NBLOCK Blocks.
126     struct BlockGroup {
127         butil::atomic<size_t> nblock;
128         butil::atomic<Block*> blocks[RP_GROUP_NBLOCK];
129         
130         BlockGroup() : nblock(0) {
131             // We fetch_add nblock in add_block() before setting the entry,
132             // thus address_resource() may sees the unset entry. Initialize
133             // all entries to NULL makes such address_resource() return NULL.
134             memset(blocks, 0, sizeof(butil::atomic<Block*>) * RP_GROUP_NBLOCK);
135         }       
136     }; 
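Combining the constants: one pool can address at most RP_MAX_BLOCK_NGROUP * RP_GROUP_NBLOCK Blocks. A quick sanity check of the total capacity (standalone arithmetic, assuming the 256-item cap):

#include <cstdint>
#include <cstdio>

int main() {
    const uint64_t max_ngroup = 65536;             // RP_MAX_BLOCK_NGROUP
    const uint64_t nblock_per_group = 1ULL << 16;  // RP_GROUP_NBLOCK
    const uint64_t max_nitem_per_block = 256;      // ResourcePoolBlockMaxItem cap
    // 2^16 groups * 2^16 blocks = 2^32 Blocks; with <= 256 items each,
    // ids stay below 2^40, comfortably inside a uint64_t.
    printf("max blocks: %llu\n",
           (unsigned long long)(max_ngroup * nblock_per_group));
    printf("max items:  %llu\n",
           (unsigned long long)(max_ngroup * nblock_per_group * max_nitem_per_block));
    return 0;
}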

Here add_block creates a new Block. If block_index < RP_GROUP_NBLOCK (block_index is obtained by fetch_add on the group's nblock counter), new_block is stored into blocks[block_index], *index is set, and new_block is returned; otherwise a new BlockGroup must be created. Each newly added Block gets the index (ngroup - 1) * RP_GROUP_NBLOCK + block_index, so from the way this index is generated one can decode which group a Block belongs to and its slot within that group's blocks array.
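To make the index layout concrete, here is a small standalone decoder for id.value, mirroring unsafe_address_resource below; BLOCK_NITEM = 256 is an assumption for the demo:

#include <cstddef>
#include <cstdint>
#include <cstdio>

static const size_t RP_GROUP_NBLOCK_NBIT = 16;
static const size_t RP_GROUP_NBLOCK = 1UL << RP_GROUP_NBLOCK_NBIT;
static const size_t BLOCK_NITEM = 256;

void decode(uint64_t value) {
    const size_t block_index = value / BLOCK_NITEM;
    const size_t group  = block_index >> RP_GROUP_NBLOCK_NBIT;  // which BlockGroup
    const size_t block  = block_index & (RP_GROUP_NBLOCK - 1);  // which Block in it
    const size_t offset = value - block_index * BLOCK_NITEM;    // which item in the Block
    printf("group=%zu block=%zu offset=%zu\n", group, block, offset);
}

int main() {
    decode(0);                  // group=0 block=0 offset=0
    decode(256 * 65536 + 300);  // group=1 block=1 offset=44
    return 0;
}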

Next, id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem is set and the object is constructed in place: T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS. On the next acquisition, since no FreeChunk can be popped from _free_chunks yet, the path still goes through _cur_block, hitting this branch:

185         if (_cur_block && _cur_block->nitem < BLOCK_NITEM) {            \
186             id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
187             T* p = new ((T*)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
188             if (!ResourcePoolValidator<T>::validate(p)) {               \
189                 p->~T();                                                \
190                 return NULL;                                            \
191             }                                                           \
192             ++_cur_block->nitem;                                        \
193             return p;                                                   \
194         }    

When return_resource is called to release a resource:

307     inline int return_resource(ResourceId<T> id) {
308         LocalPool* lp = get_or_new_local_pool();
309         if (__builtin_expect(lp != NULL, 1)) {
310             return lp->return_resource(id);
311         }
312         return -1;
313     }

140     class BAIDU_CACHELINE_ALIGNMENT LocalPool {
141     public:
226         inline int return_resource(ResourceId<T> id) {
227             // Return to local free list
228             if (_cur_free.nfree < ResourcePool::free_chunk_nitem()) {
229                 _cur_free.ids[_cur_free.nfree++] = id;
231                 return 0;
232             }
233             // Local free list is full, return it to global.
234             // For copying issue, check comment in upper get()
235             if (_pool->push_free_chunk(_cur_free)) {
236                 _cur_free.nfree = 1;
237                 _cur_free.ids[0] = id;
239                 return 0;
240             }
241             return -1;
242         }

On release, the id is simply cached locally: _cur_free.ids[_cur_free.nfree++] = id. Once the local free list reaches its capacity:

529     bool push_free_chunk(const FreeChunk& c) {
530         DynamicFreeChunk* p = (DynamicFreeChunk*)malloc(
531             offsetof(DynamicFreeChunk, ids) + sizeof(*c.ids) * c.nfree);
532         if (!p) {
533             return false;
534         }   
535         p->nfree = c.nfree;
536         memcpy(p->ids, c.ids, sizeof(*c.ids) * c.nfree);
537         pthread_mutex_lock(&_free_chunks_mutex);
538         _free_chunks.push_back(p); 
539         pthread_mutex_unlock(&_free_chunks_mutex);
540         return true;
541     }

A new chunk of memory is allocated, the ids are copied into p, and p is pushed onto _free_chunks. When get_resource is called again later:

177         if (_pool->pop_free_chunk(_cur_free)) {                         \
178             --_cur_free.nfree;                                          \
179             const ResourceId<T> free_id =  _cur_free.ids[_cur_free.nfree]; \
180             *id = free_id;                                              \
181             BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1;                   \
182             return unsafe_address_resource(free_id);                    \
183         }

the pop succeeds and the cached ids are used directly.
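One observable consequence of this LIFO free list, reusing the hypothetical Foo type from the earlier sketch (the reuse pattern follows from the code above but is not a documented guarantee):

void reuse_demo() {
    butil::ResourceId<Foo> id1;
    Foo* f1 = butil::get_resource(&id1);  // assume allocation succeeds
    f1->payload = 42;
    butil::return_resource(id1);  // id goes to the top of this thread's _cur_free

    butil::ResourceId<Foo> id2;
    Foo* f2 = butil::get_resource(&id2);  // pops the same id back:
    // id2 == id1, f2 == f1, and f2->payload is still 42 -- stale data
    // unless the caller resets it.
    (void)f2;
}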

When a LocalPool is destructed, it returns its remaining free ids and then calls clear_from_destructor_of_local_pool:

455     void clear_from_destructor_of_local_pool() {
456         // Remove tls
457         _local_pool = NULL;
458 
459         if (_nlocal.fetch_sub(1, butil::memory_order_relaxed) != 1) {
460             return; // other bthreads/pthreads still use a LocalPool; don't destruct resources
461         } // otherwise release everything

468 #ifdef BAIDU_CLEAR_RESOURCE_POOL_AFTER_ALL_THREADS_QUIT
469         BAIDU_SCOPED_LOCK(_change_thread_mutex);  // including acquire fence.
470         // Do nothing if there're active threads.
471         if (_nlocal.load(butil::memory_order_relaxed) != 0) {
472             return;
473         }
474         // All threads exited and we're holding _change_thread_mutex to avoid
475         // racing with new threads calling get_resource().
476 
477         // Clear global free list.
478         FreeChunk dummy;
479         while (pop_free_chunk(dummy));
480 
481         // Delete all memory
482         const size_t ngroup = _ngroup.exchange(0, butil::memory_order_relaxed);
483         for (size_t i = 0; i < ngroup; ++i) {
484             BlockGroup* bg = _block_groups[i].load(butil::memory_order_relaxed);
485             if (NULL == bg) {
486                 break;
487             }
488             size_t nblock = std::min(bg->nblock.load(butil::memory_order_relaxed),
489                                      RP_GROUP_NBLOCK);
490             for (size_t j = 0; j < nblock; ++j) {
491                 Block* b = bg->blocks[j].load(butil::memory_order_relaxed);
492                 if (NULL == b) {
493                     continue;
494                 }
495                 for (size_t k = 0; k < b->nitem; ++k) {
496                     T* const objs = (T*)b->items;
497                     objs[k].~T();
498                 }
499                 delete b;
500             }
501             delete bg;
502         }
503 
504         memset(_block_groups, 0, sizeof(BlockGroup*) * RP_MAX_BLOCK_NGROUP);
505 #endif
506     }

There are two more interfaces: unsafe_address_resource, which the get path above uses to turn a recycled id back into a pointer, and address_resource, which locates the resource for a given id:

251     static inline T* unsafe_address_resource(ResourceId<T> id) {
252         const size_t block_index = id.value / BLOCK_NITEM;
253         return (T*)(_block_groups[(block_index >> RP_GROUP_NBLOCK_NBIT)]
254                     .load(butil::memory_order_consume)
255                     ->blocks[(block_index & (RP_GROUP_NBLOCK - 1))]
256                     .load(butil::memory_order_consume)->items) +
257                id.value - block_index * BLOCK_NITEM;
258     }
259 
260     static inline T* address_resource(ResourceId<T> id) {
261         const size_t block_index = id.value / BLOCK_NITEM;
262         const size_t group_index = (block_index >> RP_GROUP_NBLOCK_NBIT);
263         if (__builtin_expect(group_index < RP_MAX_BLOCK_NGROUP, 1)) {
264             BlockGroup* bg =
265                 _block_groups[group_index].load(butil::memory_order_consume);
266             if (__builtin_expect(bg != NULL, 1)) {
267                 Block* b = bg->blocks[block_index & (RP_GROUP_NBLOCK - 1)]
268                            .load(butil::memory_order_consume);
269                 if (__builtin_expect(b != NULL, 1)) {
270                     const size_t offset = id.value - block_index * BLOCK_NITEM;
271                     if (__builtin_expect(offset < b->nitem, 1)) {
272                         return (T*)b->items + offset;
273                     }
274                 }
275             }
276         }
277 
278         return NULL;
279     }

Here block_index >> RP_GROUP_NBLOCK_NBIT locates the group, block_index & (RP_GROUP_NBLOCK - 1) locates the block within that group, and the final item offset is id.value - block_index * BLOCK_NITEM. unsafe_address_resource performs no validity checks (an invalid id means undefined behavior), whereas address_resource is much more conservative and returns NULL for out-of-range ids; essentially the two functions do the same job.

To summarize: brand-new objects are constructed, recycled ones are not; a recycled object must be reset before use, and returning an object does not destruct it. This trade-off of course only fits special scenarios.

ResourcePool

Quoting memory_management.md: creating an object of type T returns an offset that can be converted to the object pointer in O(1) time. The offset is equivalent to a pointer, but its value is generally below 2^32, so it can be embedded as part of a 64-bit id. Objects can be returned to the pool, but a returned object is neither deleted nor destructed; it merely goes onto a freelist. A later acquisition may hand out such a used object, which must be reset before use. Even after an object is returned, it can still be accessed through its offset; that is, ResourcePool is only responsible for memory allocation and does not solve the ABA problem. For an out-of-range offset, however, ResourcePool returns NULL.

Since objects are of equal size, ResourcePool allocates and returns memory in batches to avoid global contention and reduce per-operation overhead. The per-thread allocation flow (as the BAIDU_RESOURCE_POOL_GET walkthrough above showed) is:

1. take an id from the thread-local free list (_cur_free); if it is empty,
2. pop a FreeChunk from the global _free_chunks; if that fails,
3. carve a new item out of the thread-local _cur_block; if the block is full or absent,
4. grab a fresh Block from the global BlockGroups.

The principle is fairly simple; what gets more involved in the engineering is the data structures, atomic variables, memory fences, and so on. Below, the creation of a bthread_t illustrates how ResourcePool is applied.

bthread_t

After finishing the analysis I was left with a few questions, which arose while reading the comments above during the analysis of butex_destroy. Since the related implementations, including get_object, do not destruct the object when a resource is returned, problems can occur on the next use if a field is accidentally left un-reset and stale data gets used.

113 // Return the object associated with identifier |id| back. The object is NOT
114 // destructed and will be returned by later get_resource<T>. Similar with
115 // free/delete, validity of the id is not checked, user shall not return a
116 // not-yet-allocated or already-returned id otherwise behavior is undefined.
117 // Returns 0 when successful, -1 otherwise.
118 template <typename T> inline int return_resource(ResourceId<T> id) {
119     return ResourcePool<T>::singleton()->return_resource(id);
120 }
121 
122 // Get the object associated with the identifier |id|.
123 // Returns NULL if |id| was not allocated by get_resource<T> or
124 // ResourcePool<T>::get_resource() of a variant before.
125 // Addressing a free(returned to pool) identifier does not return NULL.
126 // NOTE: Calling this function before any other get_resource<T>/
127 //       return_resource<T>/address<T>, even if the identifier is valid,
128 //       may race with another thread calling clear_resources<T>.
129 template <typename T> inline T* address_resource(ResourceId<T> id) {
130     return ResourcePool<T>::address_resource(id);
131 }

For example, in:

325 static int id_create_impl(
326     bthread_id_t* id, void* data,
327     int (*on_error)(bthread_id_t, void*, int),
328     int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
329     IdResourceId slot;
330     Id* const meta = get_resource(&slot);
331     if (meta) {
332         meta->data = data;
333         meta->on_error = on_error;
334         meta->on_error2 = on_error2;
335         CHECK(meta->pending_q.empty());
336         uint32_t* butex = meta->butex;
337         if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
338             // Skip 0 so that bthread_id_t is never 0
339             // avoid overflow to make comparisons simpler.
340             *butex = 1;
341         }
342         *meta->join_butex = *butex;
343         meta->first_ver = *butex;
344         meta->locked_ver = *butex + 1;
345         *id = make_id(*butex, slot);
346         return 0;
347     }
348     return ENOMEM;
349 }
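make_id is not shown in the excerpt; in bthread's id.cpp it packs the resource slot into the high 32 bits and the butex version into the low 32 bits, roughly as follows (paraphrased, so treat it as a sketch):

// Sketch of make_id: the slot (a ResourceId) occupies the high 32 bits and
// the version the low 32 bits, so the 64-bit bthread_id_t both addresses the
// pooled Id object in O(1) and detects slot reuse via the version part.
static inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    const bthread_id_t tmp = { (((uint64_t)slot.value) << 32) | (uint64_t)version };
    return tmp;
}

This is how the small offset mentioned in memory_management.md gets embedded into a 64-bit id, with the version mitigating (though not fully solving) the ABA problem of slot reuse. The Id constructor below runs only the first time a slot is allocated: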

127     Id() {
128         // Although value of the butex(as version part of bthread_id_t)
129         // does not matter, we set it to 0 to make program more deterministic.
130         butex = bthread::butex_create_checked<uint32_t>();
131         join_butex = bthread::butex_create_checked<uint32_t>();
132         *butex = 0;
133         *join_butex = 0;
134     }

243 void* butex_create() {
244     Butex* b = butil::get_object<Butex>();
245     if (b) {
246         return &b->value;
247     }
248     return NULL;
249 }

116 struct BAIDU_CACHELINE_ALIGNMENT Butex {
117     Butex() {} 
118     ~Butex() {}
119         
120     butil::atomic<int> value;
121     ButexWaiterList waiters;
122     internal::FastPthreadMutex waiter_lock;
123 };

If the slot obtained by get_resource above is used for the first time, the object is constructed and initialized; after use, return_resource does not destruct it, and the next get_resource will not construct it again. So mustn't the relevant data members of the original Butex object be reset as appropriate around the previous return_resource?

Indeed, the related implementations do release and clear state, for example:

636     const uint32_t next_ver = meta->end_ver();
637     *butex = next_ver;
638     *join_butex = next_ver;
639     meta->first_ver = next_ver;
640     meta->locked_ver = next_ver;
641     meta->pending_q.clear();
642     meta->mutex.unlock();
643     // Notice that butex_wake* returns # of woken-up, not successful or not.
644     bthread::butex_wake_except(butex, 0);
645     bthread::butex_wake_all(join_butex);
646     return_resource(bthread::get_slot(id));

Some excerpts from the comments:

186 // Use ObjectPool(which never frees memory) to solve the race between
187 // butex_wake() and butex_destroy()

238 //Another method is never freeing butex, with the side effect 
239 // that butex_wake() may wake up an unrelated butex(the one reuses the memory)
240 // and cause spurious wakeups. According to our observations, the race is 
241 // infrequent, even rare. The extra spurious wakeups should be acceptable.

I think this will need deeper analysis in combination with other modules later on.
