PyTorch Reading Notes

PyTorch: A Walkthrough of the CUDA-side GPU Memory Pool Functions

2021-02-15  CPinging

https://www.52coding.com.cn/2019/05/05/PyTorch2/ mentions a THCCachingAllocator file, but after a long search I could not find it in the latest PyTorch source; the code has evidently been reorganized across versions.

In newer versions the code has been consolidated under c10/cuda/CUDACachingAllocator (CUDACachingAllocator.h / CUDACachingAllocator.cpp).

The header, CUDACachingAllocator.h, reads as follows:

#ifndef THC_DEVICE_ALLOCATOR_INC
#define THC_DEVICE_ALLOCATOR_INC

#include <c10/cuda/CUDAStream.h>
#include <c10/core/Allocator.h>
#include <c10/cuda/CUDAMacros.h>
#include <c10/util/Registry.h>

#include <array>
#include <mutex>

namespace c10 {

class C10_CUDA_API CUDAOutOfMemoryError : public c10::Error {
  using Error::Error;
};

// Caching allocator will execute every registered callback if it unable to find
// block inside of already allocated area.
class C10_CUDA_API FreeMemoryCallback {
 public:
  virtual ~FreeMemoryCallback() {};
  virtual bool Execute() = 0;
};

C10_DECLARE_REGISTRY(FreeCudaMemoryCallbacksRegistry, FreeMemoryCallback);
#define REGISTER_FREE_MEMORY_CALLBACK(name, ...) \
  C10_REGISTER_CLASS(FreeCudaMemoryCallbacksRegistry, name, __VA_ARGS__);

namespace cuda {
namespace CUDACachingAllocator {

struct Stat {
  int64_t current = 0;
  int64_t peak = 0;
  int64_t allocated = 0;
  int64_t freed = 0;
};

enum struct StatType : uint64_t {
  AGGREGATE = 0,
  SMALL_POOL = 1,
  LARGE_POOL = 2,
  NUM_TYPES = 3  // remember to update this whenever a new stat type is added
};

typedef std::array<Stat, static_cast<size_t>(StatType::NUM_TYPES)> StatArray;

// Struct containing memory allocator summary statistics for a device.
struct DeviceStats {
  // COUNT: allocations requested by client code
  StatArray allocation;
  // COUNT: number of allocated segments from cudaMalloc().
  StatArray segment;
  // COUNT: number of active memory blocks (allocated or used by stream)
  StatArray active;
  // COUNT: number of inactive, split memory blocks (unallocated but can't be released via cudaFree)
  StatArray inactive_split;

  // SUM: bytes requested by client code
  StatArray allocated_bytes;
  // SUM: bytes reserved by this memory allocator (both free and used)
  StatArray reserved_bytes;
  // SUM: bytes within active memory blocks
  StatArray active_bytes;
  // SUM: bytes within inactive, split memory blocks
  StatArray inactive_split_bytes;

  // COUNT: total number of failed calls to CUDA malloc necessitating cache flushes.
  int64_t num_alloc_retries = 0;

  // COUNT: total number of OOMs (i.e. failed calls to CUDA after cache flush)
  int64_t num_ooms = 0;
};

// Struct containing info of an allocation block (i.e. a fractional part of a cudaMalloc)..
struct BlockInfo {
  int64_t size = 0;
  bool allocated = false;
  bool active = false;
};

// Struct containing info of a memory segment (i.e. one contiguous cudaMalloc).
struct SegmentInfo {
  int64_t device = 0;
  int64_t address = 0;
  int64_t total_size = 0;
  int64_t allocated_size = 0;
  int64_t active_size = 0;
  bool is_large = false;
  std::vector<BlockInfo> blocks;
};

C10_CUDA_API void* raw_alloc(size_t nbytes);
C10_CUDA_API void* raw_alloc_with_stream(size_t nbytes, cudaStream_t stream);
C10_CUDA_API void raw_delete(void* ptr);

C10_CUDA_API Allocator* get();
C10_CUDA_API void init(int device_count);
C10_CUDA_API void setMemoryFraction(double fraction, int device);
C10_CUDA_API void emptyCache();
C10_CUDA_API void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock);
C10_CUDA_API void* getBaseAllocation(void *ptr, size_t *size);
C10_CUDA_API void recordStream(const DataPtr&, CUDAStream stream);
C10_CUDA_API DeviceStats getDeviceStats(int device);
C10_CUDA_API void resetAccumulatedStats(int device);
C10_CUDA_API void resetPeakStats(int device);
C10_CUDA_API std::vector<SegmentInfo> snapshot();

C10_CUDA_API std::mutex* getFreeMutex();

C10_CUDA_API std::shared_ptr<void> getIpcDevPtr(std::string handle);
} // namespace CUDACachingAllocator

}} // namespace c10::cuda

#endif

These declarations cover the allocator's whole public surface: initialization, allocation, freeing, cache management, and statistics.
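
Before diving into the .cpp file, here is a minimal sketch of how this public API could be exercised from C++. The calls and signatures are taken from the header above; the surrounding scaffolding, the function name, and the chosen sizes are mine:

#include <c10/cuda/CUDACachingAllocator.h>

// Sketch only: exercises the entry points declared in the header above.
void caching_allocator_demo() {
  namespace cca = c10::cuda::CUDACachingAllocator;

  cca::init(/*device_count=*/1);          // one allocator per visible GPU
  void* p = cca::raw_alloc(1 << 20);      // 1 MiB, served from the cached pools
  cca::raw_delete(p);                     // the block goes back to a pool, no cudaFree yet

  cca::DeviceStats stats = cca::getDeviceStats(/*device=*/0);
  // stats.allocation / allocated_bytes / reserved_bytes hold the counters described above

  cca::emptyCache();                      // cudaFree all cached, non-split blocks
}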

Next, let's move into CUDACachingAllocator.cpp.

Somewhat surprisingly, the file contains three allocators: DeviceCachingAllocator, THCCachingAllocator, and CudaCachingAllocator. The latter two are really one layer, because CudaCachingAllocator is only a few lines long and the malloc it calls comes from THCCachingAllocator:

// NB: I decided not to fold this into THCCachingAllocator, because the latter
// has a lot more methods and it wasn't altogether clear that they should
// actually be publicly exposed
struct CudaCachingAllocator : public Allocator {
  DataPtr allocate(size_t size) const override {
    int device;
    C10_CUDA_CHECK(cudaGetDevice(&device));
    void* r = nullptr;
    if (forceUncachedAllocator()) {
      C10_CUDA_CHECK(cudaMalloc(&r, size));
      return {r, r, &uncached_delete, Device(DeviceType::CUDA, device)};
    }
    if (size != 0) {
      caching_allocator.malloc(&r, device, size, cuda::getCurrentCUDAStream(device));
    }
    return {r, r, &raw_delete, Device(DeviceType::CUDA, device)};
  }
  DeleterFnPtr raw_deleter() const override {
    return &raw_delete;
  }
};

So the real work is in THCCachingAllocator, and that is where we will focus.

Let's go through its main member functions one by one.

First, the init function:

  void init(int device_count) {
    int size = device_allocator.size();
    if (size < device_count) {
      device_allocator.resize(device_count);
      for (int i = size; i < device_count; i++) {
        device_allocator[i] = std::unique_ptr<DeviceCachingAllocator>(new DeviceCachingAllocator());
      }
    }
  }

init resizes device_allocator according to the number of devices, ensuring there is one DeviceCachingAllocator per GPU.

The malloc() function

Next comes malloc, still in THCCachingAllocator:

  /** allocates a block which is safe to use from the provided stream */
  void malloc(void** devPtr, int device, size_t size, cudaStream_t stream) {
    TORCH_INTERNAL_ASSERT(
        0 <= device && device < device_allocator.size(),
        "Allocator not initialized for device ",
        device,
        ": did you call init?");
    Block* block = device_allocator[device]->malloc(device, size, stream);
    add_allocated_block(block);
    *devPtr = (void*)block->ptr;
  }

It delegates to the per-device allocator's malloc and then registers the newly obtained block via add_allocated_block, so that a later free(void*) can look the Block up by its raw pointer.
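
add_allocated_block itself is not shown in this post; conceptually it just records the block in a pointer-keyed map under a mutex. A minimal sketch of the idea (the actual PyTorch code uses a flat hash map, but std::unordered_map illustrates it well enough):

  std::mutex mutex;
  std::unordered_map<void*, Block*> allocated_blocks;  // raw device pointer -> Block

  void add_allocated_block(Block* block) {
    std::lock_guard<std::mutex> lock(mutex);
    allocated_blocks[block->ptr] = block;
  }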

The malloc it calls is over a hundred lines long (this one lives in DeviceCachingAllocator), so let's take it slowly.

First, a simplified version of the code:

  // All public methods (except the above) acquire the allocator mutex.
  // Thus, do not call a public method from another public method.

  Block* malloc(int device, size_t size, cudaStream_t stream)
  {
    std::unique_lock<std::recursive_mutex> lock(mutex);

    // process outstanding cudaEvents
    process_events();
    // round the request up to a multiple of 512 bytes
    size = round_size(size);
    // pick the pool (small or large) that this request belongs to
    auto& pool = get_pool(size);
    // work out the segment size to request from cudaMalloc if a new one is needed
    const size_t alloc_size = get_allocation_size(size);
    // bundle everything the request needs into params, notably size and alloc_size
    AllocParams params(device, size, stream, &pool, alloc_size, stats);
    // mark the stat buckets this request touches: stat_types is a bitset with one flag each for AGGREGATE, SMALL_POOL and LARGE_POOL
    params.stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
    params.stat_types[static_cast<size_t>(get_stat_type_for_pool(pool))] = true;

    bool block_found =
      // Search pool
      // search the chosen pool for a cached free block that fits
      get_free_block(params)
      // Trigger callbacks and retry search
      // run the registered free-memory callbacks, then retry the search (see below)
      || (trigger_free_memory_callbacks(params) && get_free_block(params))
      // Attempt allocate
      // allocate a brand-new segment via cudaMalloc
      || alloc_block(params, false)
      // Free all non-split cached blocks and retry alloc.
      || (free_cached_blocks() && alloc_block(params, true));

    TORCH_INTERNAL_ASSERT((!block_found && params.err != cudaSuccess) || params.block);
    if (!block_found) {
        ....... omitted for now (handled below)
    }

    Block* block = params.block;
    Block* remaining = nullptr;
    TORCH_INTERNAL_ASSERT(block);

    const bool already_split = block->is_split();
    // split the block if it is much larger than the request
    if (should_split(block, size)) {
      ......
    } else if (already_split) {
      // An already-split block is becoming active
      ......
    }

    block->allocated = true;
    // active_blocks holds the blocks currently in use; insert the new block into that set
    active_blocks.insert(block);

    c10::reportMemoryUsageToProfiler(
        block, block->size, c10::Device(c10::DeviceType::CUDA, device));
    // bump the counters: allocation count, allocated bytes, active block count, active bytes
    update_stat_array(stats.allocation, 1, params.stat_types);
    update_stat_array(stats.allocated_bytes, block->size, params.stat_types);
    update_stat_array(stats.active, 1, params.stat_types);
    update_stat_array(stats.active_bytes, block->size, params.stat_types);

    return block;
  }
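
round_size, used at the top of malloc, is worth a quick look since the 512-byte granularity shows up everywhere below. A sketch of it, consistent with the comment above (the actual implementation may differ in detail):

  static size_t round_size(size_t size) {
    if (size < kMinBlockSize) {
      return kMinBlockSize;  // never hand out less than 512 bytes
    } else {
      // round up to the next multiple of 512 bytes
      return kMinBlockSize * ((size + kMinBlockSize - 1) / kMinBlockSize);
    }
  }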

The heart of this code, in my view, is the block_found expression, which chains four attempts with ||. The first, get_free_block(params), searches the pool of the matching size class for a cached block whose size is >= the requested size; on success it calls pool.erase(it); to remove that block from the pool, since it now belongs to the request. (get_pool(size), seen in auto& pool = get_pool(size);, selects between the two pools, small_blocks and large_blocks.)

  bool get_free_block(AllocParams& p) {
    BlockPool& pool = *p.pool;
    // lower_bound: the first cached block not smaller than the search key
    auto it = pool.lower_bound(&p.search_key);
    // no block of a suitable size, or the best candidate lives on a different stream: report failure
    if (it == pool.end() || (*it)->stream != p.stream())
      return false;
    // the block matches this stream, so hand it out
    p.block = *it;
    // remove it from the pool, since it has been given to p
    pool.erase(it);
    return true;
  }
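
Why does lower_bound give the smallest adequate block? Because each pool is an ordered set of Block* sorted by stream first, then size, then address. The comparator is not quoted in this post; this is roughly what it looks like in the version I read (treat the details as approximate):

  // Pools are ordered sets of Block*, sorted by (stream, size, ptr), so
  // lower_bound(&search_key) lands on the smallest block on the requested
  // stream that is at least as large as the request.
  static bool BlockComparator(const Block* a, const Block* b) {
    if (a->stream != b->stream) {
      return (uintptr_t)a->stream < (uintptr_t)b->stream;
    }
    if (a->size != b->size) {
      return a->size < b->size;
    }
    return (uintptr_t)a->ptr < (uintptr_t)b->ptr;
  }
  typedef std::set<Block*, bool (*)(const Block*, const Block*)> BlockPool;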

After the callback-and-retry step, the next fallback, alloc_block(params, false), obtains a brand-new segment from the driver:

  bool alloc_block(AllocParams& p, bool isRetry) {
    size_t size = p.alloc_size;
    void* ptr;

    if (isRetry) {
      stats.num_alloc_retries += 1;
    }
    // the request would push us past the per-process memory-fraction cap
    if (set_fraction && total_allocated_memory + size > allowed_memory_maximum) {
      p.err = cudaErrorMemoryAllocation;
    } else {
      // actually ask the driver for memory
      p.err = cudaMalloc(&ptr, size);
    }

    if (p.err != cudaSuccess) {
      if (!isRetry || p.err == cudaErrorMemoryAllocation)
        cudaGetLastError();  // clear CUDA error
      return false;
    }
    // account for the newly reserved memory
    total_allocated_memory += size;
    // wrap the fresh allocation in a Block
    p.block = new Block(p.device(), p.stream(), size, p.pool, (char*)ptr);
    // update the global stats: stats.segment counts cudaMalloc'd segments, stats.reserved_bytes counts bytes held by the allocator; +1 segment, +size bytes
    update_stat_array(stats.segment, 1, p.stat_types);
    update_stat_array(stats.reserved_bytes, size, p.stat_types);

    return (p.block != nullptr);
  }

The key step is the cudaMalloc call; around it the function also maintains a number of counters.

I was initially unsure what trigger_free_memory_callbacks and free_cached_blocks are for; free_cached_blocks is examined below, while trigger_free_memory_callbacks ties back to the FreeMemoryCallback registry declared in the header.
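
Based on the FreeCudaMemoryCallbacksRegistry declared in CUDACachingAllocator.h, trigger_free_memory_callbacks most likely just runs every registered FreeMemoryCallback and reports whether any of them managed to release memory. A sketch (details may differ from the actual source):

  bool trigger_free_memory_callbacks(AllocParams& p) {
    bool freed_memory = false;
    // Execute() every callback registered via REGISTER_FREE_MEMORY_CALLBACK;
    // each returns true if it managed to give some memory back.
    for (const auto& name : FreeCudaMemoryCallbacksRegistry()->Keys()) {
      freed_memory |= FreeCudaMemoryCallbacksRegistry()->Create(name)->Execute();
    }
    return freed_memory;
  }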

If a block is obtained and does not need to be split, it is simply inserted into active_blocks (the set of memory blocks currently in use) and then returned.

If no suitable space can be obtained by any of those means, the allocator calls free_cached_blocks() to hand its cache back to the driver and then retries the allocation.

That is the last clause of the block_found chain:

(free_cached_blocks() && alloc_block(params, true));

  bool free_cached_blocks()
  {
    // First ensure that all blocks that can't currently be allocated due to
    // outstanding events are returned to the pool.
    synchronize_and_free_events();
    // Free all non-split cached blocks
    free_blocks(large_blocks);
    free_blocks(small_blocks);
    return true;
  }

The allocator first waits for all outstanding CUDA events associated with cached blocks:

  void synchronize_and_free_events() {
    // Synchronize on outstanding events and then free associated blocks.

    for (auto& e : cuda_events) {
      cudaEvent_t event = e.first;
      Block* block = e.second;
      // block until the event has completed
      C10_CUDA_CHECK(cudaEventSynchronize(event));
      free_event_internal(event);
      // number of outstanding CUDA events
      // one fewer outstanding event for this block
      block->event_count--;
      // if that was the last one, the block can go back to its pool
      if (block->event_count == 0) {
        free_block(block);
      }
    }

    cuda_events.clear();
  }

Then it frees both pools outright:

  void free_blocks(BlockPool& blocks)
  {
    // Frees all non-split blocks
    auto it = blocks.begin();
    while (it != blocks.end()) {
      Block* block = *it;
      if (!block->prev && !block->next) {
        C10_CUDA_CHECK(cudaFree((void*)block->ptr));
        total_allocated_memory -= block->size;

        StatTypes stat_types;
        stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
        stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
        update_stat_array(stats.segment, -1, stat_types);
        update_stat_array(stats.reserved_bytes, -block->size, stat_types);
        auto cur = it;
        ++it;
        blocks.erase(cur);
        delete block;
      } else {
        ++it;
      }
    }
  }

Now, what happens if the allocator can neither reuse a cached pointer nor obtain fresh memory from the driver? That is the part elided from the code above:

    if (!block_found) {
      if (params.err == cudaErrorMemoryAllocation) {
        size_t device_free;
        size_t device_total;
        C10_CUDA_CHECK(cudaMemGetInfo(&device_free, &device_total));
        std::string allowed_info;

        if (set_fraction) {
          allowed_info = format_size(allowed_memory_maximum) + " allowed; ";
        }

        stats.num_ooms += 1;

        // "total capacity": total global memory on GPU
        // "allowed": memory is allowed to use, which set by fraction.
        // "already allocated": memory allocated by the program using the
        //                      caching allocator
        // "free": free memory as reported by the CUDA API
        // "cached": memory held by the allocator but not used by the program
        //
        // The "allocated" amount  does not include memory allocated outside
        // of the caching allocator, such as memory allocated by other programs
        // or memory held by the driver.
        //
        // The sum of "allocated" + "free" + "cached" may be less than the
        // total capacity due to memory held by the driver and usage by other
        // programs.
        //
        // Note that at this point free_cached_blocks has already returned all
        // possible "cached" memory to the driver. The only remaining "cached"
        // memory is split from a larger block that is partially in-use.
        TORCH_CHECK_WITH(CUDAOutOfMemoryError, false,
          "CUDA out of memory. Tried to allocate ", format_size(alloc_size),
          " (GPU ", device, "; ",
          format_size(device_total), " total capacity; ",
          format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
          " already allocated; ",
          format_size(device_free), " free; ",
          allowed_info,
          format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
          " reserved in total by PyTorch)");
      } else {
        C10_CUDA_CHECK(params.err);
      }
    }

This branch handles the case where allocation has definitively failed. Reaching it means the program is out of options and will raise an error, so it is worth decoding exactly what the message reports.

The code may look messy, but anyone who has trained models in PyTorch will recognize its output at once: the familiar "CUDA out of memory. Tried to allocate ..." error message. This is exactly where that message is produced.

Here we need to be clear about what each field in that message actually measures.

"total capacity" and "free" come straight from cudaMemGetInfo, whose two output parameters are documented as:

 * \param free  - Returned free memory in bytes
 * \param total - Returned total memory in bytes

"already allocated" is stats.allocated_bytes[StatType::AGGREGATE].current, the counter that malloc bumps with update_stat_array(stats.allocated_bytes, block->size, params.stat_types);.

"reserved in total by PyTorch" is stats.reserved_bytes[StatType::AGGREGATE].current, i.e. everything the allocator has obtained from cudaMalloc, whether currently free or in use:

  // SUM: bytes reserved by this memory allocator (both free and used)
  StatArray reserved_bytes;

("allowed" only appears when a cap has been set via setMemoryFraction.)

Now for splitting. The allocator first decides whether a block should be split at all:

    // split the block; this applies both to get_free_block (a reused cached pointer) and alloc_block (a fresh segment)
    if (should_split(block, size)){......}
  bool should_split(const Block* block, size_t size) {
    size_t remaining = block->size - size;
    // requests of at most 1 MB live in the small pool
    if (block->pool == &small_blocks) {
      // split if the leftover is at least 512 bytes (kMinBlockSize)
      return remaining >= kMinBlockSize;
      // large pool: requests over 1 MB
    } else if (block->pool == &large_blocks) {
      // split if the leftover is larger than 1 MiB (kSmallSize)
      return remaining > kSmallSize;
    } else {
      AT_ERROR("should_split: invalid pool");
    }
  }
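
The thresholds above come from a handful of constants defined near the top of CUDACachingAllocator.cpp. They are not quoted in this post, so here they are for reference (values as of the version I am reading; they may change between releases):

constexpr size_t kMinBlockSize = 512;       // all sizes are rounded to at least 512 bytes
constexpr size_t kSmallSize = 1048576;      // largest "small" allocation is 1 MiB
constexpr size_t kSmallBuffer = 2097152;    // "small" allocations are packed into 2 MiB segments
constexpr size_t kLargeBuffer = 20971520;   // "large" allocations may be packed into 20 MiB segments
constexpr size_t kMinLargeAlloc = 10485760; // allocations between 1 and 10 MiB may use kLargeBuffer
constexpr size_t kRoundLarge = 2097152;     // round up large allocations to multiples of 2 MiB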

should_split decides whether the block needs splitting. As far as I can tell it matters in both paths: for get_free_block(), because the cached block being reused may be larger than the request, and for alloc_block(), which allocates p.alloc_size bytes. In the latter case size_t remaining = block->size - size; is effectively p.alloc_size - round_size(size), where the two are related by alloc_size = get_allocation_size(size);. A quick detour into get_allocation_size():

  static size_t get_allocation_size(size_t size) {
    // kSmallSize = 1MB
    if (size <= kSmallSize) {
      // hand out a 2 MB buffer (kSmallBuffer)
      return kSmallBuffer;
      // requests below 10 MB (kMinLargeAlloc)
    } else if (size < kMinLargeAlloc) {
      // hand out a 20 MB buffer (kLargeBuffer)
      return kLargeBuffer;
      // requests of 10 MB or more
    } else {
      // round up to a multiple of 2 MB (kRoundLarge)
      return kRoundLarge * ((size + kRoundLarge - 1) / kRoundLarge);
    }
  }

So there genuinely can be slack between what was requested and what was actually allocated.
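
A quick worked example (my own numbers, just to make the sizes concrete): a request for 5000 bytes is rounded by round_size to 5120 bytes (10 × 512). That is below kSmallSize, so it goes to the small pool and get_allocation_size returns kSmallBuffer = 2 MiB. If a fresh segment has to be cudaMalloc'd, remaining = 2097152 − 5120 bytes, far more than kMinBlockSize, so should_split fires and the leftover goes back into the small pool as its own block.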

    // splitting; as argued above this covers both the reused-cache case and the fresh-segment case
    if (should_split(block, size)) {
      remaining = block;
      // create a new Block of exactly size bytes, not alloc_size (which is larger than needed, hence the split)
      // open question: why not allocate an exactly-sized block up front instead of splitting afterwards? Presumably so the leftover of a fixed-size segment can serve later requests and cudaMalloc is called less often, but I have not fully grasped the rationale yet
      block = new Block(device, stream, size, &pool, block->ptr);
      // link the new block into the existing block chain; the original block becomes `remaining` and follows the new block
      block->prev = remaining->prev;
      if (block->prev) {
        block->prev->next = block;
      }
      block->next = remaining;

      remaining->prev = block;
      // shrink the remaining block by size bytes
      remaining->ptr = static_cast<char*>(remaining->ptr) + size;
      remaining->size -= size;
      pool.insert(remaining);

      if (already_split) {
        // An already-split inactive block is being shrunk by size bytes.
        update_stat_array(stats.inactive_split_bytes, -block->size, params.stat_types);
      } else {
        // A new split inactive block is being created from a previously unsplit block,
        // size remaining->size bytes.
        update_stat_array(stats.inactive_split_bytes, remaining->size, params.stat_types);
        update_stat_array(stats.inactive_split, 1, params.stat_types);
      }
    } else if (already_split) {
      // An already-split block is becoming active
      update_stat_array(stats.inactive_split_bytes, -block->size, params.stat_types);
      update_stat_array(stats.inactive_split, -1, params.stat_types);
    }

The free() function

Now let's look at free():

Just like allocation, we enter through THCCachingAllocator.

 void free(void* ptr) {
   if (!ptr) {
     return;
   }
   Block* block = get_allocated_block(ptr, true /* remove */);
   if (!block) {
     AT_ERROR("invalid device pointer: ", ptr);
   }
   device_allocator[block->device]->free(block);
 }
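
get_allocated_block is the counterpart of add_allocated_block from the malloc path; presumably it looks the Block up in the same pointer-keyed map and, with remove = true, erases the entry. A sketch (hand-written, not copied from the source):

  Block* get_allocated_block(void* ptr, bool remove = false) {
    std::lock_guard<std::mutex> lock(mutex);
    auto it = allocated_blocks.find(ptr);
    if (it == allocated_blocks.end()) {
      return nullptr;              // not a pointer we handed out
    }
    Block* block = it->second;
    if (remove) {
      allocated_blocks.erase(it);  // the caller is freeing it
    }
    return block;
  }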

From there it dispatches into the per-device allocator via device_allocator[block->device]->free(block);:

  void free(Block* block)
  {
    std::lock_guard<std::recursive_mutex> lock(mutex);

    block->allocated = false;

    c10::reportMemoryUsageToProfiler(
        block, -block->size, c10::Device(c10::DeviceType::CUDA, block->device));
    // update the global counters
    StatTypes stat_types;
    stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
    stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
    update_stat_array(stats.allocation, -1, {stat_types});
    update_stat_array(stats.allocated_bytes, -block->size, {stat_types});
    // does the block have outstanding stream usages recorded via recordStream?
    if (!block->stream_uses.empty()) {
      // yes: record events on those streams and defer the real free
      insert_events(block);
    } else {
      // no: return the block to its pool right away
      free_block(block);
    }
  }

Let's look at free_block(block); first.

  /** moves a block into a pool of cached free blocks */
  void free_block(Block* block)
  {
    TORCH_INTERNAL_ASSERT(!block->allocated && block->event_count == 0);

    size_t original_block_size = block->size;

    auto& pool = *block->pool;
    int64_t net_change_inactive_split_blocks = 0;
    int64_t net_change_inactive_split_size = 0;

    const std::array<Block*, 2> merge_candidates = {block->prev, block->next};
    for (Block* merge_candidate : merge_candidates) {
      // try to coalesce with free neighbours: both the previous and the next block are candidates
      const int64_t subsumed_size = try_merge_blocks(block, merge_candidate, pool);
      if (subsumed_size > 0) {
        net_change_inactive_split_blocks -= 1;
        net_change_inactive_split_size -= subsumed_size;
      }
    }
    // active_blocks holds the blocks currently in use
    active_blocks.erase(block);
    // the pool holds blocks available for reuse
    pool.insert(block);

    if (block->is_split()) {
      net_change_inactive_split_blocks += 1;
      net_change_inactive_split_size += block->size;
    }

    StatTypes stat_types;
    stat_types[static_cast<size_t>(StatType::AGGREGATE)] = true;
    stat_types[static_cast<size_t>(get_stat_type_for_pool(*(block->pool)))] = true;
    update_stat_array(stats.inactive_split, net_change_inactive_split_blocks, stat_types);
    update_stat_array(stats.inactive_split_bytes, net_change_inactive_split_size, stat_types);
    update_stat_array(stats.active, -1, stat_types);
    update_stat_array(stats.active_bytes, -original_block_size, stat_types);
  }

The important point here is that no cudaFree happens: active_blocks.erase(block); pool.insert(block); performs a purely logical free, parking the pointer in the pool so a later malloc can reuse it.
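
The coalescing step, try_merge_blocks, is not quoted above. Roughly, it absorbs a free, event-free neighbour into the block being freed and removes the neighbour from its pool. A sketch along the lines of the version I read (details may differ):

  /** combine previously split blocks; returns the number of bytes absorbed */
  size_t try_merge_blocks(Block* dst, Block* src, BlockPool& pool) {
    if (!src || src->allocated || src->event_count > 0) {
      return 0;  // neighbour is missing, still in use, or waiting on events
    }
    if (dst->prev == src) {        // merging with the block in front of us
      dst->ptr = src->ptr;
      dst->prev = src->prev;
      if (dst->prev) {
        dst->prev->next = dst;
      }
    } else {                       // merging with the block behind us
      dst->next = src->next;
      if (dst->next) {
        dst->next->prev = dst;
      }
    }
    const size_t subsumed_size = src->size;
    dst->size += subsumed_size;
    pool.erase(src);               // the neighbour no longer exists on its own
    delete src;
    return subsumed_size;
  }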

The other branch worth noting is insert_events(block);. Its purpose is a bit obscure; my reading is as follows.

The library provides a recordStream() function to insert the right synchronization when an allocation is used across multiple streams: it guarantees the block will not be reused until every recorded stream has finished its work.

In other words, if the block has been used on other streams, this free cannot take effect immediately; the block only goes back to the pool once all of those streams have completed their work.
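
insert_events is what defers the free: for every stream recorded on the block it records a CUDA event and queues an (event, block) pair; process_events(), called at the top of malloc, later polls those events and only calls free_block once the block's event count drops to zero. A sketch of insert_events, reconstructed from memory of the source (treat it as approximate):

  void insert_events(Block* block) {
    int prev_device;
    C10_CUDA_CHECK(cudaGetDevice(&prev_device));

    // take ownership of the recorded streams and clear them on the block
    stream_set streams(std::move(block->stream_uses));
    for (auto& stream : streams) {
      C10_CUDA_CHECK(cudaSetDevice(stream.device_index()));

      // one event per stream; the block stays "busy" until all of them fire
      cudaEvent_t event = create_event_internal();
      C10_CUDA_CHECK(cudaEventRecord(event, stream.stream()));

      block->event_count++;
      cuda_events.emplace_back(event, block);
    }

    C10_CUDA_CHECK(cudaSetDevice(prev_device));
  }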
