TaskFlow DAG部分源码阅读
Taskflow有很多功能,例如动态时构建子图,条件节点,并行运算,静态图。这次学习着重于Taskflow对于静态图DAG的调度
TaskFlow 核心结构
DAG图:
Class Node{
public:...
private:
PassiveVector<Node*> _successors; //保存所有的后继节点
PassiveVector<Node*> _dependents;// 保存所有的前趋节点
std::atomic<size_t> _join_counter {0}; // 维护在运行中,当前结点还剩几个依赖没有完成
}
class Graph {
private:
std::vector<Node*> _nodes; //保存所有的节点
};
class Topology {
private:
Taskflow& _taskflow;
std::promise<void> _promise; // 该拓扑结构的promise,返回的future从这里获得
PassiveVector<Node*> _sources; // 存储一个图中所有入度为0的节点
std::function<bool()> _pred; //结束条件 返回bool ,返回false时,重新被调用
std::function<void()> _call; // 结束后调用
std::atomic<size_t> _join_counter {0}; //维护在运行中,当前拓扑中还剩入度为0的节点没有完成,当该值到0时,则终止任务
};
class Taskflow : public FlowBuilder {
private:
std::string _name;
Graph _graph; //DAG图
std::list<Topology> _topologies; //在run中创建,例如当run一个taskflow多次,一个taskflow可能会有多个Topology,全部的Topology都运行完,才会结束
};
调度器
class Executor{
const size_t _VICTIM_BEG;//vicitm(被偷取的线程)
const size_t _VICTIM_END;// 这两个标志,用来表示被偷取线程的范围
const size_t _MAX_STEALS; //最多尝试偷取多次后调用 yield
const size_t _MAX_YIELDS;//最多让出多少次线程后 返回
std::condition_variable _topology_cv; //该cv在析构函数中wait,任务完成时进行通知
std::mutex _topology_mutex; //和cv一起用
std::mutex _wsq_mutex; // 偷取队列,push的时候需要加锁保护?
size_t _num_topologies {0}; //计数器,run的时候+1,可能有多个任务,到0时才会析构executor
std::vector<Worker> _workers; // 维护worker生命周期,工作线程的local_thread维护其中worker的指针
std::vector<std::thread> _threads; //创建线程
Notifier _notifier[NUM_DOMAINS]; // 用于做cv通知
TaskQueue<Node*> _wsq[NUM_DOMAINS]; //共享队列
size_t _id_offset[NUM_DOMAINS] = {0};
std::atomic<size_t> _num_actives[NUM_DOMAINS]; //偷窃态的线程数 (正在尝试偷取其它任务的线程)
std::atomic<size_t> _num_thieves[NUM_DOMAINS]; //激活态的线程数(正在执行节点中任务的线程)
std::atomic<bool> _done {0}; // 标志任务是否已经完成
std::unordered_set<std::shared_ptr<ObserverInterface>> _observers; //任务开始前后有回调函数,记录任务执行进度
}
工作线程的本地thread_local
struct Worker {
size_t id;
size_t vtm; // 受害者,表示偷的是谁
Domain domain;
Executor* executor;
Notifier::Waiter* waiter; //用作cv通知和阻塞
std::mt19937 rdgen { std::random_device{}() }; //偷取时利用这随机选择
TaskQueue<Node*> wsq[NUM_DOMAINS]; //本地的偷窃队列
};
拓扑图调度流程
以一个例子做演示一个DAG的调度流程,如下图,taskflow维护了两个计数器
1.每个任务节点维护一个join_counter,表示该节点还有几个依赖未完成(入度为0)。
2.拓扑结构维护一个join_counter,表示该拓扑图还是有几个入度为0的节点
- 调度器创建拓扑结构,并将入度为0的节点(A,D)加入到队列中
- 线程中某个线程抢到A,执行A,并将它的两个后继节点(B,C)的join_counter减1,查看A后继节点,判断是否有join_counter为0的节点,并加入到队列中(B的join_counter为0,加入队列), C的join_counter不为0,则不管C
- 某个线程在A运行的同时抢到D,并将它的两个后继节点(C)的join_counter减1,此时C的Join_counter为0,则将C加入队列。(哪个工作线程最后一个把C的join_counter减到0,就由谁把C加入队列)
- B,C被在队列中被线程池中的线程抢到并执行
- 执行B,C的线程中,哪个线程最后一个运行完,谁把E的join_counter减到0,谁会把E加入到任务队列
-
某个线程抢到E,并把E运行完,此时整个拓扑图没有入度为0的几点存在了,结束运行
image.png
线程池中任务的分配和偷取
核心代码分析
-
调度器添加graph到任务队列中:
调度器线程通过将初始入度为0的节点放入shared_queue,并唤醒任务来开启整个任务流程
topology 中保存所有入度为0的节点,并保存着整个拓扑结构中当前入度为0的节点数,来控制流程的结束。还保存着拓扑图promise,返回给用户future
template <typename P, typename C>
std::future<void> Executor::run_until(Taskflow& taskflow) {
std::future<void> future;
{
lock(mutex);
taskflow._topologies.emplace_back(taskflow);//创建一个topology
tpg = &(f._topologies.back()); //获得刚创建的topology
future = tpg->_promise.get_future(); //获得topology中promise的future当返回值
}
//设置tgp,主要找所有入度为0的点放入topology的soucre,并且为图的所有节点设置join_counter
for(auto node : taskflow->graph->nodes ){
if(node->dependts() == 0 ){ //如果入度为0
tgp->source.push_back()/node;
}
node->set_join_counter();//设置每个节点的join_counter,作用如前文所述
}
//将上面找出的所有入度为0的节点,加入到队列中
shared_queue.push_back(tgp->sourece);
notfiy_n(tgp->source.size()) //唤醒对应个数的工作线程
return future;
}
-
工作线程运行循环:
工作线程负责任务的执行和获取。taskflow有一个设计目标就是,要保持一个当一个线程激活时的时候,有个一个线程正在偷取,因为激活的线程可能会添加新的任务到队列中,需要一个线程去偷取来减少任务等待。
为了达到这个目标,taskflow设计了两个状态,进入exploit_task的线程,是激活态(active),负责进行任务的运行和调度新任务。进入wait_for_task的线程,是偷窃态(thief),负责偷取任务。
之后设计了用了一种自适应策略:当最后一个偷窃态的线程编程激活状态时,会唤醒一个新的线程来取代自己偷窃线程的角色,
worker_thread([](worker* w){
while(true){
expliot_task(w,t); //线程进入激活态,运行任务并将后继加入队列
if(wait_for_task(w,t)) == false)//线程进入偷窃态,不停的去偷取任任务
then break;
}
}
1.exploit_task.进入激活态,运行当前任务并且添加当前任务的后继:
void Executor::_exploit_task(Worker& w, Node*& t) {
if(t) { //如果任务存在
if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) { //active + 1 表示激活态的线程 + 1 ,如果有一个激活态线程,并且目前没有偷窃态线程,要唤醒一个线程去偷窃
_notifier.notify_one; //激活一个睡眠线程,去偷取可能会到来的任务
}
while(t) {
_invoke(w, t);// 运行任务,调用t中的回调函数,并且如果后继的话在本地队列继续添加新任务
t = local_taskqueue.pop(); // 该线程自己从自己的local queue中拿,其它线程也会去偷取。自己拿完或者被偷完时,结束循环
}
--_num_actives; // 离开激活态,active-1
}
}
2.wait_for_task.进入偷取态,尝试去偷取任务
bool Executor::_wait_for_task(Worker& worker, Node*& t) {
wait_for_task:
++_num_thieves[d];
explore_task:
_explore_task(worker, t);
if(t) {
if(_num_thieves.fetch_sub(1) == 1) {
_notifier.notify(false);
}
return true;
}
_notifier[d].prepare_wait(worker.waiter);
if(!shared_queue.empty()) {
_notifier[d].cancel_wait(worker.waiter);
t = shared_queue.steal();
if(t) {
if(_num_thieves.fetch_sub(1) == 1) {
_notifier.notify(false);
}
return true;
}
else {
goto explore_task;
}
}
if(_done) {
_notifier.cancel_wait(worker.waiter);
for(int i=0; i<NUM_DOMAINS; ++i) {
_notifier.notify(true);
}
--_num_thieves;
return false;
}
if(_num_thieves[d].fetch_sub(1) == 1) {
if(_num_actives) {
_notifier.cancel_wait(worker.waiter);
goto wait_for_task;
}
// check all domain queue again
for(auto& w : _workers) {
if(!w.local_queue.empty()) {
worker.vtm = w.id;
_notifier[d].cancel_wait(worker.waiter);
goto wait_for_task;
}
}
}
_notifier[d].commit_wait(worker.waiter);
return true;
}
当工作线程完成了本地队列中全部的任务(exploit_task),接下来就会去运行wait_for_task。
首先该工作线程会进入explore_task去尝试偷取。当这个工作线程偷到了一个任务,并且当前线程是最后一个正在尝试偷取的线程,它会去唤醒一个新的工作线程去取代它自己去偷取新任务,并让自己返回激活状态(运行任务),否则的话它会去成为一个睡眠线程的候选者(尚未真正睡眠,如果没有任务的话,并且不是最后一个偷窃态的线程,它会真正睡眠)
然而必须避免,没有充分利用cpu的并行(underutilized paralleism)的情况,因为任务可能会在我们把工作线程睡眠时到来,所以用2PC(两阶段提交)调整活动工作线程的数量以调整可用任务的并行性。尽量减少无用的睡眠动作。
运行态后工作线程已经用将本地队列中的任务运行干净,并且投入了很多精力在偷取线程上。任务队列大概率已经是空的了,我们用prepare_wait将它变为一个候选的睡眠线程,从现在开始,所有来自其它线程通知对这个候选睡眠线程都是可见的了。
首先我们再次检查共享队列,因为其它线程可能会在prepare_wait后再次插入新任务,如果共享队列不为空,则此刻就不能睡觉了,立即偷取并返回。
接着,如果该线程是最后一个偷窃态线程,它只有在两种情况下能逃脱睡眠的命运,1.当前还有其它运行态的线程(说明可能会有新任务被加入到本地队列,该线程还不能休息,要继续努力偷取) 2.检查其它线程的队列不为空(不为空说明还有任务,还不能睡觉,必须要去偷了)
如果以上检查都没能阻拦它去阻塞,那它就真的要去阻塞了。
2.explore_task
inline void Executor::_explore_task(Worker& w, Node*& t) {
//assert(_workers[w].wsq.empty());
assert(!t);
const auto d = w.domain;
size_t num_steals = 0;
size_t num_yields = 0;
std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END); //随机产生受害者(被偷取的线程)
do {
t = (w.id == w.vtm) ? _wsq[d].steal() : _workers[w.vtm].wsq[d].steal(); // 如果被偷取者的id等于自己的id则从,shared_queue中偷取,否则从对应线程的local_queue中偷取
if(t) {
break;
}
if(num_steals++ > _MAX_STEALS) { //如果偷了8次都没偷到,就让出自己的时间片,让其它任务先执行
std::this_thread::yield();
if(num_yields++ > _MAX_YIELDS) { // 如果让出了100次,还没有偷到任务,则停止循环
break;
}
}
w.vtm = rdvtm(w.rdgen);
} while(!_done);
}
在explore_task中,偷窃态的工作线程随机从所有的工作线程中选择一个进行偷取,如果被选到的使它自己,则从shared_queue中进行偷取。
在多个线程争抢中,偷取可能失败,所以用两个参数来控制,一直偷取还是让出cpu资源。
如果偷到了max_steal次还是没有偷到,则让出自己的cpu资源,等待cpu下次调度
如果已经让了max_yield次,则不再进行偷取,返回函数