#include <taskflow/taskflow.hpp>  // Taskflow is header-only
#include <iostream>               // std::cout is used below; don't rely on transitive includes

// Basic Taskflow example: build a diamond-shaped dependency graph
// A -> {B, C} -> D and run it to completion on an executor.
int main() {

  // The executor owns the worker threads that run taskflows.
  tf::Executor executor;

  // The taskflow holds the task dependency graph.
  tf::Taskflow taskflow;

  // Create four tasks; emplace returns one handle per callable.
  auto [A, B, C, D] = taskflow.emplace(
    [] () { std::cout << "TaskA\n"; },
    [] () { std::cout << "TaskB\n"; },
    [] () { std::cout << "TaskC\n"; },
    [] () { std::cout << "TaskD\n"; }
  );

  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after B and C

  // Submit the taskflow to the executor and block until it finishes.
  executor.run(taskflow).wait();

  return 0;
}
// Subflow (dynamic tasking) example: static task B spawns three dynamic
// tasks B1/B2/B3 at run time through a tf::Subflow handle.
tf::Taskflow taskflow;
tf::Executor executor;

tf::Task A = taskflow.emplace([] () {}).name("A");  // static task A
tf::Task C = taskflow.emplace([] () {}).name("C");  // static task C
tf::Task D = taskflow.emplace([] () {}).name("D");  // static task D

tf::Task B = taskflow.emplace([] (tf::Subflow& subflow) {
  tf::Task B1 = subflow.emplace([] () {}).name("B1");  // dynamic task B1
  tf::Task B2 = subflow.emplace([] () {}).name("B2");  // dynamic task B2
  tf::Task B3 = subflow.emplace([] () {}).name("B3");  // dynamic task B3
  B1.precede(B3);  // B1 runs before B3
  B2.precede(B3);  // B2 runs before B3
}).name("B");

A.precede(B);  // B runs after A
A.precede(C);  // C runs after A
B.precede(D);  // D runs after B
C.precede(D);  // D runs after C

executor.run(taskflow).get();  // execute the graph to spawn the subflow
taskflow.dump(std::cout);      // dump the taskflow to a DOT format
工作流（Workflow）结构如下图所示：
```mermaid
graph LR;
subgraph SubFlow:B
B1 --> B3
B2 --> B3
B3 --> B
end
A --> C --> D
A --> B --> D
```
```mermaid
graph LR;
subgraph SubFlow:B
B1 --> B3
B2 --> B3
B3 --> B
end
A --> C --> D
A --> B --> D
```
tf::Executorexecutor;// main thread creates a dependent async task A
tf::AsyncTaskA=executor.silent_dependent_async([](){});// spawn a new thread to create an async task B that runs after A
std::threadt1([&](){tf::AsyncTaskB=executor.silent_dependent_async([](){},A);});// spawn a new thread to create an async task C that runs after A
std::threadt2([&](){tf::AsyncTaskC=executor.silent_dependent_async([](){},A);});executor.wait_for_all();t1.join();t2.join();
// Runtime-tasking example: A is a condition task returning 0, which steers
// execution to successor 0 (B). Inside B, the tf::Runtime handle is used to
// explicitly schedule C, which the condition branch alone would have skipped.
tf::Task A, B, C, D;

std::tie(A, B, C, D) = taskflow.emplace(
  [] () { return 0; },
  [&C] (tf::Runtime& rt) {  // C must be captured by reference
    std::cout << "B\n";
    rt.schedule(C);         // B wakes up C
  },
  [] () { std::cout << "C\n"; },
  [] () { std::cout << "D\n"; }
);

A.precede(B, C, D);
executor.run(taskflow).wait();
// Constructor: create an executor with N worker threads.
// N   - number of worker threads to spawn; must be positive, otherwise throws.
// wix - optional worker-lifecycle hook, moved into the executor.
inline Executor::Executor(size_t N, std::shared_ptr<WorkerInterface> wix) :
  _MAX_STEALS {((N+1) << 1)},          // bound on steal attempts: 2*(N+1)
  _threads    {N},
  _workers    {N},
  _notifier   {N},
  _worker_interface {std::move(wix)} {

  // an executor with zero workers could never make progress
  if(N == 0) {
    TF_THROW("no cpu workers to execute taskflows");
  }

  _spawn(N);

  // instantiate the default observer if requested via the environment
  if(has_env(TF_ENABLE_PROFILER)) {
    TFProfManager::get()._manage(make_observer<TFProfObserver>());
  }
}
// Procedure: _exploit_task
// Drain the worker's local queue: invoke the current task, then keep
// popping the next task from this worker's own work-stealing queue (_wsq)
// until it is empty.
inline void Executor::_exploit_task(Worker& w, Node*& t) {
  while(t) {
    _invoke(w, t);
    // each worker keeps its pending tasks in its own _wsq
    t = w._wsq.pop();
  }
}
// Below: how a single task is processed
inlinevoidExecutor::_invoke(Worker&worker,Node*node){// synchronize all outstanding memory operations caused by reordering
while(!(node->_state.load(std::memory_order_acquire)&Node::READY));begin_invoke:SmallVector<int>conds;// 取消直接返回
if(node->_is_cancelled()){if(node=_tear_down_invoke(worker,node);node){gotoinvoke_successors;}return;}// if acquiring semaphore(s) exists, acquire them first
if(node->_semaphores&&!node->_semaphores->to_acquire.empty()){SmallVector<Node*>nodes;if(!node->_acquire_all(nodes)){_schedule(worker,nodes);return;}node->_state.fetch_or(Node::ACQUIRED,std::memory_order_release);}// 基于不同任务类型,执行node上的任务, conds是返回值
switch(node->_handle.index()){// static task
caseNode::STATIC:{_invoke_static_task(worker,node);}break;// dynamic task
caseNode::DYNAMIC:{_invoke_dynamic_task(worker,node);}break;......}invoke_successors:// if releasing semaphores exist, release them
if(node->_semaphores&&!node->_semaphores->to_release.empty()){_schedule(worker,node->_release_all());}// Reset the join counter to support the cyclic control flow.
// + We must do this before scheduling the successors to avoid race
// condition on _dependents.
// + We must use fetch_add instead of direct assigning
// because the user-space call on "invoke" may explicitly schedule
// this task again (e.g., pipeline) which can access the join_counter.
if((node->_state.load(std::memory_order_relaxed)&Node::CONDITIONED)){node->_join_counter.fetch_add(node->num_strong_dependents(),std::memory_order_relaxed);}else{node->_join_counter.fetch_add(node->num_dependents(),std::memory_order_relaxed);}// acquire the parent flow counter
auto&j=(node->_parent)?node->_parent->_join_counter:node->_topology->_join_counter;// Here, we want to cache the latest successor with the highest priority
worker._cache=nullptr;automax_p=static_cast<unsigned>(TaskPriority::MAX);// Invoke the task based on the corresponding type
switch(node->_handle.index()){// condition and multi-condition tasks
caseNode::CONDITION:caseNode::MULTI_CONDITION:{for(autocond:conds){if(cond>=0&&static_cast<size_t>(cond)<node->_successors.size()){autos=node->_successors[cond];/}worker._cache=s;max_p=s->_priority;}else{_schedule(worker,s);}}}}break;// 非条件的任务,即全部是强依赖,则
default:{for(size_ti=0;i<node->_successors.size();++i){if(autos=node->_successors[i];s->_join_counter.fetch_sub(1,std::memory_order_acq_rel)==1){j.fetch_add(1,std::memory_order_relaxed);// 优先级最高的自己的本worker直接执行,否则会进入 _wsq 队列等待执行
if(s->_priority<=max_p){if(worker._cache){_schedule(worker,worker._cache);}worker._cache=s;max_p=s->_priority;}else{_schedule(worker,s);}}}}break;}// 通知睡觉的worker,可以进行窃取task了
_tear_down_invoke(worker,node);// 指定当前执行node,开始执行新的worker
if(worker._cache){node=worker._cache;gotobegin_invoke;}}
// Procedure: _schedule
// Make `node` runnable: mark it READY, then push it onto the calling
// worker's local queue when `worker` belongs to this executor, or onto
// the executor-wide shared queue otherwise; finally wake up one waiting
// worker so the task can be picked up.
inline void Executor::_schedule(Worker& worker, Node* node) {

  // We need to fetch p before the release such that the read
  // operation is synchronized properly with other thread to
  // avoid data race.
  auto p = node->_priority;

  node->_state.fetch_or(Node::READY, std::memory_order_release);

  // caller is one of this executor's workers: use its local queue
  if(worker._executor == this) {
    // push into the worker's work-stealing queue
    worker._wsq.push(node, p);
    // notify a waiting worker
    _notifier.notify(false);
    return;
  }

  // external caller: push into the globally shared queue under a lock
  {
    std::lock_guard<std::mutex> lock(_wsq_mutex);
    _wsq.push(node, p);
  }

  // notify a waiting worker
  _notifier.notify(false);
}