Skip to content

Commit

Permalink
[delete] delete setAutoCheck() function, check dag when pipeline init.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Sep 15, 2024
1 parent 1a4ae30 commit a793d04
Show file tree
Hide file tree
Showing 14 changed files with 67 additions and 88 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,19 @@ int main() {
* [GraphANNS](https://github.com/whenever5225/GraphANNS) : Graph-based Approximate Nearest Neighbor Search Working off CGraph
* [CThreadPool](https://github.com/ChunelFeng/CThreadPool) : 一个简单好用、功能强大、性能优异、跨平台的C++线程池
* [CGraph-lite](https://github.com/ChunelFeng/CGraph-lite) : head-only, simplest CGraph, with DAG executor and param translate function
* [taskflow](https://github.com/taskflow/taskflow) : A General-purpose Parallel and Heterogeneous Task Programming System
* [【B站视频】CGraph 和 taskflow 性能对比实测](https://www.bilibili.com/video/BV1gwWAekEAy/?spm_id_from=333.337.search-card.all.click&vd_source=2c7baed805c6cb33d630d5d4546cf0be) <br>
* [awesome-cpp](https://github.com/fffaraz/awesome-cpp) : A curated list of awesome C++ (or C) frameworks, libraries, resources, and shiny things. Inspired by awesome-... stuff.
* [awesome-workflow-engines](https://github.com/meirwah/awesome-workflow-engines) : A curated list of awesome open source workflow engines
* [taskflow](https://github.com/taskflow/taskflow) : A General-purpose Parallel and Heterogeneous Task Programming System
* [【B站视频】CGraph 和 taskflow 性能对比实测](https://www.bilibili.com/video/BV1gwWAekEAy/?spm_id_from=333.337.search-card.all.click&vd_source=2c7baed805c6cb33d630d5d4546cf0be) <br>
* [torchpipe](https://github.com/torchpipe/torchpipe) : Serving Inside Pytorch
* [【B站视频】开源项目torchpipe - ai引擎在线高并发经验和实战](https://www.bilibili.com/video/BV1Zm411X7k1/)
* [nndeploy](https://github.com/DeployAI/nndeploy) : nndeploy是一款模型端到端部署框架。以多端推理以及基于有向无环图模型部署为内核,致力为用户提供跨平台、简单易用、高性能的模型部署体验。
* [【B站视频】nndeploy--AI模型端到端部署框架(1)](https://www.bilibili.com/video/BV1VA4m1A7Bk)
* [【B站视频】nndeploy--AI模型端到端部署框架(2)](https://www.bilibili.com/video/BV1PK421v775)
* [KuiperInfer](https://github.com/zjhellofss/KuiperInfer) : 带你从零实现一个高性能的深度学习推理库,支持大模型 llama2 、Unet、Yolov5、Resnet等模型的推理。Implement a high-performance deep learning inference library step by step
* [【B站视频】KuiperInfer推理框架 - 一个面向教学的推理框架](https://www.bilibili.com/video/BV1t2421K7HN)
* [OGraph](https://github.com/symphony09/ograph) : A simple way to build a pipeline with Go.
* [【B站视频】听阿里云大佬分享:OGraph——基于Go的流图调度二三事](https://www.bilibili.com/video/BV19RHce6Evo)

[![Star History Chart](https://api.star-history.com/svg?repos=ChunelFeng/CGraph&type=Date)](https://star-history.com/#ChunelFeng/CGraph&Date)

Expand Down Expand Up @@ -327,8 +333,8 @@ int main() {
* 提供bazel编译方式
* 优化perf功能

[2024.09.07 - v2.6.1 - Chunel]
* 提供`pipeline`的静态执行的方式,提供微任务机制
[2024.09.15 - v2.6.1 - Chunel]
* 提供`pipeline`的静态执行的方式,提供基于静态执行的微任务机制
* 优化`event`(事件)机制,异步事件可以等待结束
* 提供`pipeline`剪裁功能,用于删除`element`之间重复的依赖
* 提供`element`删除依赖的方法
Expand Down
6 changes: 0 additions & 6 deletions src/GraphCtrl/GraphElement/GElementManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ CStatus GElementManager::run() {
CGRAPH_FUNCTION_BEGIN

status = engine_->run(); // 通过引擎来执行全部的逻辑
CGRAPH_FUNCTION_CHECK_STATUS

if (auto_check_enable_) {
// 默认是需要check一下执行结果的。如果为了增加一点效率,也可以通过外部设置不检查
status = engine_->afterRunCheck();
}
CGRAPH_FUNCTION_END
}

Expand Down
1 change: 0 additions & 1 deletion src/GraphCtrl/GraphElement/GElementManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class GElementManager : public GElementObject,
GEnginePtr engine_ { nullptr }; // 执行引擎
GEngineType engine_type_ { GEngineType::DYNAMIC }; // 引擎执行方式
UThreadPoolPtr thread_pool_ { nullptr }; // 线程池
CBool auto_check_enable_ = true; // 是否自动实现后校验逻辑

friend class GPipeline;
friend class GRegion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ CGRAPH_NAMESPACE_BEGIN
CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN
/**
* 1. 标记数据,比如有多少个结束element等
* 2. 标记哪些数据,是linkable 的
* 3. 分析当前dag类型信息
* 1. 判断是否是 dag 结构
* 2. 标记数据,比如有多少个结束element等
* 3. 标记哪些数据,是linkable 的
* 4. 分析当前dag类型信息
*/
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!GEngine::isDag(elements),
"it is not a dag struct");
mark(elements);
link(elements);
analysisDagType(elements);
Expand All @@ -43,24 +46,6 @@ CStatus GDynamicEngine::run() {
}


CStatus GDynamicEngine::afterRunCheck() {
CGRAPH_FUNCTION_BEGIN
/**
* 纯串行和纯并行 是不需要做结果校验的
* 但是普通的dag,后期还是校验一下为好
* 这里也可以通过外部接口来关闭
*/
if (internal::GEngineDagType::COMMON == dag_type_) {
for (GElementCPtr element : total_element_arr_) {
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!element->done_, \
element->getName() + ": dynamic engine, check not run it...")
}
}

CGRAPH_FUNCTION_END
}


CVoid GDynamicEngine::commonRunAll() {
/**
* 1. 执行没有任何依赖的element
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class GDynamicEngine : public GEngine {

CStatus run() override;

CStatus afterRunCheck() override;

/**
* 记录当前 elements 数据信息
* @param elements
Expand Down
54 changes: 46 additions & 8 deletions src/GraphCtrl/GraphElement/_GEngine/GEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#ifndef CGRAPH_GENGINE_H
#define CGRAPH_GENGINE_H

#include <vector>
#include <queue>

#include "GEngineDefine.h"
#include "../GElementObject.h"
#include "../GElementSorter.h"
Expand All @@ -26,14 +29,6 @@ class GEngine : public GElementObject {
*/
virtual CStatus setup(const GSortedGElementPtrSet& elements) = 0;

/**
* 执行完毕后,确认运行是否正常
* @return
*/
virtual CStatus afterRunCheck() {
CGRAPH_EMPTY_FUNCTION
}

/**
* 分析所有的可以设置 linkable 的数据
* @param elements
Expand All @@ -58,6 +53,49 @@ class GEngine : public GElementObject {
}
}

/**
* 计算当前elements的 拓扑排序信息
* @param elements
* @return
*/
static GElementPtrArr getTopo(const GSortedGElementPtrSet& elements) {
GElementPtrArr result;
std::queue<GElementPtr> readyQueue;
for (auto* element : elements) {
element->left_depend_ = element->dependence_.size();
if (0 == element->left_depend_) {
readyQueue.push(element);
}
}

while(!readyQueue.empty()) {
auto* cur = readyQueue.front();
readyQueue.pop();
result.push_back(cur);

for (auto* element : cur->run_before_) {
if (0 == --element->left_depend_) {
readyQueue.push(element);
}
}
}

for (auto element : elements) {
// 计算技术之后,需要恢复一下 depend的信息,以免引入误差
element->left_depend_ = element->dependence_.size();
}
return result;
}

/**
* 判断是否是dag的逻辑
* @param elements
* @return
*/
static CBool isDag(const GSortedGElementPtrSet& elements) {
const auto& result = getTopo(elements);
return result.size() == elements.size();
}

protected:
UThreadPoolPtr thread_pool_ { nullptr }; // 内部执行的线程池
Expand Down
24 changes: 1 addition & 23 deletions src/GraphCtrl/GraphElement/_GEngine/GTopoEngine/GTopoEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,14 @@
@Desc:
***************************/

#include <queue>

#include "GTopoEngine.h"

CGRAPH_NAMESPACE_BEGIN

CStatus GTopoEngine::setup(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN

topo_elements_.clear();
std::queue<GElementPtr> readyQueue;
for (auto* element : elements) {
element->left_depend_ = element->dependence_.size();
if (0 == element->left_depend_) {
readyQueue.push(element);
}
}

while(!readyQueue.empty()) {
auto* cur = readyQueue.front();
readyQueue.pop();
topo_elements_.push_back(cur);

for (auto* element : cur->run_before_) {
if (0 == --element->left_depend_) {
readyQueue.push(element);
}
}
}

topo_elements_ = GEngine::getTopo(elements);
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(topo_elements_.size() != elements.size(), \
"topo engine parse size is not right")

Expand Down
10 changes: 0 additions & 10 deletions src/GraphCtrl/GraphPipeline/GPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,6 @@ GPipelinePtr GPipeline::setSharedThreadPool(UThreadPoolPtr ptr) {
}


GPipelinePtr GPipeline::setAutoCheck(CBool enable) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(element_manager_)

element_manager_->auto_check_enable_ = enable;
return this;
}


CSize GPipeline::getMaxPara() {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(element_manager_)
Expand Down
8 changes: 0 additions & 8 deletions src/GraphCtrl/GraphPipeline/GPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,6 @@ class GPipeline : public GPipelineObject,
*/
GPipeline* setSharedThreadPool(UThreadPoolPtr ptr);

/**
* 设置在执行完成后,是否校验整体执行逻辑
* @param enable
* @return
* @notice 默认校验。如果确定流程正常,可以考虑取消校验流程,从而降低整体耗时
*/
GPipeline* setAutoCheck(CBool enable);

/**
* 获取最大并发度
* @return
Expand Down
5 changes: 4 additions & 1 deletion test/Functional/test-functional-05.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ void test_functional_05() {
{
UTimeCounter counter("test_functional_05");
for (int i = 0; i < 10; i++) {
pipeline->run();
status = pipeline->run();
if (!status.isOK()) {
CGRAPH_ECHO("%s", status.getInfo().c_str());
}
}
}

Expand Down
1 change: 0 additions & 1 deletion test/Performance/test-performance-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ void test_performance_01() {
for (auto& i : arr) {
pipeline->registerGElement<TestAdd1GNode>(&i);
}
pipeline->setAutoCheck(false);
status += pipeline->init();

{
Expand Down
1 change: 0 additions & 1 deletion test/Performance/test-performance-02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ void test_performance_02() {
pipeline->registerGElement<TestAdd1GNode>(&arr[i], {arr[i - 1]});
}
pipeline->makeSerial();
pipeline->setAutoCheck(false);
status += pipeline->init();
{
UTimeCounter counter("test_performance_02");
Expand Down
1 change: 0 additions & 1 deletion test/Performance/test-performance-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ void test_performance_03() {
config.primary_thread_busy_epoch_ = 500;
config.monitor_enable_ = false; // 关闭扩缩容机制
pipeline->setUniqueThreadPoolConfig(config);
pipeline->setAutoCheck(false);
pipeline->registerGElement<TestAdd1GNode>(&a);
pipeline->registerGElement<TestAdd1GNode>(&b1, {a});
pipeline->registerGElement<TestAdd1GNode>(&b2, {b1});
Expand Down
1 change: 0 additions & 1 deletion test/Performance/test-performance-04.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ void test_performance_04() {
config.max_task_steal_range_ = nodePerLayer - 1;
config.primary_thread_busy_epoch_ = 500;
pipeline->setUniqueThreadPoolConfig(config);
pipeline->setAutoCheck(false);

// 实现一个全连接
GElementPtrSet curLayer {};
Expand Down

0 comments on commit a793d04

Please sign in to comment.