Caffe2核心代码解析系列之二:Context

2018-08-28  本文已影响0人  manofmountain

介绍

在caffe2的core code里面如果查找与context相关的内容则无外乎以下几个文件。接下来的一个章节里,我们将会对它们的主要内容逐个进行分析。

$ ls context*
context_base.cc  context.cc      context_gpu.h        context.h
context_base.h   context_gpu.cu  context_gpu_test.cc  context_test.cc

Context本身是对物理设备的一种抽象,可以将之视为类似于Tensorflow中Device或Caffe中Engine的东东。我们使用Tensor来表示数据存储单元或使用Operator来表示数据单元之间的基本运算都需要落实在具体物理计算设备上来执行。一般说来当今最常用的两种用于AI上的计算设备还是CPU和GPU。所以默认地caffe2也同其它大多数Framework一样提供了这两种计算设备的抽象,分别为CPUContext/CPUStaticContext与CUDAContext/CUDAStaticContext。而这两个设备的抽象接口则被实现在 两个虚基类BaseStaticContext/BaseContext里面。这是个典型的面向对象的设计方式。

BaseContext和BaseStaticContext

BaseStaticContext

BaseStaticContext里面主要包含一些基本的内存分配操作(如New)与Device type查询或设置操作,此外还可以由它来生成完成大部分更高层次功能的BaseContext子类对象并返回。

以下为它的基本接口。

class CAFFE2_API BaseStaticContext {
 public:
  virtual ~BaseStaticContext() noexcept {}

  virtual std::pair<void*, MemoryDeleter> New(size_t nbytes) const = 0;

  virtual std::unique_ptr<BaseContext> CreateContext() = 0;

  virtual std::unique_ptr<BaseContext> CreateContext(const DeviceOption&) = 0;

  virtual DeviceType GetDeviceType() = 0;

  /*
   * @brief: Sets the DeviceOption for argument `device` based on the
   * current context and the a data pointer
   */
  virtual void ExtractDeviceOption(DeviceOption* device, const void* /*data*/) {
    device->set_device_type(GetDeviceType());
  }
};

BaseContext

BaseContext里面包含了所有一个设备为完成caffe2所支持的operator操作需要提供的基本功能。它是一个大的context功能虚拟接口的集合。
它里面的主要函数功能见名即知其意,简要介绍如下:

SwitchToDevice: 这个主要是针对GPU设备而设计的,习惯了CUDA编程的人都明白,我们在开始一个GPU CUDA操作前,总是需要先将当前的环境切换到某个GPU上,并拿到相应的cudaStream才能接着向下进行;

WaitEvent/Record: 与async执行op操作有关,class Event在async执行中发挥着较大的作用,这两个函数一般提供用来记录此Context下正被等待或已经发生了的events;

FinishDeviceComputation: 主要用于GPU等可async执行的Device context里面,它相当于一个在多个CUDA stream上执行流的sync过程中,类似于多线程编程里面的一个Barrier点;

Copy_xyx: 这些Copy函数主要是一些utilities函数,用来完成Device to Device/Device to CPU/CPU to Device等各种类型之间的数据拷贝;

此外我们也还要有一个static的BaseStaticContext数组用来指向各种类型的device context的StaticContext以方便地在这些设备上进行分配内存或操作设备类型等;

它的friend StaticContextFunctionRegisterer则主要用来静态地(即在编译时)将已知的StaticeDevice对象注册上,进而方便接下来使用。

class CAFFE2_API BaseContext {
 public:
  virtual ~BaseContext() noexcept {}

  virtual BaseStaticContext* GetStaticContext() const = 0;

  /* Sorry for the naming, will get rid of this in future diff */
  virtual DeviceType GetDevicetype() const = 0;

  virtual void SwitchToDevice(int /*stream_id*/) = 0;

  inline void SwitchToDevice() {
    SwitchToDevice(0);
  }

  virtual void WaitEvent(const Event& ev) = 0;

  virtual void Record(Event* ev, const char* err_msg = nullptr) const = 0;

  virtual void FinishDeviceComputation() = 0;

  // This used to be arbitrary cross-device copy, but it turns out everyone
  // did direct CPU-X copy, so we just make three functions for it (to avoid
  // double dispatch).  This will get obsoleted by C10. where copies
  // will be proper operators (and get to rely on multiple dispatch there.)
  virtual void CopyBytesSameDevice(size_t nbytes, const void* src, void* dst) = 0;

  virtual void CopyBytesFromCPU(size_t nbytes, const void* src, void* dst) = 0;

  virtual void CopyBytesToCPU(size_t nbytes, const void* src, void* dst) = 0;

  virtual void CopyBytesToDevice(size_t nbytes, const void* src, void* dst, DeviceType type);

  inline void CopyItemsFromCPU(const TypeMeta& meta, size_t n, const void* src, void* dst);

  inline void CopyItemsToCPU(const TypeMeta& meta, size_t n, const void* src, void* dst);

  static BaseStaticContext* static_context_[COMPILE_TIME_MAX_DEVICE_TYPES];

  template <int d>
  friend struct StaticContextFunctionRegisterer;
};

CPUContext和CPUStaticContext

CPUStaticContext

以下为CPUStaticContext的具体实现。它的New主要通过使用借助CPUAllocator来完成,并可通过打开FLAGS_caffe2_report_cpu_memory_usage这个开关来记录、显示CPU设备之上内存分配的情况。

class CAFFE2_API CPUStaticContext : public BaseStaticContext {
 public:
  std::pair<void*, MemoryDeleter> New(size_t nbytes) const override {
    auto data_and_deleter = GetCPUAllocator()->New(nbytes);
    if (FLAGS_caffe2_report_cpu_memory_usage) {
      reporter_.New(data_and_deleter.first, nbytes);
      data_and_deleter.second = ReportAndDelete;
    }
    return data_and_deleter;
  }

  std::unique_ptr<BaseContext> CreateContext() override {
    return caffe2::make_unique<CPUContext>();
  }

  std::unique_ptr<BaseContext> CreateContext(
      const DeviceOption& option) override {
    return caffe2::make_unique<CPUContext>(option);
  }

  DeviceType GetDeviceType() override {
    return CPU;
  }

 protected:
  static MemoryAllocationReporter reporter_;
 private:
  static void ReportAndDelete(void* ptr) {
    reporter_.Delete(ptr);
    GetCPUAllocator()->GetDeleter()(ptr);
  }
};

CPUContext

以下为CPUContext里面的主要功能函数实现。

可以看出BaseContext里面提供的许多功能函数其实在CPUContext里是并不需要的像SwitchToDevice等;另外CPUContext默认并不支持async操作,因此那些使用它的Operators显然也无法以async的方式执行。它的设备间Copy的方式比较简单,直接使用memcpy就好了。另外每个类型Context还需要具有一个随机数发生器,用于支持operator里面可能需要的random操作。

class CAFFE2_API CPUContext final : public BaseContext {
 public:
  typedef std::mt19937 rand_gen_type;
  CPUContext() : random_seed_(RandomNumberSeed()) {}
  explicit CPUContext(const DeviceOption& option)
      : random_seed_(
            option.has_random_seed() ? option.random_seed()
                                     : RandomNumberSeed()) {
    CAFFE_ENFORCE_EQ(option.device_type(), CPU);
  }

  ~CPUContext() noexcept override {}

  BaseStaticContext* GetStaticContext() const override {
    return GetCPUStaticContext();
  }
  inline void SwitchToDevice(int /*stream_id*/) override {}

  using BaseContext::SwitchToDevice;

  inline void WaitEvent(const Event& ev) override {
    ev.Wait(CPU, this);
  }

  inline void Record(Event* ev, const char* err_msg = nullptr) const override {
    CAFFE_ENFORCE(ev, "Event must not be null.");
    ev->Record(CPU, this, err_msg);
  }
 inline static std::pair<void*, MemoryDeleter> New(size_t nbytes) {
    return StaticContext()->New(nbytes);
  }

  void CopyBytesSameDevice(size_t nbytes, const void* src, void* dst) override {
    if (nbytes == 0) {
      return;
    }
    CAFFE_ENFORCE(src);
    CAFFE_ENFORCE(dst);
    memcpy(dst, src, nbytes);
  }
.............
.............
  // By default CPU operators don't have async device parts
  static bool HasAsyncPartDefault() {
    return false;
  }

  static bool SupportsAsyncScheduling() {
    return false;
  }

  // CPU streams are not implemented and are silently ignored by CPU ops,
  // return true to signal executor to schedule a CPU op
  static bool IsStreamFree(
      const DeviceOption& /* option */,
      int /* stream_id */) {
    return true;
  }

  DeviceType GetDevicetype() const override {
    return CPU;
  }
 protected:
  // TODO(jiayq): instead of hard-coding a generator, make it more flexible.
  int random_seed_{1701};
  std::unique_ptr<rand_gen_type> random_generator_;
};

CUDAContext和CUDAStaticContext

caffe2中CUDAContext的设计要复杂的多。毕竟它是当下承担operator执行的主力。没办法,眼下真正是大家都已偏见地视AI计算==GPU了。。(可实际上我们真的有其它许多选择像优化过了的CPU底层库像MKLDNN,抽象封装过了的FPGA库或者甚至是Google提供的TPU等AI芯片)。。

首先CUDA memory的type有许多。我们大致有以下三种选择。

enum class CudaMemoryPoolType {
  NONE = 0,
  CUB = 1,
  THC = 2,
};

然后因为GPU CUDA编程有着强大的并行编程能力。一般我们需要指定某一GPU下的某一个cudaStream用于执行具体的计算,为此需要协调好Framework这边多线程在执行GPU上互相排斥执行的多个cudaStream时的线性操作。为此caffe2里面在CUDAContext里面提供了一些thread local的变量用于表示这么一些全局的资源。

class CAFFE2_API ThreadLocalCUDAObjects {
  friend class CUDAContext;

 private:
  ThreadLocalCUDAObjects() {
    for (int i = 0; i < CAFFE2_COMPILE_TIME_MAX_GPUS; ++i) {
      cuda_streams_[i] = vector<cudaStream_t>();
      cublas_handles_[i] = vector<cublasHandle_t>();
#ifdef CAFFE2_USE_CUDNN
      cudnn_handles_[i] = vector<cudnnHandle_t>();
#endif // CAFFE2_USE_CUDNN
    }
  }
  cudaStream_t GetStream(int gpu, int stream_id);
  cublasHandle_t GetHandle(int gpu, int stream_id);
#ifdef CAFFE2_USE_CUDNN
  cudnnHandle_t GetCudnnHandle(int gpu, int stream_id);
#endif
.............
...............
  vector<cudaStream_t> cuda_streams_[CAFFE2_COMPILE_TIME_MAX_GPUS];
  vector<cublasHandle_t> cublas_handles_[CAFFE2_COMPILE_TIME_MAX_GPUS];
#ifdef CAFFE2_USE_CUDNN
  vector<cudnnHandle_t> cudnn_handles_[CAFFE2_COMPILE_TIME_MAX_GPUS];
#endif // CAFFE2_USE_CUDNN
};

最后caffe2通过下面这个static函数来lazily决定是否初始化众多GPU环境需要的东东。。主要是确定GPUs之间的peer to peer连接是没有问题的。

///////////////////////////////////////////////////////////////////////////////
// A wrapper to allow us to lazily initialize all cuda environments that Caffe
// uses. This gets done the first time a caffe2::CUDAContext::New() gets called
// which is probably the decisive indication that this caffe2 run is going to
// use GPUs. We avoid cuda initialization with core/init.h functionalities so
// that we have minimal resource impact in case we will need to run multiple
// caffe2 instances on a GPU machine.
///////////////////////////////////////////////////////////////////////////////

static void Caffe2InitializeCuda() {
  // If the current run does not have any cuda devices, do nothing.
  if (!HasCudaGPU()) {
    VLOG(1) << "No cuda gpu present. Skipping.";
    return;
  }
  // Check if the number of GPUs matches the expected compile-time max number
  // of GPUs.
  CAFFE_ENFORCE_LE(
      NumCudaDevices(),
      CAFFE2_COMPILE_TIME_MAX_GPUS,
      "Number of CUDA devices on the machine is larger than the compiled "
      "max number of gpus expected (",
      CAFFE2_COMPILE_TIME_MAX_GPUS,
      "). Increase that and recompile the caffe binary.");

  for (int i = 0; i < NumCudaDevices(); ++i) {
    DeviceGuard g(i);
    // Enable peer access.
    const int peer_group = i / CAFFE2_CUDA_MAX_PEER_SIZE;
    const int peer_start = peer_group * CAFFE2_CUDA_MAX_PEER_SIZE;
    const int peer_end = std::min(
        NumCudaDevices(), (peer_group + 1) * CAFFE2_CUDA_MAX_PEER_SIZE);
    VLOG(1) << "Enabling peer access within group #" << peer_group
            << ", from gpuid " << peer_start << " to " << peer_end - 1
            << ", for gpuid " << i << ".";

    for (int j = peer_start; j < peer_end; ++j) {
      if (i == j) continue;
      int can_access;
      CUDA_ENFORCE(cudaDeviceCanAccessPeer(&can_access, i, j));
      if (can_access) {
        VLOG(1) << "Enabling peer access from " << i << " to " << j;
        // Note: just for future reference, the 0 here is not a gpu id, it is
        // a reserved flag for cudaDeviceEnablePeerAccess that should always be
        // zero currently.
        CUDA_ENFORCE(cudaDeviceEnablePeerAccess(j, 0));
      }
    }
  }

#ifdef CAFFE2_USE_CUDNN
  // Check the versions of cuDNN that were compiled and linked with are compatible
  CheckCuDNNVersions();
#endif // CAFFE2_USE_CUDNN
}

下面这个stuct Caffe2CudaInitializerHelper则是具体负责诸多的初始化函数发起。而且只初始一次。

// Caffe2CudaInitializerHelper is a minimal struct whose sole purpose is to
// detect the first hint that this Caffe2 run is going to use GPU: either
// CUDAContext is initialized or CUDAContext::New is called. It then runs
// all the related cuda initialization functions.
namespace {
struct Caffe2CudaInitializerHelper {
  Caffe2CudaInitializerHelper() {
    // We cannot use bool because nvcc changes bool to __nv_bool which does
    // not have a std::atomic instantiation.
    static std::atomic<char> first_call(1);
    if (first_call.fetch_and((char)0)) {
      Caffe2InitializeCuda();
      Caffe2SetCUDAMemoryPool();
      Caffe2UsePinnedCPUAllocator();
    }
  }
};
} // namespace

CUDAStaticContext

下面我们主要看下CUDA下内存分配的方式,可以看出它会根据当下所用的memory pool类型提供不同类型的GPU memory分配。此外还会对这些分配的GPU内存分别以各种数据结构变量像g_size_map/g_cuda_device_affiliation来记录以追踪分配的内存的大小及所属的GPU id。

当然在它的Delete函数里自然也会根据对应的memory type分别执行不同类型的内存释放操作,在此不表。

class CAFFE2_API CUDAStaticContext final : public BaseStaticContext {
 public:
  std::pair<void*, MemoryDeleter> CUDAStaticContext::New(size_t nbytes) const {
  // Lock the mutex
  std::lock_guard<std::mutex> lock(CUDAContext::mutex());
  // A one-time caffe2 cuda initializer.
  static Caffe2CudaInitializerHelper g_cuda_initializer_;
  void* ptr = nullptr;

  if (FLAGS_caffe2_gpu_memory_tracking) {
    TrackMemoryAlloc(nbytes);
  }
  switch (g_cuda_memory_pool_type) {
  case CudaMemoryPoolType::NONE:
    CUDA_ENFORCE(cudaMalloc(&ptr, nbytes));
    if (FLAGS_caffe2_gpu_memory_tracking) {
      g_size_map[ptr] = nbytes;
      g_cuda_device_affiliation[ptr] = CaffeCudaGetDevice();
    }
    return {ptr, Delete};
  case CudaMemoryPoolType::CUB:
    CUDA_ENFORCE(g_cub_allocator->DeviceAllocate(&ptr, nbytes));
    g_cuda_device_affiliation[ptr] = CaffeCudaGetDevice();
    VLOG(2) << "CUB allocating pointer " << ptr << " on device "
            << CaffeCudaGetDevice();
    if (FLAGS_caffe2_gpu_memory_tracking) {
      g_size_map[ptr] = nbytes;
    }
    return {ptr, Delete};
  case CudaMemoryPoolType::THC:
    CUDA_ENFORCE(g_thc_allocator->Alloc(&ptr, nbytes, 0 /* stream */));
    if (FLAGS_caffe2_gpu_memory_tracking) {
      g_size_map[ptr] = nbytes;
      g_cuda_device_affiliation[ptr] = CaffeCudaGetDevice();
    }
    return {ptr, Delete};
  }
  return {nullptr, Delete};
 }
};

bPI memory时应当如何呢。

/**
 * An allocator that does the CPU memory allocation with pinned memory.
 *
 * This is needed because if we want to do any asynchronous cuda memcpy,
 * the underlying CPU memory also needs to be allocated into pinned memory
 * space. As a result, whenever Caffe2 is built with GPU and there is
 * GPU present during runtime, at global initialization time we will set
 * the CPU memory allocator to allocate pinned memory.
 */
struct CAFFE2_API PinnedCPUAllocator final : CPUAllocator {
  PinnedCPUAllocator() {}
  ~PinnedCPUAllocator() override {}
  std::pair<void*, MemoryDeleter> New(size_t nbytes) override {
    void* data;
    std::lock_guard<std::mutex> lock(CUDAContext::mutex());
    if (IsNUMAEnabled()) {
      auto ptr_and_deleter = baseAllocator_.New(nbytes);
      data = ptr_and_deleter.first;
      CAFFE_ENFORCE(data);
      CUDA_ENFORCE(cudaHostRegister(data, nbytes, cudaHostRegisterDefault));
    } else {
      CUDA_ENFORCE(cudaMallocHost(&data, nbytes));
    }
    memset(data, 0, nbytes);
    return {data, Delete};
  }

  MemoryDeleter GetDeleter() override {
    return Delete;
  }
.....
.....
  DefaultCPUAllocator baseAllocator_;
};

CUDAContext

最后我们看到了CUDAContext,它是实现GPU诸多底层操作支持的主力。可以看得出来Caffe2里面的很多interfaces都是为了满足CUDA编程的需要而加的像SwitchToDevice/FinishDeviceComputation等等。当然它也提了许多自己的utility 函数,而且多是static函数,用于满足公共的GPU资源的查询需要,因此大多需要使用Mutext来保证线性访问。

class CAFFE2_API CUDAContext final : public BaseContext {
 public:
  // The default cuda context constructor.
  explicit CUDAContext(const int gpu_id = -1);
  explicit CUDAContext(const DeviceOption& option);

  ~CUDAContext() override {
    if (curand_generator_) {
      CURAND_CHECK(curandDestroyGenerator(curand_generator_));
    }
    FinishDeviceComputation();
  }
  inline void SwitchToDevice(int stream_id) override {
    set_stream_id(stream_id);
    CaffeCudaSetDevice(gpu_id_);
  }
  void FinishDeviceComputation() override {
    cudaStreamSynchronize(cuda_objects_.GetStream(gpu_id_, stream_id_));
    cudaError_t error = cudaGetLastError();
    if (error != cudaSuccess) {
      CAFFE_THROW("Encountered CUDA error: ", cudaGetErrorString(error));
    }
  }
  ..........
  .........
  static std::vector<long> TotalMemoryByGpu();
  static std::vector<long> MaxMemoryByGpu();
  static bool IsStreamFree(const DeviceOption& option, int stream_id) {
    auto stream = CUDAContext::cuda_stream(option.cuda_gpu_id(), stream_id);
    return cudaStreamQuery(stream) == cudaSuccess;
  }
............
............
 protected:
  void set_stream_id(int stream_id) {
    stream_id_ = stream_id;
  }

  int gpu_id_;
  int stream_id_ = 0;
  int random_seed_;
  curandGenerator_t curand_generator_{nullptr};
  static thread_local ThreadLocalCUDAObjects cuda_objects_;
};

参考文献

上一篇 下一篇

猜你喜欢

热点阅读