caffe--源码阅读笔记2
本文主要caffe如何采用多GPU进行训练。
1. Caffe启用多GPU
caffe源码中使用宏USE_NCCL进行区分是否启用多GPU训练
使用时的代码如下:
shared_ptr<caffe::Solver<float>> solver(caffe::SolverRegistry<float>::CreateSolver(solver_param));
caffe::NCCL<float> nccl(solver);
nccl.Run(gpus, FLAGS_snapshot.size() > 0 ? FLAGS_snapshot.c_str() : NULL);
关于第一句相关解释,请查看本系列文章的第一篇。
1.1 NCCL相关定义
关于NCCL的定义见下方:
template<typename Dtype>
class NCCL : public GPUParams<Dtype>, public Solver<Dtype>::Callback, public Net<Dtype>::Callback {
public:
/*Single process version. */
explicit NCCL(shared_ptr<Solver<Dtype> > solver);
/*In multi-process settings, first create a NCCL id (new_uid), then
pass it to each process to create connected instances.*/
NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid);
/*In single process settings, create instances without uids and call this to connect them.*/
static void InitSingleProcess(vector<NCCL<Dtype>*>* nccls);
/* Broadcast weights from rank 0 other solvers.*/
void Broadcast();
/*Single process multi-GPU.*/
void Run(const vector<int>& gpus, const char* restore);
protected:
void Init();
void on_start() {}
void run(int layer); // Net callback
void on_gradients_ready();
};
而其父类有说三个,一个是保存参数的(大小,数据,梯度);第二个是Solver层面的回调,用于Solver层面操作;第三个是Net层面的回调,用于Net层面操作。具体定义如下:
template<typename Dtype>
class GPUParams : public Params<Dtype> {
public:
GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device);
void Configure(Solver<Dtype>* solver) const;
protected:
using Params<Dtype>::size_;
using Params<Dtype>::data_;
using Params<Dtype>::diff_;
};
Solver:
class Callback {
protected:
virtual void on_start() = 0;
virtual void on_gradients_ready() = 0;
template <typename T>
friend class Solver;
};
Net:
class Callback {
protected:
virtual void run(int layer) = 0;
template <typename T>
friend class Net;
};
1.2 NCCL初始化
对于NCCL的初始化,主要有三部,(1)数据初始化,(2)数据更新,(3)创建Cuda流,具体见下:
template<typename Dtype>
NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver)
: GPUParams<Dtype>(solver, getDevice()),
comm_(), solver_(solver), barrier_() {
this->Configure(solver.get());
Init();
}
而GPUParams的初始化主要是拷贝数据和梯度,具体代码见下:
// Copy blob values
const vector<Blob<Dtype>*>& net = root_solver->net()->learnable_params();
apply_buffers(net, data_, size_, copy);
caffe_gpu_set(size_, Dtype(0), diff_);
而configure主要是将网络的数据和梯度进行替换更新,具体代码见下:
const vector<Blob<Dtype>*>& net = solver->net()->learnable_params();
apply_buffers(net, data_, size_, replace_gpu);
apply_buffers(net, diff_, size_, replace_gpu_diff);
第三个是创建流,具体如下:
if (solver_->param().layer_wise_reduce()) {
CUDA_CHECK(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking));
}
1.3 NCCL运行
主进程在运行时,根据gpu的个数(N+1),开启N个线程。每个线程分别在第1~N+1序号的GPU在做具体的任务,主进程在等待直到所有的进程完成任务。
然后,在GPU0上进行一次其他线程训练后的数据合并(其他线程调用其他流更改数据)和并进行训练,完成之后进行线程资源的回收。
template<typename Dtype>
void NCCL<Dtype>::Run(const vector<int>& gpus, const char* restore) {
boost::barrier barrier(static_cast<int>(gpus.size()));
vector<NCCL<Dtype>*> nccls(gpus.size());
// Create workers
vector<shared_ptr<Worker<Dtype>>> workers(gpus.size());
for (int i = 1; i < gpus.size(); ++i) {
CUDA_CHECK(cudaSetDevice(gpus[i]));
Caffe::set_solver_rank(i);
Worker<Dtype>* w = new Worker<Dtype>(solver_, gpus[i], &barrier, &nccls, restore);
w->StartInternalThread();
workers[i].reset(w);
}
......
// Wait for workers
barrier.wait();
// Init NCCL
InitSingleProcess(&nccls);
barrier.wait();
// Run first solver on current thread
Broadcast();
solver_->Solve();
barrier.wait(); // Hangs without it when running tests
// Wait for shutdown
for (int i = 1; i < gpus.size(); ++i) {
workers[i]->StopInternalThread();
}
}
每个线程做的事情是先solver和net的注册回调,获取当前的数据(其他线程调用其他流更改数据)并且进行多次训练,具体代码如下:
void InternalThreadEntry() {
// Create solver and install callbacks
shared_ptr<Solver<Dtype> > s(SolverRegistry<Dtype>::CreateSolver(param));
if (restore_) { s->Restore(restore_); }
NCCL<Dtype> nccl(s);
s->add_callback(&nccl);
if (s->param().layer_wise_reduce()) { s->net()->add_after_backward(&nccl);}
(*nccls_)[Caffe::solver_rank()] = &nccl;
// Wait for other threads
barrier_->wait();
// Broadcast rank 0 state
nccl.Broadcast();
s->Step(param.max_iter() - s->iter());
barrier_->wait();
}
具体开启新的线程的代码如下:
void InternalThread::StartInternalThread() {
.......................
try {
thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
rand_seed, solver_count, solver_rank, multiprocess));
} catch (std::exception& e) {
LOG(FATAL) << "Thread exception: " << e.what();
}
..........................
}
2 关于回调
回调有2个层次,一个是Solver,一个是Net。
2.1 Solver回调
solver回调功能
我们先来看Solver层面的回调。在启动线程是会进行Solver回调注册,调用的接口是add_callback,回调有2个,一个是:
class Callback {
virtual void on_start() = 0;
virtual void on_gradients_ready() = 0;
};
在此场景下,on_start()没有处理,而第二个回调有2部分,如果启动计算和通讯是叠代进行,则需要同步各个cuda流,否则对各个梯度进行Reduce并且归一化。具体实现是:
template<typename Dtype>
void NCCL<Dtype>::on_gradients_ready() {
if (solver_->param().layer_wise_reduce()) {
// Make sure reduction is done before applying gradients
CUDA_CHECK(cudaStreamSynchronize(stream_));
} else {
if (barrier_) { // NULL in multi process case
barrier_->wait();
}
NCCL_CHECK(ncclAllReduce(diff_, diff_, static_cast<int>(size_),
nccl::dataType<Dtype>::type, ncclSum, comm_, cudaStreamDefault));
caffe_gpu_scal(static_cast<int>(size_), (Dtype) 1.0 / Caffe::solver_count(), diff_);
}
}
solver回调注册
solver的回调在两个地方注册,一个是启动子线程是进行注册,即void InternalThreadEntry()函数,另外一个是并且启动void NCCL<Dtype>::Run(...)。其实这两个地方的时机本质是一样的,就是多流训练是启动是进行注册。
solver回调的调用
solver回调的调用时机,是在训练是Step函数中,具体在上一篇文章有描述,主要是在前向推理和反向广播之前调用on_start(), 之后on_gradients_ready()。其中为何有是for循环的原因是多个GPU,会有多个同样的回调函数,也是为何on_gradients_ready()需要做流同步和diff的reduce。我们再次回顾核心下:
void Solver<Dtype>::Step(int iters) {
while (iter_ < stop_iter) {
......
for (int i = 0; i < callbacks_.size(); ++i) {
callbacks_[i]->on_start();
}
.......
for (int i = 0; i < param_.iter_size(); ++i) {
loss += net_->ForwardBackward();
}
......
for (int i = 0; i < callbacks_.size(); ++i) {
callbacks_[i]->on_gradients_ready();
}
ApplyUpdate();
......
}
}
2.2 Net层面的回调
Net回调功能
在Net类型定义中有定义回调,见下面代码:
class Callback {
protected:
virtual void run(int layer) = 0;
template <typename T>
friend class Net;
};
在并行训练中此回调被重写,其主要功能是步各个cuda流,否则对各个梯度进行Reduce并且归一化,具体见下面代码:
template<typename Dtype>
void NCCL<Dtype>::run(int layer) {
vector<shared_ptr<Blob<Dtype> > >& blobs =
solver_->net()->layers()[layer]->blobs();
if (blobs.size() > 0) {
CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
if (barrier_) { // NULL in multi process case
barrier_->wait();
}
NCCL_CHECK(ncclAllReduce(blobs[0]->mutable_gpu_diff(), blobs[0]->mutable_gpu_diff(),
size, nccl::dataType<Dtype>::type, ncclSum, comm_, stream_));
caffe_gpu_scal(size, (Dtype) 1.0 / Caffe::solver_count(), blobs[0]->mutable_gpu_diff(), stream_);
}
}
Net回调注册
Net的回调在两个地方注册,一个是启动子线程是进行注册,即void InternalThreadEntry()函数,另外一个是并且启动void NCCL<Dtype>::Run(...)。其实这两个地方的时机本质是一样的,就是多流训练是启动是进行注册。
注册的类型有before_forward、after_forward、before_backward、after_backward。 而在多GPU并行是指注册了after_backward。调用的函数是:
void add_after_backward(Callback* value) {
after_backward_.push_back(value);
}
Net回调的调用
Net注册的函数调用
Net在进行计算是,存在两个阶段,一个是forward, 一个是backward。 其中forward的前后都有回调,如前所示此两个回调为空。其调用函数见下:
template <typename Dtype>
Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
for (int i = start; i <= end; ++i) {
for (int c = 0; c < before_forward_.size(); ++c) {
before_forward_[c]->run(i);
}
Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]);
loss += layer_loss;
for (int c = 0; c < after_forward_.size(); ++c) {
after_forward_[c]->run(i);
}
}
return loss;
}
其次,backward的前后都有回调,其调用函数见下:
template <typename Dtype>
void Net<Dtype>::BackwardFromTo(int start, int end) {
for (int i = start; i >= end; --i) {
for (int c = 0; c < before_backward_.size(); ++c) {
before_backward_[c]->run(i);
}
if (layer_need_backward_[i]) {
layers_[i]->Backward(top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]);
}
for (int c = 0; c < after_backward_.size(); ++c) {
after_backward_[c]->run(i);
}
}
}
至于ForwardFromTo()和BackwardFromTo()函数的作用,请查阅上一篇文章。
综上所示,多GPU的训练其实就是采用多线程进行,其中在计算中需要合适的时机进行在cuda流进行同步。其编程模型是多线程+CUDA流。本质上与我们熟悉的编程模型没有区别。支持caffe的源码阅读完毕。