caffe--源码阅读笔记2

2019-04-26  本文已影响0人  Alfie20

本文主要caffe如何采用多GPU进行训练。

1. Caffe启用多GPU

caffe源码中使用宏USE_NCCL进行区分是否启用多GPU训练
使用时的代码如下:

shared_ptr<caffe::Solver<float>> solver(caffe::SolverRegistry<float>::CreateSolver(solver_param));
caffe::NCCL<float> nccl(solver);
nccl.Run(gpus, FLAGS_snapshot.size() > 0 ? FLAGS_snapshot.c_str() : NULL);

关于第一句相关解释,请查看本系列文章的第一篇。

1.1 NCCL相关定义

关于NCCL的定义见下方:

template<typename Dtype>
class NCCL : public GPUParams<Dtype>, public Solver<Dtype>::Callback, public Net<Dtype>::Callback {
 public:
  /*Single process version. */
  explicit NCCL(shared_ptr<Solver<Dtype> > solver);
  /*In multi-process settings, first create a NCCL id (new_uid), then
   pass it to each process to create connected instances.*/
  NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid);
  /*In single process settings, create instances without uids and call this to connect them.*/
  static void InitSingleProcess(vector<NCCL<Dtype>*>* nccls);
  /* Broadcast weights from rank 0 other solvers.*/
  void Broadcast();
  /*Single process multi-GPU.*/
  void Run(const vector<int>& gpus, const char* restore);
 protected:
  void Init();
  void on_start() {}
  void run(int layer);  // Net callback
  void on_gradients_ready();
};

而其父类有说三个,一个是保存参数的(大小,数据,梯度);第二个是Solver层面的回调,用于Solver层面操作;第三个是Net层面的回调,用于Net层面操作。具体定义如下:

template<typename Dtype>
class GPUParams : public Params<Dtype> {
 public:
  GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device);
  void Configure(Solver<Dtype>* solver) const;
 protected:
  using Params<Dtype>::size_;
  using Params<Dtype>::data_;
  using Params<Dtype>::diff_;
};

Solver:
class Callback {
   protected:
    virtual void on_start() = 0;
    virtual void on_gradients_ready() = 0;

    template <typename T>
    friend class Solver;
  };
Net:
class Callback {
  protected:
    virtual void run(int layer) = 0;
    template <typename T>
    friend class Net;
  };

1.2 NCCL初始化

对于NCCL的初始化,主要有三部,(1)数据初始化,(2)数据更新,(3)创建Cuda流,具体见下:

template<typename Dtype>
NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver)
  : GPUParams<Dtype>(solver, getDevice()),
    comm_(), solver_(solver), barrier_() {
  this->Configure(solver.get());
  Init();
}

而GPUParams的初始化主要是拷贝数据和梯度,具体代码见下:

 // Copy blob values
  const vector<Blob<Dtype>*>& net = root_solver->net()->learnable_params();
  apply_buffers(net, data_, size_, copy);
  caffe_gpu_set(size_, Dtype(0), diff_);

而configure主要是将网络的数据和梯度进行替换更新,具体代码见下:

  const vector<Blob<Dtype>*>& net = solver->net()->learnable_params();
  apply_buffers(net, data_, size_, replace_gpu);
  apply_buffers(net, diff_, size_, replace_gpu_diff);

第三个是创建流,具体如下:

  if (solver_->param().layer_wise_reduce()) {
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking));
  }

1.3 NCCL运行

主进程在运行时,根据gpu的个数(N+1),开启N个线程。每个线程分别在第1~N+1序号的GPU在做具体的任务,主进程在等待直到所有的进程完成任务。
然后,在GPU0上进行一次其他线程训练后的数据合并(其他线程调用其他流更改数据)和并进行训练,完成之后进行线程资源的回收。

template<typename Dtype>
void NCCL<Dtype>::Run(const vector<int>& gpus, const char* restore) {
  boost::barrier barrier(static_cast<int>(gpus.size()));
  vector<NCCL<Dtype>*> nccls(gpus.size());
  // Create workers
  vector<shared_ptr<Worker<Dtype>>> workers(gpus.size());
  for (int i = 1; i < gpus.size(); ++i) {
    CUDA_CHECK(cudaSetDevice(gpus[i]));
    Caffe::set_solver_rank(i);
    Worker<Dtype>* w = new Worker<Dtype>(solver_, gpus[i], &barrier, &nccls, restore);
    w->StartInternalThread();
    workers[i].reset(w);
  }
  ......
  // Wait for workers
  barrier.wait();
  // Init NCCL
  InitSingleProcess(&nccls);
  barrier.wait();
  // Run first solver on current thread
  Broadcast();
  solver_->Solve();
  barrier.wait();  // Hangs without it when running tests
  // Wait for shutdown
  for (int i = 1; i < gpus.size(); ++i) {
    workers[i]->StopInternalThread();
  }
}

每个线程做的事情是先solver和net的注册回调,获取当前的数据(其他线程调用其他流更改数据)并且进行多次训练,具体代码如下:

void InternalThreadEntry() {
    // Create solver and install callbacks
    shared_ptr<Solver<Dtype> > s(SolverRegistry<Dtype>::CreateSolver(param));
    if (restore_) { s->Restore(restore_); }
    NCCL<Dtype> nccl(s);
    s->add_callback(&nccl);
    if (s->param().layer_wise_reduce()) { s->net()->add_after_backward(&nccl);}
    (*nccls_)[Caffe::solver_rank()] = &nccl;
    // Wait for other threads
    barrier_->wait();
    // Broadcast rank 0 state
    nccl.Broadcast();
    s->Step(param.max_iter() - s->iter());
    barrier_->wait();
  }

具体开启新的线程的代码如下:

void InternalThread::StartInternalThread() {
  .......................
  try {
    thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
          rand_seed, solver_count, solver_rank, multiprocess));
  } catch (std::exception& e) {
    LOG(FATAL) << "Thread exception: " << e.what();
  }
  ..........................
}

2 关于回调

回调有2个层次,一个是Solver,一个是Net。

2.1 Solver回调
solver回调功能

我们先来看Solver层面的回调。在启动线程是会进行Solver回调注册,调用的接口是add_callback,回调有2个,一个是:

class Callback {
    virtual void on_start() = 0;
    virtual void on_gradients_ready() = 0;
};

在此场景下,on_start()没有处理,而第二个回调有2部分,如果启动计算和通讯是叠代进行,则需要同步各个cuda流,否则对各个梯度进行Reduce并且归一化。具体实现是:

template<typename Dtype>
void NCCL<Dtype>::on_gradients_ready() {
  if (solver_->param().layer_wise_reduce()) {
    // Make sure reduction is done before applying gradients
    CUDA_CHECK(cudaStreamSynchronize(stream_));
  } else {
    if (barrier_) {  // NULL in multi process case
      barrier_->wait();
    }
    NCCL_CHECK(ncclAllReduce(diff_, diff_, static_cast<int>(size_),
                             nccl::dataType<Dtype>::type, ncclSum, comm_, cudaStreamDefault));
    caffe_gpu_scal(static_cast<int>(size_), (Dtype) 1.0 / Caffe::solver_count(), diff_);
  }
}
solver回调注册

solver的回调在两个地方注册,一个是启动子线程是进行注册,即void InternalThreadEntry()函数,另外一个是并且启动void NCCL<Dtype>::Run(...)。其实这两个地方的时机本质是一样的,就是多流训练是启动是进行注册。

solver回调的调用

solver回调的调用时机,是在训练是Step函数中,具体在上一篇文章有描述,主要是在前向推理和反向广播之前调用on_start(), 之后on_gradients_ready()。其中为何有是for循环的原因是多个GPU,会有多个同样的回调函数,也是为何on_gradients_ready()需要做流同步和diff的reduce。我们再次回顾核心下:

void Solver<Dtype>::Step(int iters) {
   while (iter_ < stop_iter) {
      ......
      for (int i = 0; i < callbacks_.size(); ++i) {
        callbacks_[i]->on_start();
      }
     .......
     for (int i = 0; i < param_.iter_size(); ++i) {
      loss += net_->ForwardBackward();
     }
    ......
    for (int i = 0; i < callbacks_.size(); ++i) {
      callbacks_[i]->on_gradients_ready();
    }
    ApplyUpdate();
    ......
  }
}
2.2 Net层面的回调
Net回调功能

在Net类型定义中有定义回调,见下面代码:

  class Callback {
   protected:
    virtual void run(int layer) = 0;

    template <typename T>
    friend class Net;
  };

在并行训练中此回调被重写,其主要功能是步各个cuda流,否则对各个梯度进行Reduce并且归一化,具体见下面代码:

template<typename Dtype>
void NCCL<Dtype>::run(int layer) {
    vector<shared_ptr<Blob<Dtype> > >& blobs =
    solver_->net()->layers()[layer]->blobs();
  if (blobs.size() > 0) {
    CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
    if (barrier_) {  // NULL in multi process case
      barrier_->wait();
    }
    NCCL_CHECK(ncclAllReduce(blobs[0]->mutable_gpu_diff(), blobs[0]->mutable_gpu_diff(),
                             size, nccl::dataType<Dtype>::type, ncclSum, comm_, stream_));
    caffe_gpu_scal(size, (Dtype) 1.0 / Caffe::solver_count(), blobs[0]->mutable_gpu_diff(), stream_);
  }
}
Net回调注册

Net的回调在两个地方注册,一个是启动子线程是进行注册,即void InternalThreadEntry()函数,另外一个是并且启动void NCCL<Dtype>::Run(...)。其实这两个地方的时机本质是一样的,就是多流训练是启动是进行注册。
注册的类型有before_forward、after_forward、before_backward、after_backward。 而在多GPU并行是指注册了after_backward。调用的函数是:

  void add_after_backward(Callback* value) {
    after_backward_.push_back(value);
  }
Net回调的调用

Net注册的函数调用
Net在进行计算是,存在两个阶段,一个是forward, 一个是backward。 其中forward的前后都有回调,如前所示此两个回调为空。其调用函数见下:

template <typename Dtype>
Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
  for (int i = start; i <= end; ++i) {
    for (int c = 0; c < before_forward_.size(); ++c) {
      before_forward_[c]->run(i);
    }
    Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]);
    loss += layer_loss;
       for (int c = 0; c < after_forward_.size(); ++c) {
      after_forward_[c]->run(i);
    }
  }
  return loss;
}

其次,backward的前后都有回调,其调用函数见下:

template <typename Dtype>
void Net<Dtype>::BackwardFromTo(int start, int end) {
  for (int i = start; i >= end; --i) {
    for (int c = 0; c < before_backward_.size(); ++c) {
      before_backward_[c]->run(i);
    }
    if (layer_need_backward_[i]) {
      layers_[i]->Backward(top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]);
    }
    for (int c = 0; c < after_backward_.size(); ++c) {
      after_backward_[c]->run(i);
    }
  }
}

至于ForwardFromTo()和BackwardFromTo()函数的作用,请查阅上一篇文章。

综上所示,多GPU的训练其实就是采用多线程进行,其中在计算中需要合适的时机进行在cuda流进行同步。其编程模型是多线程+CUDA流。本质上与我们熟悉的编程模型没有区别。支持caffe的源码阅读完毕。

上一篇 下一篇

猜你喜欢

热点阅读