TaskFlow DAG部分源码阅读

2020-08-28  本文已影响0人  JimmyPan

Taskflow有很多功能,例如动态时构建子图,条件节点,并行运算,静态图。这次学习着重于Taskflow对于静态图DAG的调度

TaskFlow 核心结构

DAG图:


  Class Node{
    public:...
    private:
    PassiveVector<Node*> _successors; //保存所有的后继节点
    PassiveVector<Node*> _dependents;// 保存所有的前趋节点
    std::atomic<size_t> _join_counter {0};  // 维护在运行中,当前结点还剩几个依赖没有完成
}
class Graph {
  private:
    std::vector<Node*> _nodes; //保存所有的节点
};
class Topology { 
  private:
    Taskflow& _taskflow;
    std::promise<void> _promise; // 该拓扑结构的promise,返回的future从这里获得
    PassiveVector<Node*> _sources; // 存储一个图中所有入度为0的节点
    std::function<bool()> _pred; //结束条件 返回bool  ,返回false时,重新被调用
    std::function<void()> _call; // 结束后调用
    std::atomic<size_t> _join_counter {0}; //维护在运行中,当前拓扑中还剩入度为0的节点没有完成,当该值到0时,则终止任务
};
class Taskflow : public FlowBuilder {
  private:
    std::string _name;
    Graph _graph; //DAG图
    std::list<Topology> _topologies; //在run中创建,例如当run一个taskflow多次,一个taskflow可能会有多个Topology,全部的Topology都运行完,才会结束
};

调度器


class Executor{
    const size_t _VICTIM_BEG;//vicitm(被偷取的线程) 
    const size_t _VICTIM_END;// 这两个标志,用来表示被偷取线程的范围
    const size_t _MAX_STEALS; //最多尝试偷取多次后调用 yield
    const size_t _MAX_YIELDS;//最多让出多少次线程后 返回
   
    std::condition_variable _topology_cv; //该cv在析构函数中wait,任务完成时进行通知
    std::mutex _topology_mutex; //和cv一起用
    std::mutex _wsq_mutex; // 偷取队列,push的时候需要加锁保护?

    size_t _num_topologies {0}; //计数器,run的时候+1,可能有多个任务,到0时才会析构executor
    
    std::vector<Worker> _workers; // 维护worker生命周期,工作线程的local_thread维护其中worker的指针
    std::vector<std::thread> _threads; //创建线程

    Notifier _notifier[NUM_DOMAINS]; // 用于做cv通知

    TaskQueue<Node*> _wsq[NUM_DOMAINS]; //共享队列

    size_t _id_offset[NUM_DOMAINS] = {0};

    std::atomic<size_t> _num_actives[NUM_DOMAINS]; //偷窃态的线程数 (正在尝试偷取其它任务的线程)
    std::atomic<size_t> _num_thieves[NUM_DOMAINS]; //激活态的线程数(正在执行节点中任务的线程)
    std::atomic<bool>   _done {0}; // 标志任务是否已经完成
    std::unordered_set<std::shared_ptr<ObserverInterface>> _observers; //任务开始前后有回调函数,记录任务执行进度

}

工作线程的本地thread_local


  struct Worker {
    size_t id;
    size_t vtm; // 受害者,表示偷的是谁
    Domain domain;
    Executor* executor;
    Notifier::Waiter* waiter; //用作cv通知和阻塞
    std::mt19937 rdgen { std::random_device{}() }; //偷取时利用这随机选择
    TaskQueue<Node*> wsq[NUM_DOMAINS]; //本地的偷窃队列
  };

拓扑图调度流程


以一个例子做演示一个DAG的调度流程,如下图,taskflow维护了两个计数器
1.每个任务节点维护一个join_counter,表示该节点还有几个依赖未完成(入度为0)。
2.拓扑结构维护一个join_counter,表示该拓扑图还是有几个入度为0的节点

  1. 调度器创建拓扑结构,并将入度为0的节点(A,D)加入到队列中
  2. 线程中某个线程抢到A,执行A,并将它的两个后继节点(B,C)的join_counter减1,查看A后继节点,判断是否有join_counter为0的节点,并加入到队列中(B的join_counter为0,加入队列), C的join_counter不为0,则不管C
  3. 某个线程在A运行的同时抢到D,并将它的两个后继节点(C)的join_counter减1,此时C的Join_counter为0,则将C加入队列。(哪个工作线程最后一个把C的join_counter减到0,就由谁把C加入队列)
  4. B,C被在队列中被线程池中的线程抢到并执行
  5. 执行B,C的线程中,哪个线程最后一个运行完,谁把E的join_counter减到0,谁会把E加入到任务队列
  6. 某个线程抢到E,并把E运行完,此时整个拓扑图没有入度为0的几点存在了,结束运行


    image.png

线程池中任务的分配和偷取

核心代码分析

调度器线程通过将初始入度为0的节点放入shared_queue,并唤醒任务来开启整个任务流程
topology 中保存所有入度为0的节点,并保存着整个拓扑结构中当前入度为0的节点数,来控制流程的结束。还保存着拓扑图promise,返回给用户future

template <typename P, typename C>
std::future<void> Executor::run_until(Taskflow& taskflow) {

    std::future<void> future;
    {
        lock(mutex);
        taskflow._topologies.emplace_back(taskflow);//创建一个topology
        tpg = &(f._topologies.back()); //获得刚创建的topology
        future = tpg->_promise.get_future(); //获得topology中promise的future当返回值
    }
    //设置tgp,主要找所有入度为0的点放入topology的soucre,并且为图的所有节点设置join_counter
    for(auto node : taskflow->graph->nodes ){
        if(node->dependts() == 0 ){ //如果入度为0
            tgp->source.push_back()/node;
        }
        node->set_join_counter();//设置每个节点的join_counter,作用如前文所述
    }
    //将上面找出的所有入度为0的节点,加入到队列中
     shared_queue.push_back(tgp->sourece);


    notfiy_n(tgp->source.size()) //唤醒对应个数的工作线程
    return future;
}

工作线程负责任务的执行和获取。taskflow有一个设计目标就是,要保持一个当一个线程激活时的时候,有个一个线程正在偷取,因为激活的线程可能会添加新的任务到队列中,需要一个线程去偷取来减少任务等待。

为了达到这个目标,taskflow设计了两个状态,进入exploit_task的线程,是激活态(active),负责进行任务的运行和调度新任务。进入wait_for_task的线程,是偷窃态(thief),负责偷取任务。
之后设计了用了一种自适应策略:当最后一个偷窃态的线程编程激活状态时,会唤醒一个新的线程来取代自己偷窃线程的角色,

worker_thread([](worker* w){
while(true){
  expliot_task(w,t); //线程进入激活态,运行任务并将后继加入队列
  if(wait_for_task(w,t)) == false)//线程进入偷窃态,不停的去偷取任任务
       then break;
}
}

1.exploit_task.进入激活态,运行当前任务并且添加当前任务的后继:

void Executor::_exploit_task(Worker& w, Node*& t) {
    if(t) {  //如果任务存在
        if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) { //active + 1 表示激活态的线程 + 1 ,如果有一个激活态线程,并且目前没有偷窃态线程,要唤醒一个线程去偷窃
            _notifier.notify_one; //激活一个睡眠线程,去偷取可能会到来的任务
        }
        while(t) {
            _invoke(w, t);// 运行任务,调用t中的回调函数,并且如果后继的话在本地队列继续添加新任务
            t = local_taskqueue.pop(); // 该线程自己从自己的local queue中拿,其它线程也会去偷取。自己拿完或者被偷完时,结束循环
        }
        --_num_actives; // 离开激活态,active-1
    }
}

2.wait_for_task.进入偷取态,尝试去偷取任务

bool Executor::_wait_for_task(Worker& worker, Node*& t) {
    wait_for_task:

    ++_num_thieves[d];

    explore_task:

    _explore_task(worker, t);

    if(t) {
        if(_num_thieves.fetch_sub(1) == 1) {
            _notifier.notify(false);
        }
        return true;
    }

    _notifier[d].prepare_wait(worker.waiter);
    
    if(!shared_queue.empty()) {
        _notifier[d].cancel_wait(worker.waiter);
        t = shared_queue.steal();  
        if(t) {
            if(_num_thieves.fetch_sub(1) == 1) {
                _notifier.notify(false);
            }
            return true;
        }
        else {
            goto explore_task;
        }
    }
    if(_done) {
        _notifier.cancel_wait(worker.waiter);
        for(int i=0; i<NUM_DOMAINS; ++i) {
            _notifier.notify(true);
        }
        --_num_thieves;
        return false;
    }

    if(_num_thieves[d].fetch_sub(1) == 1) {
        if(_num_actives) {
            _notifier.cancel_wait(worker.waiter);
            goto wait_for_task;
        }
        // check all domain queue again
        for(auto& w : _workers) {
            if(!w.local_queue.empty()) {
                worker.vtm = w.id;
                _notifier[d].cancel_wait(worker.waiter);
                goto wait_for_task;
            }
        }
    }
    
    _notifier[d].commit_wait(worker.waiter);

    return true;
}


当工作线程完成了本地队列中全部的任务(exploit_task),接下来就会去运行wait_for_task。
首先该工作线程会进入explore_task去尝试偷取。当这个工作线程偷到了一个任务,并且当前线程是最后一个正在尝试偷取的线程,它会去唤醒一个新的工作线程去取代它自己去偷取新任务,并让自己返回激活状态(运行任务),否则的话它会去成为一个睡眠线程的候选者(尚未真正睡眠,如果没有任务的话,并且不是最后一个偷窃态的线程,它会真正睡眠)

然而必须避免,没有充分利用cpu的并行(underutilized paralleism)的情况,因为任务可能会在我们把工作线程睡眠时到来,所以用2PC(两阶段提交)调整活动工作线程的数量以调整可用任务的并行性。尽量减少无用的睡眠动作。

运行态后工作线程已经用将本地队列中的任务运行干净,并且投入了很多精力在偷取线程上。任务队列大概率已经是空的了,我们用prepare_wait将它变为一个候选的睡眠线程,从现在开始,所有来自其它线程通知对这个候选睡眠线程都是可见的了。

首先我们再次检查共享队列,因为其它线程可能会在prepare_wait后再次插入新任务,如果共享队列不为空,则此刻就不能睡觉了,立即偷取并返回。
接着,如果该线程是最后一个偷窃态线程,它只有在两种情况下能逃脱睡眠的命运,1.当前还有其它运行态的线程(说明可能会有新任务被加入到本地队列,该线程还不能休息,要继续努力偷取) 2.检查其它线程的队列不为空(不为空说明还有任务,还不能睡觉,必须要去偷了)
如果以上检查都没能阻拦它去阻塞,那它就真的要去阻塞了。

2.explore_task


inline void Executor::_explore_task(Worker& w, Node*& t) {
  
  //assert(_workers[w].wsq.empty());
  assert(!t);

  const auto d = w.domain;

  size_t num_steals = 0;
  size_t num_yields = 0;

  std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END); //随机产生受害者(被偷取的线程)
  do {
    t = (w.id == w.vtm) ? _wsq[d].steal() : _workers[w.vtm].wsq[d].steal();  // 如果被偷取者的id等于自己的id则从,shared_queue中偷取,否则从对应线程的local_queue中偷取
    if(t) {
      break;
    }    
    if(num_steals++ > _MAX_STEALS) { //如果偷了8次都没偷到,就让出自己的时间片,让其它任务先执行
      std::this_thread::yield();
      if(num_yields++ > _MAX_YIELDS) { // 如果让出了100次,还没有偷到任务,则停止循环
        break;
      }
    }    
    w.vtm = rdvtm(w.rdgen);
  } while(!_done);

}

在explore_task中,偷窃态的工作线程随机从所有的工作线程中选择一个进行偷取,如果被选到的使它自己,则从shared_queue中进行偷取。
在多个线程争抢中,偷取可能失败,所以用两个参数来控制,一直偷取还是让出cpu资源。
如果偷到了max_steal次还是没有偷到,则让出自己的cpu资源,等待cpu下次调度
如果已经让了max_yield次,则不再进行偷取,返回函数

Work steal Queue
上一篇下一篇

猜你喜欢

热点阅读