东营专业网站建设公司排行,网站设计哪家最好,wordpress 模块开发,网页设计与制作摘要创建Subflow
DAG任务中#xff0c;有一种常见的场景#xff0c;一个任务可能在执行期间产生新的任务#xff0c;然后紧接着执行新任务。 之前提到的静态图就没有办法实现这样一个功能了#xff0c;所以Taskflow提供了另一种流的节点#xff1a;Subflow#xff0c;Subflo…创建Subflow
DAG任务中有一种常见的场景一个任务可能在执行期间产生新的任务然后紧接着执行新任务。 之前提到的静态图就没有办法实现这样一个功能了所以Taskflow提供了另一种流的节点SubflowSubflow的API与Taskflow无异但又可以作为Taskflow的一个节点。
比如描述如下依赖图
#include memory
#include taskflow/taskflow.hpp
int main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A taskflow.emplace([] () {}).name(A); // static task Atf::Task C taskflow.emplace([] () {}).name(C); // static task Ctf::Task D taskflow.emplace([] () {}).name(D); // static task D// 通过lambda创建subflow// 开始执行的时候会创建一个subflow然后通过引用传给lambda// 只有当本subflow执行完成之后才会执行taskflowtf::Task B taskflow.emplace([] (tf::Subflow subflow) { tf::Task B1 subflow.emplace([] () {}).name(B1); // subflow task B1tf::Task B2 subflow.emplace([] () {}).name(B2); // subflow task B2tf::Task B3 subflow.emplace([] () {}).name(B3); // subflow task B3B1.precede(B3); // B1 runs bofore B3B2.precede(B3); // B2 runs before B3}).name(B);A.precede(B); // B runs after AA.precede(C); // C runs after AB.precede(D); // D runs after BC.precede(D); // D runs after Ctaskflow.dump(std::cout); // 在执行前subflow无法展开subflow只会显示节点Bexecutor.run(taskflow).get(); // execute the graph to spawn the subflowtaskflow.dump(std::cout); // 执行完毕后才可以完全展开return 0;
}在run之前dumpsubflow只会被当作普通节点 在run之后调用subflow被展开得到真正的依赖图
Join a Subflow
Subflow 在离开其上下文时默认调用join表示需要把subflow中的task执行完才完成subflow的执行。同时还可以在上下文中显式调用join来完成递归模式
#include memory
#include taskflow/taskflow.hpp// 递归计算斐波那契数列
int spswm(int n, tf::Subflow sbf) {if(n 2) return n;int res1 0, res2 0;// 生成两个递归子任务.sbf.emplace([res1, n](tf::Subflow sbf_inner){res1 spswm(n-1, sbf_inner);}).name(sub Task:_std::to_string(n-1));sbf.emplace([res2, n](tf::Subflow sbf_inner){res2 spswm(n-2, sbf_inner);}).name(sub Task:_std::to_string(n-2));// 显式调用join得到两个子任务的返回值sbf.join();return res1 res2;
}
int main() {tf::Executor executor; tf::Taskflow taskflow;int res 0; // 用于存放最后的结果taskflow.emplace([res](tf::Subflow sbf){res spswm(5, sbf); // 计算5的斐波那契数}).name(main Task);executor.run(taskflow).wait();std::cout 5的斐波那契数 res std::endl;taskflow.dump(std::cout); return 0;
}调用图如下 Detach a Subflow
和线程一样Subflow 可以Detach出去单独执行并最后被主Taskflow Join
#include taskflow/taskflow.hppint main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A taskflow.emplace([] () {}).name(A); // static task Atf::Task C taskflow.emplace([] () {}).name(C); // static task Ctf::Task D taskflow.emplace([] () {}).name(D); // static task Dtf::Task B taskflow.emplace([] (tf::Subflow subflow) { tf::Task B1 subflow.emplace([] () {}).name(B1); // static task B1tf::Task B2 subflow.emplace([] () {}).name(B2); // static task B2tf::Task B3 subflow.emplace([] () {}).name(B3); // static task B3B1.precede(B3); // B1 runs bofore B3B2.precede(B3); // B2 runs before B3subflow.detach(); // 分离出Taskflow单独执行}).name(B);A.precede(B); // B runs after AA.precede(C); // C runs after AB.precede(D); // D runs after BC.precede(D); // D runs after Cexecutor.run(taskflow).wait();taskflow.dump(std::cout); return 0;
}最终结构如下
detach出去的Subflow是临时的所以如果执行的是run_n, ABCD四个节点只会构造一次但是subflow会被构造多次
#include taskflow/taskflow.hppint main() {tf::Executor executor; tf::Taskflow taskflow;tf::Task A taskflow.emplace([] () {}).name(A); // static task Atf::Task C taskflow.emplace([] () {}).name(C); // static task Ctf::Task D taskflow.emplace([] () {}).name(D); // static task Dtf::Task B taskflow.emplace([] (tf::Subflow subflow) { tf::Task B1 subflow.emplace([] () {}).name(B1); // static task B1tf::Task B2 subflow.emplace([] () {}).name(B2); // static task B2tf::Task B3 subflow.emplace([] () {}).name(B3); // static task B3B1.precede(B3); // B1 runs bofore B3B2.precede(B3); // B2 runs before B3subflow.detach(); // 分离出Taskflow单独执行}).name(B);A.precede(B); // B runs after AA.precede(C); // C runs after AB.precede(D); // D runs after BC.precede(D); // D runs after Cexecutor.run_n(taskflow, 5).wait();assert(taskflow.num_tasks() 19);taskflow.dump(std::cout);return 0;
}嵌套子图
Subflow 支持递归也支持嵌套
#include taskflow/taskflow.hppint main() {tf::Taskflow taskflow;tf::Task A taskflow.emplace([] (tf::Subflow sbf){std::cout A spawns A1 subflow A2\n;tf::Task A1 sbf.emplace([] () {std::cout subtask A1\n;}).name(A1);tf::Task A2 sbf.emplace([] (tf::Subflow sbf2){std::cout A2 spawns A2_1 A2_2\n;tf::Task A2_1 sbf2.emplace([] () {std::cout subtask A2_1\n;}).name(A2_1);tf::Task A2_2 sbf2.emplace([] () {std::cout subtask A2_2\n;}).name(A2_2);A2_1.precede(A2_2);}).name(A2);A1.precede(A2);}).name(A);// execute the graph to spawn the subflowtf::Executor().run(taskflow).get();taskflow.dump(std::cout);
}同样也可以detach 子图的子图独立执行最终都会被master Taskflow 统一Join类似进程与子进程的关系