线程通讯单读单写消息队列内存高性能分配策略
2023-09-24 本文已影响0人
谭英智
背景
在程序运行的过程中,经常会遇到使用std::queue的情况,例如线程之间的交互
而std::queue内的元素,一般会使用std::unique_ptr来维护内存的生命周期
通常在不优化内存的情况下,std::queue内的指针元素会发生很严重的碎片化
导致在入队列和出队列访问元素内存时发生非常多的CPU Cache不命中,使得程序性能下降
需要解决的问题
解决std::queue元素内存碎片化而导致的性能问题
解决思路
-
申请相邻元素的地址越邻近越好
-
释放的内存可以被循环再用
-
生产者和消费者线程在申请内存和释放内存时尽量高效,避免同步和互斥
设计
申请内存
不需要任何锁和原子同步
-
如果可用buf不足分配的size,则向系统申请4k连续内存
-
把4k buf连续分配下去,只到不足分配size,则再次向系统申请4k内存
释放内存
不需要任何锁和原子同步
-
记录释放内存的起始地址
-
如果起始地址记录的内存满4k,则释放内存,并重新记录起始地址
内存复用
使用tcmalloc库
性能
对比直接使用new来申请内存的方法
提升了3.5x的性能
实现
#include <memory>
#include <queue>
#include <list>
#include <memory_resource>
struct FifoMemoryAlloc
{
~FifoMemoryAlloc() {
if(available_memory != nullptr) {
free(available_memory);
}
}
void *allocate(size_t num_to_allocate)
{
if(num_to_allocate>to_allocate) {
return malloc(num_to_allocate);
}
if(available_memory == nullptr) {
available_memory = (uint8_t*)malloc(to_allocate);
cur_available_idx = 0;
}
if(cur_available_idx + num_to_allocate > to_allocate) {
available_memory = (uint8_t*)malloc(to_allocate);
cur_available_idx = 0;
}
auto ret = &available_memory[cur_available_idx];
cur_available_idx += num_to_allocate;
return ret;
}
void deallocate(void *ptr, size_t num_to_free)
{
if(num_to_free>to_allocate) {
free(ptr);
return;
}
if(start_release_memory == nullptr) {
start_release_memory = ptr;
}
if(cur_release_idx + num_to_free <= to_allocate) {
cur_release_idx += num_to_free;
if(cur_release_idx == to_allocate) {
if(available_memory == start_release_memory) {
available_memory = nullptr;
}
free(start_release_memory);
start_release_memory = nullptr;
cur_release_idx = 0;
}
} else {
free(start_release_memory);
start_release_memory = ptr;
cur_release_idx = num_to_free;
}
}
private:
uint8_t* available_memory = nullptr;
int cur_available_idx = 0;
size_t to_allocate = 4096;
void* start_release_memory = nullptr;
int cur_release_idx = 0;
};
int loop_size = 1000;
int num_size = 100;
static void FifoMemoryAllocBenchMark(benchmark::State &state)
{
FifoMemoryAlloc alloc;
std::queue<int *> q;
for (auto _ : state)
{
for (int m = 0; m < loop_size; ++m)
{
for (int i = 0; i < num_size; ++i)
{
int* t = (int*)alloc.allocate(sizeof(int));
*t = i;
q.push(t);
}
while (!q.empty())
{
auto t = q.front();
q.pop();
benchmark::DoNotOptimize(*t = 1);
alloc.deallocate(t, sizeof(int));
}
}
}
}
// Register the function as a benchmark
BENCHMARK(FifoMemoryAllocBenchMark);
static void SystemMemoryBenchMark(benchmark::State &state)
{
std::queue<int *> q;
for (auto _ : state)
{
for (int m = 0; m < loop_size; ++m)
{
for (int i = 0; i < num_size; ++i)
{
int* t = (int*)malloc(sizeof(int));
*t = i;
q.push(t);
}
while (!q.empty())
{
auto t = q.front();
q.pop();
benchmark::DoNotOptimize(*t = 1);
free(t);
}
}
}
}
BENCHMARK(SystemMemoryBenchMark);