From ab2353583e664f2535648d4c5ac8c886f3ee8790 Mon Sep 17 00:00:00 2001 From: ChunelFeng Date: Sat, 24 Jun 2023 16:29:05 +0800 Subject: [PATCH] [feat] add yield and resume mothed for pipeline. --- CMakeLists.txt | 1 + README.md | 1 + .../GAdapter/GFunction/GFunction.h | 2 +- .../GAdapter/GSingleton/GSingleton.h | 2 +- src/GraphCtrl/GraphElement/GElement.cpp | 15 ++- src/GraphCtrl/GraphElement/GElement.h | 94 ++++++++++--------- src/GraphCtrl/GraphElement/GElementDefine.h | 8 ++ .../GraphElement/GElementManager.cpp | 4 + .../GraphElement/GGroup/GCluster/GCluster.h | 30 +++--- .../GGroup/GCondition/GCondition.h | 2 +- .../GGroup/GCondition/GMultiCondition.h | 5 +- .../GraphElement/GGroup/GRegion/GRegion.h | 1 + src/GraphCtrl/GraphPipeline/GPipeline.cpp | 44 ++++++--- src/GraphCtrl/GraphPipeline/GPipeline.h | 19 ++++ tutorial/MyGNode/MyNode2.h | 4 +- tutorial/T20-YieldResume.cpp | 54 +++++++++++ 16 files changed, 203 insertions(+), 83 deletions(-) create mode 100644 tutorial/T20-YieldResume.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 95c809fa..6c7bcb02 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ set(CGRAPH_TUTORIAL_LIST T17-MessagePubSub T18-Event T19-Cancel + T20-YieldResume # 以下为工具类tutorial TU01-ThreadPool diff --git a/README.md b/README.md index 7a502e2b..24d766af 100644 --- a/README.md +++ b/README.md @@ -312,6 +312,7 @@ int main() { [2023.06.17 - v2.4.2 - Chunel] * 提供`MultiCondition`(多条件)功能 +* 提供了`pipeline`暂停执行和恢复执行功能 diff --git a/src/GraphCtrl/GraphElement/GAdapter/GFunction/GFunction.h b/src/GraphCtrl/GraphElement/GAdapter/GFunction/GFunction.h index f1fb02c3..892f53a6 100644 --- a/src/GraphCtrl/GraphElement/GAdapter/GFunction/GFunction.h +++ b/src/GraphCtrl/GraphElement/GAdapter/GFunction/GFunction.h @@ -37,7 +37,7 @@ class GFunction : public GAdapter { // 针对GFunction,是需要写成public的,否则在外部的 lambda中,无法获取 CGRAPH_DECLARE_GPARAM_MANAGER_WRAPPER -protected: +private: explicit GFunction() { this->element_type_ = GElementType::FUNCTION; session_ = URandom<>::generateSession(CGRAPH_STR_FUNCTION); diff --git a/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.h b/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.h index 1db9491e..d41ec13b 100644 --- a/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.h +++ b/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.h @@ -15,7 +15,7 @@ CGRAPH_NAMESPACE_BEGIN template class GSingleton : public GAdapter { -protected: +private: explicit GSingleton() { this->element_type_ = GElementType::SINGLETON; session_ = URandom<>::generateSession(CGRAPH_STR_SINGLETON); diff --git a/src/GraphCtrl/GraphElement/GElement.cpp b/src/GraphCtrl/GraphElement/GElement.cpp index d3e507b3..1b9a06c4 100644 --- a/src/GraphCtrl/GraphElement/GElement.cpp +++ b/src/GraphCtrl/GraphElement/GElement.cpp @@ -158,19 +158,20 @@ CStatus GElement::fatProcessor(const CFunctionType& type) { try { switch (type) { case CFunctionType::RUN: { - for (CSize i = 0; i < this->loop_ && !cancel_.load(); i++) { + for (CSize i = 0; i < this->loop_ && GElementState::CANCEL != cur_state_.load(); i++) { /** 执行带切面的run方法 */ status = doAspect(GAspectType::BEGIN_RUN); CGRAPH_FUNCTION_CHECK_STATUS do { status = run(); /** - * 如果状态是ok的,并且被条件hold住,则循环执行 + * 在run结束之后,首先需要判断一下是否进入yield状态了。 + * 接下来,如果状态是ok的,并且被条件hold住,则循环执行 * 默认所有element的isHold条件均为false,即不hold,即执行一次 * 可以根据需求,对任意element类型,添加特定的isHold条件 * 并且没有被退出 * */ - } while (status.isOK() && this->isHold()); + } while (checkYield(), status.isOK() && this->isHold()); doAspect(GAspectType::FINISH_RUN, status); } break; @@ -279,6 +280,14 @@ CVoid GElement::dumpElement(std::ostream& oss) { } +CVoid GElement::checkYield() { + std::unique_lock lk(yield_mutex_); + this->yield_cv_.wait(lk, [this] { + return GElementState::YIELD != cur_state_; + }); +} + + CBool GElement::isGroup() { // 按位与 GROUP有值,表示是 GROUP的逻辑 return (long(element_type_) & long(GElementType::GROUP)) > 0; diff --git a/src/GraphCtrl/GraphElement/GElement.h b/src/GraphCtrl/GraphElement/GElement.h index a03325f7..9e0be834 100644 --- a/src/GraphCtrl/GraphElement/GElement.h +++ b/src/GraphCtrl/GraphElement/GElement.h @@ -120,18 +120,6 @@ class GElement : public GElementObject, */ ~GElement() override; - /** - * run方法执行之前的执行函数 - * @return - */ - virtual CStatus beforeRun(); - - /** - * run方法执行之后的执行函数 - * @return - */ - virtual CStatus afterRun(); - /** * 是否持续进行 * 默认为false,表示执行且仅执行一次 @@ -154,6 +142,52 @@ class GElement : public GElementObject, */ virtual CStatus crashed(const CException& ex); + /** + * 获取当前element内部参数 + * @tparam T + * @param key + * @return + */ + template::value, int> = 0> + T* getEParam(const std::string& key); + + /** + * 获取执行线程对应的信息 + * @return + * @notice 辅助线程返回-1 + */ + CIndex getThreadIndex(); + + /** + * 获取绑定线程id信息 + * @return + * @notice 不同的group类型,获取 binding index 的方式不同 + */ + virtual CIndex getBindingIndex(); + + /** + * 获取当前节点的相关关系信息,包含前驱、后继、从属关系 + * @param relation + * @return + */ + CStatus buildRelation(GElementRelation& relation); + + CGRAPH_NO_ALLOWED_COPY(GElement); + +private: + /** + * run方法执行之前的执行函数 + * @return + */ + virtual CStatus beforeRun(); + + /** + * run方法执行之后的执行函数 + * @return + */ + virtual CStatus afterRun(); + /** * 判定element是否可以运行 * 可执行的条件为:自身未被执行且依赖节点全部被执行 @@ -191,16 +225,6 @@ class GElement : public GElementObject, GParamManagerPtr paramManager, GEventManagerPtr eventManager); - /** - * 获取当前element内部参数 - * @tparam T - * @param key - * @return - */ - template::value, int> = 0> - T* getEParam(const std::string& key); - /** * 包含切面相关功能的函数,fat取自fatjar的意思 * @param type @@ -208,13 +232,6 @@ class GElement : public GElementObject, */ CStatus fatProcessor(const CFunctionType& type); - /** - * 获取执行线程对应的信息 - * @return - * @notice 辅助线程返回-1 - */ - CIndex getThreadIndex(); - /** * 设置线程池信息 * @param ptr @@ -246,26 +263,15 @@ class GElement : public GElementObject, CVoid dumpElement(std::ostream& oss); /** - * 获取绑定线程id信息 - * @return - * @notice 不同的group类型,获取 binding index 的方式不同 - */ - virtual CIndex getBindingIndex(); - - /** - * 获取当前节点的相关关系信息,包含前驱、后继、从属关系 - * @param relation + * 判断是否进入 yield状态。如果是的话,则等待恢复。未进入yield状态,则继续运行 * @return */ - CStatus buildRelation(GElementRelation& relation); - - CGRAPH_NO_ALLOWED_COPY(GElement); + inline CVoid checkYield(); private: CBool done_ { false }; // 判定被执行结束 CBool linkable_ { false }; // 判定是否可以连通计算 CBool visible_ { true }; // 判断可见的,如果被删除的话,则认为是不可见的 - std::atomic cancel_ { false }; // 是否被外部强制停止(多用于异步执行状态) CSize loop_ { CGRAPH_DEFAULT_LOOP_TIMES }; // 元素执行次数 CLevel level_ { CGRAPH_DEFAULT_ELEMENT_LEVEL }; // 用于设定init的执行顺序(值小的,优先init,可以为负数) CIndex binding_index_ { CGRAPH_DEFAULT_BINDING_INDEX }; // 用于设定绑定线程id @@ -277,6 +283,10 @@ class GElement : public GElementObject, GElementParamMap local_params_; // 用于记录当前element的内部参数 GAspectManagerPtr aspect_manager_ { nullptr }; // 整体流程的切面管理类 UThreadPoolPtr thread_pool_ { nullptr }; // 用于执行的线程池信息 + std::atomic cur_state_ { GElementState::CREATE }; // 当前执行状态 + + std::mutex yield_mutex_; // 控制停止执行的锁 + std::condition_variable yield_cv_; // 控制停止执行的条件变量 friend class GNode; friend class GCluster; diff --git a/src/GraphCtrl/GraphElement/GElementDefine.h b/src/GraphCtrl/GraphElement/GElementDefine.h index 7db04917..254e79ff 100644 --- a/src/GraphCtrl/GraphElement/GElementDefine.h +++ b/src/GraphCtrl/GraphElement/GElementDefine.h @@ -31,6 +31,14 @@ enum class GElementType { SINGLETON = 0x00040002, // 单例 }; + +enum class GElementState { + CREATE = 0x0000, // 创建状态(暂未init的情况) + NORMAL = 0x1000, // 正常执行状态 + CANCEL = 0x1001, // 取消状态 + YIELD = 0x1002, // 暂停状态 +}; + CGRAPH_NAMESPACE_END #endif //CGRAPH_GELEMENTDEFINE_H diff --git a/src/GraphCtrl/GraphElement/GElementManager.cpp b/src/GraphCtrl/GraphElement/GElementManager.cpp index 0dbf99b6..23eef0b1 100644 --- a/src/GraphCtrl/GraphElement/GElementManager.cpp +++ b/src/GraphCtrl/GraphElement/GElementManager.cpp @@ -39,6 +39,8 @@ CStatus GElementManager::init() { status = element->fatProcessor(CFunctionType::INIT); CGRAPH_FUNCTION_CHECK_STATUS element->is_init_ = true; + // 初始化了之后,就进入正常状态,可以执行了 + element->cur_state_ = GElementState::NORMAL; } CGRAPH_FUNCTION_END @@ -52,6 +54,8 @@ CStatus GElementManager::destroy() { status = element->fatProcessor(CFunctionType::DESTROY); CGRAPH_FUNCTION_CHECK_STATUS element->is_init_ = false; + // 如果destroy成功,则恢复到刚刚创建的状态 + element->cur_state_ = GElementState::CREATE; } CGRAPH_DELETE_PTR(engine_) diff --git a/src/GraphCtrl/GraphElement/GGroup/GCluster/GCluster.h b/src/GraphCtrl/GraphElement/GGroup/GCluster/GCluster.h index 748eee60..061fe064 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GCluster/GCluster.h +++ b/src/GraphCtrl/GraphElement/GGroup/GCluster/GCluster.h @@ -24,6 +24,19 @@ class GCluster : public GGroup { GCluster& operator=(const GCluster& cluster); protected: + /** + * 获取element个数信息 + * @return + */ + CSize getElementNum(); + + /** + * 获取绑定信息 + * @return + */ + CIndex getBindingIndex() override; + +private: /** * 线程池中的运行函数,依次执行beforeRun,run和afterRun方法, * 其中有任何返回值问题,则直接返回 @@ -32,35 +45,22 @@ class GCluster : public GGroup { */ CStatus process(CBool isMock); - CStatus addElement(GElementPtr element) final; - CStatus run() final; + CStatus addElement(GElementPtr element) final; + CStatus beforeRun() final; CStatus afterRun() final; CVoid dump(std::ostream& oss) final; - /** - * 获取element个数信息 - * @return - */ - CSize getElementNum(); - /** * 判断是否所有element均执行结束了 * @return */ CBool isDone(); - /** - * 获取绑定信息 - * @return - */ - CIndex getBindingIndex() override; - - friend class GElementManager; friend class GRegion; friend class GPipeline; diff --git a/src/GraphCtrl/GraphElement/GGroup/GCondition/GCondition.h b/src/GraphCtrl/GraphElement/GGroup/GCondition/GCondition.h index 5b158dbd..c74ce904 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GCondition/GCondition.h +++ b/src/GraphCtrl/GraphElement/GGroup/GCondition/GCondition.h @@ -31,9 +31,9 @@ class GCondition : public GGroup { */ CSize getRange() const; +private: CVoid dump(std::ostream& oss) final; -private: CStatus run() override; CStatus addElement(GElementPtr element) override; diff --git a/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.h b/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.h index 0a8ebc24..e343542c 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.h +++ b/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.h @@ -16,10 +16,10 @@ CGRAPH_NAMESPACE_BEGIN template class GMultiCondition : public GCondition { -public: +private: explicit GMultiCondition(); - CStatus run() override; + CStatus run() final; /** * 串行执行 @@ -33,7 +33,6 @@ class GMultiCondition : public GCondition { */ CStatus parallelRun(); -private: CIndex choose() final; friend class GPipeline; diff --git a/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h b/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h index ef97a58b..8b3749ec 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h +++ b/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h @@ -32,6 +32,7 @@ class GRegion : public GGroup { CStatus run() final; CStatus destroy() final; +private: CStatus addElement(GElementPtr element) final; CVoid dump(std::ostream& oss) final; diff --git a/src/GraphCtrl/GraphPipeline/GPipeline.cpp b/src/GraphCtrl/GraphPipeline/GPipeline.cpp index 402e6cf1..89030ca6 100644 --- a/src/GraphCtrl/GraphPipeline/GPipeline.cpp +++ b/src/GraphCtrl/GraphPipeline/GPipeline.cpp @@ -141,13 +141,18 @@ std::future GPipeline::asyncProcess(CSize runTimes) { CStatus GPipeline::cancel() { - CGRAPH_FUNCTION_BEGIN - // 将所有的信息,设置为cancel的状态,停止执行 - for (auto cur : element_repository_) { - cur->cancel_.store(true); - } + return pushAllState(GElementState::CANCEL); +} - CGRAPH_FUNCTION_END + +CStatus GPipeline::yield() { + return pushAllState(GElementState::YIELD); +} + + +CStatus GPipeline::resume() { + // 直接恢复正常状态好了 + return pushAllState(GElementState::NORMAL); } @@ -200,7 +205,7 @@ GPipelinePtr GPipeline::setUniqueThreadPoolConfig(const UThreadPoolConfig& confi } -GPipeline* GPipeline::setSharedThreadPool(UThreadPoolPtr ptr) { +GPipelinePtr GPipeline::setSharedThreadPool(UThreadPoolPtr ptr) { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT_RETURN_NULL(false) @@ -242,18 +247,27 @@ CStatus GPipeline::initSchedule() { CVoid GPipeline::prepare() { if (element_repository_.empty() - || !(*element_repository_.begin())->cancel_.load()) { - /** - * 有一个cancel 状态是 false,则表示全部为 false。进而不需要处理了 - * 普遍情况,应该是直接返回的。 - * 只有当上一次执行,被外部强制cancel的情况下,才会进入下方循环中的赋值逻辑 - */ + || GElementState::NORMAL != (*element_repository_.begin())->cur_state_.load()) { return; } - for (auto* cur : element_repository_) { - cur->cancel_.store(false); + pushAllState(GElementState::NORMAL); +} + + +CStatus GPipeline::pushAllState(GElementState state) { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(true) + + for (auto cur : element_repository_) { + cur->cur_state_.store(state); + if (GElementState::YIELD != state) { + // 目前仅非yield状态,需要切换的。如果一直处于 yield状态,是不需要被通知的 + cur->yield_cv_.notify_one(); + } } + + CGRAPH_FUNCTION_END } CGRAPH_NAMESPACE_END diff --git a/src/GraphCtrl/GraphPipeline/GPipeline.h b/src/GraphCtrl/GraphPipeline/GPipeline.h index 52d2b697..23ab014e 100644 --- a/src/GraphCtrl/GraphPipeline/GPipeline.h +++ b/src/GraphCtrl/GraphPipeline/GPipeline.h @@ -70,6 +70,18 @@ class GPipeline : public GPipelineObject, */ CStatus cancel(); + /** + * 暂停当前pipeline的执行 + * @return + */ + CStatus yield(); + + /** + * 恢复当前pipeline的执行 + * @return + */ + CStatus resume(); + /** * 生成图可视化 graphviz 信息 * @param oss @@ -268,6 +280,13 @@ class GPipeline : public GPipelineObject, */ CVoid prepare(); + /** + * 设置所有内部的element状态 + * @param state + * @return + */ + CStatus pushAllState(GElementState state); + /** 不允许外部赋值和构造 */ CGRAPH_NO_ALLOWED_COPY(GPipeline) diff --git a/tutorial/MyGNode/MyNode2.h b/tutorial/MyGNode/MyNode2.h index b9c00af7..6b6dabfc 100644 --- a/tutorial/MyGNode/MyNode2.h +++ b/tutorial/MyGNode/MyNode2.h @@ -17,7 +17,7 @@ class MyNode2 : public CGraph::GNode { // run 方法可以多次执行,且必须实现 CStatus init() override { CStatus status; - CGraph::CGRAPH_ECHO("[%s], enter MyNode2 init function.", this->getName().c_str()); + CGraph::CGRAPH_ECHO("[INIT] [%s], enter MyNode2 init function.", this->getName().c_str()); return status; } @@ -30,7 +30,7 @@ class MyNode2 : public CGraph::GNode { CStatus destroy() override { CStatus status; - CGraph::CGRAPH_ECHO("[%s], enter MyNode2 destroy function.", this->getName().c_str()); + CGraph::CGRAPH_ECHO("[DESTROY] [%s], enter MyNode2 destroy function.", this->getName().c_str()); return status; } }; diff --git a/tutorial/T20-YieldResume.cpp b/tutorial/T20-YieldResume.cpp new file mode 100644 index 00000000..d565b34a --- /dev/null +++ b/tutorial/T20-YieldResume.cpp @@ -0,0 +1,54 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: T20-YieldResume.cpp +@Time: 2023/6/24 15:08 +@Desc: 本例子主要展示异步执行的逻辑中,如何暂停和恢复执行pipeline的功能 +***************************/ + +#include "MyGNode/MyNode1.h" +#include "MyGNode/MyNode2.h" + +using namespace CGraph; + +void tutorial_yield_resume() { + GPipelinePtr pipeline = GPipelineFactory::create(); + GElementPtr a, b, c, d = nullptr; + + CStatus status = pipeline->registerGElement(&a, {}, "nodeA"); + status += pipeline->registerGElement(&b, {a}, "nodeB", 3); + status += pipeline->registerGElement(&c, {a}, "nodeC", 3); + status += pipeline->registerGElement(&d, {b, c}, "nodeD"); + if (!status.isOK()) { + std::cout << status.getInfo() << std::endl; + return; // 使用时,请对所有CGraph接口的返回值做判定。今后tutorial例子中省略该操作。 + } + + status += pipeline->init(); + + CGRAPH_ECHO("pipeline async run, BEGIN."); + auto result = pipeline->asyncRun(); + CGRAPH_SLEEP_MILLISECOND(2600) + + CGRAPH_ECHO("pipeline async run, YIELD after 2600ms."); + status += pipeline->yield(); // 暂停执行,保留当前pipeline内部所有参数信息和状态信息 + + CGRAPH_SLEEP_MILLISECOND(7200) + CGRAPH_ECHO("pipeline async run, RESUME after 7200ms."); + status += pipeline->resume(); // 暂停一段时间后,恢复执行 + + result.wait(); // 等待pipeline异步执行结束 + status += pipeline->destroy(); + if (!status.isOK()) { + std::cout << status.getInfo() << std::endl; + return; + } + + GPipelineFactory::remove(pipeline); +} + + +int main () { + tutorial_yield_resume(); + return 0; +}