From a793d0496d48b5e20b993e1b2a74bdee2f71558b Mon Sep 17 00:00:00 2001 From: ChunelFeng Date: Sun, 15 Sep 2024 15:12:57 +0800 Subject: [PATCH] [delete] delete setAutoCheck() function, check dag when pipeline init. --- README.md | 14 +++-- .../GraphElement/GElementManager.cpp | 6 --- src/GraphCtrl/GraphElement/GElementManager.h | 1 - .../GDynamicEngine/GDynamicEngine.cpp | 27 +++------- .../_GEngine/GDynamicEngine/GDynamicEngine.h | 2 - src/GraphCtrl/GraphElement/_GEngine/GEngine.h | 54 ++++++++++++++++--- .../_GEngine/GTopoEngine/GTopoEngine.cpp | 24 +-------- src/GraphCtrl/GraphPipeline/GPipeline.cpp | 10 ---- src/GraphCtrl/GraphPipeline/GPipeline.h | 8 --- test/Functional/test-functional-05.cpp | 5 +- test/Performance/test-performance-01.cpp | 1 - test/Performance/test-performance-02.cpp | 1 - test/Performance/test-performance-03.cpp | 1 - test/Performance/test-performance-04.cpp | 1 - 14 files changed, 67 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index 3aa5de7c..e08ff1bf 100644 --- a/README.md +++ b/README.md @@ -134,13 +134,19 @@ int main() { * [GraphANNS](https://github.com/whenever5225/GraphANNS) : Graph-based Approximate Nearest Neighbor Search Working off CGraph * [CThreadPool](https://github.com/ChunelFeng/CThreadPool) : 一个简单好用、功能强大、性能优异、跨平台的C++线程池 * [CGraph-lite](https://github.com/ChunelFeng/CGraph-lite) : head-only, simplest CGraph, with DAG executor and param translate function -* [taskflow](https://github.com/taskflow/taskflow) : A General-purpose Parallel and Heterogeneous Task Programming System - * [【B站视频】CGraph 和 taskflow 性能对比实测](https://www.bilibili.com/video/BV1gwWAekEAy/?spm_id_from=333.337.search-card.all.click&vd_source=2c7baed805c6cb33d630d5d4546cf0be)
* [awesome-cpp](https://github.com/fffaraz/awesome-cpp) : A curated list of awesome C++ (or C) frameworks, libraries, resources, and shiny things. Inspired by awesome-... stuff. * [awesome-workflow-engines](https://github.com/meirwah/awesome-workflow-engines) : A curated list of awesome open source workflow engines +* [taskflow](https://github.com/taskflow/taskflow) : A General-purpose Parallel and Heterogeneous Task Programming System + * [【B站视频】CGraph 和 taskflow 性能对比实测](https://www.bilibili.com/video/BV1gwWAekEAy/?spm_id_from=333.337.search-card.all.click&vd_source=2c7baed805c6cb33d630d5d4546cf0be)
+* [torchpipe](https://github.com/torchpipe/torchpipe) : Serving Inside Pytorch + * [【B站视频】开源项目torchpipe - ai引擎在线高并发经验和实战](https://www.bilibili.com/video/BV1Zm411X7k1/) * [nndeploy](https://github.com/DeployAI/nndeploy) : nndeploy是一款模型端到端部署框架。以多端推理以及基于有向无环图模型部署为内核,致力为用户提供跨平台、简单易用、高性能的模型部署体验。 + * [【B站视频】nndeploy--AI模型端到端部署框架(1)](https://www.bilibili.com/video/BV1VA4m1A7Bk) + * [【B站视频】nndeploy--AI模型端到端部署框架(2)](https://www.bilibili.com/video/BV1PK421v775) * [KuiperInfer](https://github.com/zjhellofss/KuiperInfer) : 带你从零实现一个高性能的深度学习推理库,支持大模型 llama2 、Unet、Yolov5、Resnet等模型的推理。Implement a high-performance deep learning inference library step by step + * [【B站视频】KuiperInfer推理框架 - 一个面向教学的推理框架](https://www.bilibili.com/video/BV1t2421K7HN) * [OGraph](https://github.com/symphony09/ograph) : A simple way to build a pipeline with Go. + * [【B站视频】听阿里云大佬分享:OGraph——基于Go的流图调度二三事](https://www.bilibili.com/video/BV19RHce6Evo) [![Star History Chart](https://api.star-history.com/svg?repos=ChunelFeng/CGraph&type=Date)](https://star-history.com/#ChunelFeng/CGraph&Date) @@ -327,8 +333,8 @@ int main() { * 提供bazel编译方式 * 优化perf功能 -[2024.09.07 - v2.6.1 - Chunel] -* 提供`pipeline`的静态执行的方式,提供微任务机制 +[2024.09.15 - v2.6.1 - Chunel] +* 提供`pipeline`的静态执行的方式,提供基于静态执行的微任务机制 * 优化`event`(事件)机制,异步事件可以等待结束 * 提供`pipeline`剪裁功能,用于删除`element`之间重复的依赖 * 提供`element`删除依赖的方法 diff --git a/src/GraphCtrl/GraphElement/GElementManager.cpp b/src/GraphCtrl/GraphElement/GElementManager.cpp index 2ccfe52f..8f923223 100644 --- a/src/GraphCtrl/GraphElement/GElementManager.cpp +++ b/src/GraphCtrl/GraphElement/GElementManager.cpp @@ -59,12 +59,6 @@ CStatus GElementManager::run() { CGRAPH_FUNCTION_BEGIN status = engine_->run(); // 通过引擎来执行全部的逻辑 - CGRAPH_FUNCTION_CHECK_STATUS - - if (auto_check_enable_) { - // 默认是需要check一下执行结果的。如果为了增加一点效率,也可以通过外部设置不检查 - status = engine_->afterRunCheck(); - } CGRAPH_FUNCTION_END } diff --git a/src/GraphCtrl/GraphElement/GElementManager.h b/src/GraphCtrl/GraphElement/GElementManager.h index 
bb800c5a..08d1e33f 100644 --- a/src/GraphCtrl/GraphElement/GElementManager.h +++ b/src/GraphCtrl/GraphElement/GElementManager.h @@ -103,7 +103,6 @@ class GElementManager : public GElementObject, GEnginePtr engine_ { nullptr }; // 执行引擎 GEngineType engine_type_ { GEngineType::DYNAMIC }; // 引擎执行方式 UThreadPoolPtr thread_pool_ { nullptr }; // 线程池 - CBool auto_check_enable_ = true; // 是否自动实现后校验逻辑 friend class GPipeline; friend class GRegion; diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp index f83f23ee..26bdf4a9 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp +++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp @@ -13,10 +13,13 @@ CGRAPH_NAMESPACE_BEGIN CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) { CGRAPH_FUNCTION_BEGIN /** - * 1. 标记数据,比如有多少个结束element等 - * 2. 标记哪些数据,是linkable 的 - * 3. 分析当前dag类型信息 + * 1. 判断是否是 dag 结构 + * 2. 标记数据,比如有多少个结束element等 + * 3. 标记哪些数据,是linkable 的 + * 4. 分析当前dag类型信息 */ + CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!GEngine::isDag(elements), + "it is not a dag struct"); mark(elements); link(elements); analysisDagType(elements); @@ -43,24 +46,6 @@ CStatus GDynamicEngine::run() { } -CStatus GDynamicEngine::afterRunCheck() { - CGRAPH_FUNCTION_BEGIN - /** - * 纯串行和纯并行 是不需要做结果校验的 - * 但是普通的dag,后期还是校验一下为好 - * 这里也可以通过外部接口来关闭 - */ - if (internal::GEngineDagType::COMMON == dag_type_) { - for (GElementCPtr element : total_element_arr_) { - CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!element->done_, \ - element->getName() + ": dynamic engine, check not run it...") - } - } - - CGRAPH_FUNCTION_END -} - - CVoid GDynamicEngine::commonRunAll() { /** * 1. 
执行没有任何依赖的element diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h index e98d2829..5b78411d 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h +++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h @@ -23,8 +23,6 @@ class GDynamicEngine : public GEngine { CStatus run() override; - CStatus afterRunCheck() override; - /** * 记录当前 elements 数据信息 * @param elements diff --git a/src/GraphCtrl/GraphElement/_GEngine/GEngine.h b/src/GraphCtrl/GraphElement/_GEngine/GEngine.h index 88934727..f5f3c5b1 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GEngine.h +++ b/src/GraphCtrl/GraphElement/_GEngine/GEngine.h @@ -9,6 +9,9 @@ #ifndef CGRAPH_GENGINE_H #define CGRAPH_GENGINE_H +#include <vector> +#include <queue> + #include "GEngineDefine.h" #include "../GElementObject.h" #include "../GElementSorter.h" @@ -26,14 +29,6 @@ class GEngine : public GElementObject { */ virtual CStatus setup(const GSortedGElementPtrSet& elements) = 0; - /** - * 执行完毕后,确认运行是否正常 - * @return - */ - virtual CStatus afterRunCheck() { - CGRAPH_EMPTY_FUNCTION - } - /** * 分析所有的可以设置 linkable 的数据 * @param elements @@ -58,6 +53,49 @@ class GEngine : public GElementObject { } } + /** + * 计算当前elements的 拓扑排序信息 + * @param elements + * @return + */ + static GElementPtrArr getTopo(const GSortedGElementPtrSet& elements) { + GElementPtrArr result; + std::queue<GElementPtr> readyQueue; + for (auto* element : elements) { + element->left_depend_ = element->dependence_.size(); + if (0 == element->left_depend_) { + readyQueue.push(element); + } + } + + while(!readyQueue.empty()) { + auto* cur = readyQueue.front(); + readyQueue.pop(); + result.push_back(cur); + + for (auto* element : cur->run_before_) { + if (0 == --element->left_depend_) { + readyQueue.push(element); + } + } + } + + for (auto element : elements) { + // 计算技术之后,需要恢复一下 depend的信息,以免引入误差 + element->left_depend_ = element->dependence_.size(); + } + 
return result; + } + + /** + * 判断是否是dag的逻辑 + * @param elements + * @return + */ + static CBool isDag(const GSortedGElementPtrSet& elements) { + const auto& result = getTopo(elements); + return result.size() == elements.size(); + } protected: UThreadPoolPtr thread_pool_ { nullptr }; // 内部执行的线程池 diff --git a/src/GraphCtrl/GraphElement/_GEngine/GTopoEngine/GTopoEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GTopoEngine/GTopoEngine.cpp index 4b00e743..35ad44c6 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GTopoEngine/GTopoEngine.cpp +++ b/src/GraphCtrl/GraphElement/_GEngine/GTopoEngine/GTopoEngine.cpp @@ -6,8 +6,6 @@ @Desc: ***************************/ -#include <queue> - #include "GTopoEngine.h" CGRAPH_NAMESPACE_BEGIN @@ -15,27 +13,7 @@ CGRAPH_NAMESPACE_BEGIN CStatus GTopoEngine::setup(const GSortedGElementPtrSet& elements) { CGRAPH_FUNCTION_BEGIN - topo_elements_.clear(); - std::queue<GElementPtr> readyQueue; - for (auto* element : elements) { - element->left_depend_ = element->dependence_.size(); - if (0 == element->left_depend_) { - readyQueue.push(element); - } - } - - while(!readyQueue.empty()) { - auto* cur = readyQueue.front(); - readyQueue.pop(); - topo_elements_.push_back(cur); - - for (auto* element : cur->run_before_) { - if (0 == --element->left_depend_) { - readyQueue.push(element); - } - } - } - + topo_elements_ = GEngine::getTopo(elements); CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(topo_elements_.size() != elements.size(), \ "topo engine parse size is not right") diff --git a/src/GraphCtrl/GraphPipeline/GPipeline.cpp b/src/GraphCtrl/GraphPipeline/GPipeline.cpp index cf60f722..ef0a74fc 100644 --- a/src/GraphCtrl/GraphPipeline/GPipeline.cpp +++ b/src/GraphCtrl/GraphPipeline/GPipeline.cpp @@ -251,16 +251,6 @@ GPipelinePtr GPipeline::setSharedThreadPool(UThreadPoolPtr ptr) { } -GPipelinePtr GPipeline::setAutoCheck(CBool enable) { - CGRAPH_FUNCTION_BEGIN - CGRAPH_ASSERT_INIT_THROW_ERROR(false) - CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(element_manager_) - - 
element_manager_->auto_check_enable_ = enable; - return this; -} - - CSize GPipeline::getMaxPara() { CGRAPH_ASSERT_INIT_THROW_ERROR(false) CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(element_manager_) diff --git a/src/GraphCtrl/GraphPipeline/GPipeline.h b/src/GraphCtrl/GraphPipeline/GPipeline.h index 01af6c40..e4bbe003 100644 --- a/src/GraphCtrl/GraphPipeline/GPipeline.h +++ b/src/GraphCtrl/GraphPipeline/GPipeline.h @@ -339,14 +339,6 @@ class GPipeline : public GPipelineObject, */ GPipeline* setSharedThreadPool(UThreadPoolPtr ptr); - /** - * 设置在执行完成后,是否校验整体执行逻辑 - * @param enable - * @return - * @notice 默认校验。如果确定流程正常,可以考虑取消校验流程,从而降低整体耗时 - */ - GPipeline* setAutoCheck(CBool enable); - /** * 获取最大并发度 * @return diff --git a/test/Functional/test-functional-05.cpp b/test/Functional/test-functional-05.cpp index 957ed0bb..685faa84 100644 --- a/test/Functional/test-functional-05.cpp +++ b/test/Functional/test-functional-05.cpp @@ -24,7 +24,10 @@ void test_functional_05() { { UTimeCounter counter("test_functional_05"); for (int i = 0; i < 10; i++) { - pipeline->run(); + status = pipeline->run(); + if (!status.isOK()) { + CGRAPH_ECHO("%s", status.getInfo().c_str()); + } } } diff --git a/test/Performance/test-performance-01.cpp b/test/Performance/test-performance-01.cpp index dbd705f9..bcf599bb 100644 --- a/test/Performance/test-performance-01.cpp +++ b/test/Performance/test-performance-01.cpp @@ -29,7 +29,6 @@ void test_performance_01() { for (auto& i : arr) { pipeline->registerGElement(&i); } - pipeline->setAutoCheck(false); status += pipeline->init(); { diff --git a/test/Performance/test-performance-02.cpp b/test/Performance/test-performance-02.cpp index bcabd463..ef8370df 100644 --- a/test/Performance/test-performance-02.cpp +++ b/test/Performance/test-performance-02.cpp @@ -23,7 +23,6 @@ void test_performance_02() { pipeline->registerGElement(&arr[i], {arr[i - 1]}); } pipeline->makeSerial(); - pipeline->setAutoCheck(false); status += pipeline->init(); { UTimeCounter 
counter("test_performance_02"); diff --git a/test/Performance/test-performance-03.cpp b/test/Performance/test-performance-03.cpp index dfc91bea..39908df0 100644 --- a/test/Performance/test-performance-03.cpp +++ b/test/Performance/test-performance-03.cpp @@ -26,7 +26,6 @@ void test_performance_03() { config.primary_thread_busy_epoch_ = 500; config.monitor_enable_ = false; // 关闭扩缩容机制 pipeline->setUniqueThreadPoolConfig(config); - pipeline->setAutoCheck(false); pipeline->registerGElement(&a); pipeline->registerGElement(&b1, {a}); pipeline->registerGElement(&b2, {b1}); diff --git a/test/Performance/test-performance-04.cpp b/test/Performance/test-performance-04.cpp index 12e0e170..1f08846a 100644 --- a/test/Performance/test-performance-04.cpp +++ b/test/Performance/test-performance-04.cpp @@ -23,7 +23,6 @@ void test_performance_04() { config.max_task_steal_range_ = nodePerLayer - 1; config.primary_thread_busy_epoch_ = 500; pipeline->setUniqueThreadPoolConfig(config); - pipeline->setAutoCheck(false); // 实现一个全连接 GElementPtrSet curLayer {};