Skip to content

Commit

Permalink
[feat] add yield and resume mothed for pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Jun 24, 2023
1 parent 90d1054 commit ab23535
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 83 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ set(CGRAPH_TUTORIAL_LIST
T17-MessagePubSub
T18-Event
T19-Cancel
T20-YieldResume

# 以下为工具类tutorial
TU01-ThreadPool
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ int main() {
[2023.06.17 - v2.4.2 - Chunel]
* 提供`MultiCondition`(多条件)功能
* 提供了`pipeline`暂停执行和恢复执行功能
</details>
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GAdapter/GFunction/GFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class GFunction : public GAdapter {
// 针对GFunction,是需要写成public的,否则在外部的 lambda中,无法获取
CGRAPH_DECLARE_GPARAM_MANAGER_WRAPPER

protected:
private:
explicit GFunction() {
this->element_type_ = GElementType::FUNCTION;
session_ = URandom<>::generateSession(CGRAPH_STR_FUNCTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CGRAPH_NAMESPACE_BEGIN

template<typename T>
class GSingleton : public GAdapter {
protected:
private:
explicit GSingleton() {
this->element_type_ = GElementType::SINGLETON;
session_ = URandom<>::generateSession(CGRAPH_STR_SINGLETON);
Expand Down
15 changes: 12 additions & 3 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,20 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
try {
switch (type) {
case CFunctionType::RUN: {
for (CSize i = 0; i < this->loop_ && !cancel_.load(); i++) {
for (CSize i = 0; i < this->loop_ && GElementState::CANCEL != cur_state_.load(); i++) {
/** 执行带切面的run方法 */
status = doAspect(GAspectType::BEGIN_RUN);
CGRAPH_FUNCTION_CHECK_STATUS
do {
status = run();
/**
* 如果状态是ok的,并且被条件hold住,则循环执行
* 在run结束之后,首先需要判断一下是否进入yield状态了。
* 接下来,如果状态是ok的,并且被条件hold住,则循环执行
* 默认所有element的isHold条件均为false,即不hold,即执行一次
* 可以根据需求,对任意element类型,添加特定的isHold条件
* 并且没有被退出
* */
} while (status.isOK() && this->isHold());
} while (checkYield(), status.isOK() && this->isHold());
doAspect(GAspectType::FINISH_RUN, status);
}
break;
Expand Down Expand Up @@ -279,6 +280,14 @@ CVoid GElement::dumpElement(std::ostream& oss) {
}


CVoid GElement::checkYield() {
std::unique_lock<std::mutex> lk(yield_mutex_);
this->yield_cv_.wait(lk, [this] {
return GElementState::YIELD != cur_state_;
});
}


CBool GElement::isGroup() {
// 按位与 GROUP有值,表示是 GROUP的逻辑
return (long(element_type_) & long(GElementType::GROUP)) > 0;
Expand Down
94 changes: 52 additions & 42 deletions src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,6 @@ class GElement : public GElementObject,
*/
~GElement() override;

/**
* run方法执行之前的执行函数
* @return
*/
virtual CStatus beforeRun();

/**
* run方法执行之后的执行函数
* @return
*/
virtual CStatus afterRun();

/**
* 是否持续进行
* 默认为false,表示执行且仅执行一次
Expand All @@ -154,6 +142,52 @@ class GElement : public GElementObject,
*/
virtual CStatus crashed(const CException& ex);

/**
* 获取当前element内部参数
* @tparam T
* @param key
* @return
*/
template<typename T,
c_enable_if_t<std::is_base_of<GElementParam, T>::value, int> = 0>
T* getEParam(const std::string& key);

/**
* 获取执行线程对应的信息
* @return
* @notice 辅助线程返回-1
*/
CIndex getThreadIndex();

/**
* 获取绑定线程id信息
* @return
* @notice 不同的group类型,获取 binding index 的方式不同
*/
virtual CIndex getBindingIndex();

/**
* 获取当前节点的相关关系信息,包含前驱、后继、从属关系
* @param relation
* @return
*/
CStatus buildRelation(GElementRelation& relation);

CGRAPH_NO_ALLOWED_COPY(GElement);

private:
/**
* run方法执行之前的执行函数
* @return
*/
virtual CStatus beforeRun();

/**
* run方法执行之后的执行函数
* @return
*/
virtual CStatus afterRun();

/**
* 判定element是否可以运行
* 可执行的条件为:自身未被执行且依赖节点全部被执行
Expand Down Expand Up @@ -191,30 +225,13 @@ class GElement : public GElementObject,
GParamManagerPtr paramManager,
GEventManagerPtr eventManager);

/**
* 获取当前element内部参数
* @tparam T
* @param key
* @return
*/
template<typename T,
c_enable_if_t<std::is_base_of<GElementParam, T>::value, int> = 0>
T* getEParam(const std::string& key);

/**
* 包含切面相关功能的函数,fat取自fatjar的意思
* @param type
* @return
*/
CStatus fatProcessor(const CFunctionType& type);

/**
* 获取执行线程对应的信息
* @return
* @notice 辅助线程返回-1
*/
CIndex getThreadIndex();

/**
* 设置线程池信息
* @param ptr
Expand Down Expand Up @@ -246,26 +263,15 @@ class GElement : public GElementObject,
CVoid dumpElement(std::ostream& oss);

/**
* 获取绑定线程id信息
* @return
* @notice 不同的group类型,获取 binding index 的方式不同
*/
virtual CIndex getBindingIndex();

/**
* 获取当前节点的相关关系信息,包含前驱、后继、从属关系
* @param relation
* 判断是否进入 yield状态。如果是的话,则等待恢复。未进入yield状态,则继续运行
* @return
*/
CStatus buildRelation(GElementRelation& relation);

CGRAPH_NO_ALLOWED_COPY(GElement);
inline CVoid checkYield();

private:
CBool done_ { false }; // 判定被执行结束
CBool linkable_ { false }; // 判定是否可以连通计算
CBool visible_ { true }; // 判断可见的,如果被删除的话,则认为是不可见的
std::atomic<CBool> cancel_ { false }; // 是否被外部强制停止(多用于异步执行状态)
CSize loop_ { CGRAPH_DEFAULT_LOOP_TIMES }; // 元素执行次数
CLevel level_ { CGRAPH_DEFAULT_ELEMENT_LEVEL }; // 用于设定init的执行顺序(值小的,优先init,可以为负数)
CIndex binding_index_ { CGRAPH_DEFAULT_BINDING_INDEX }; // 用于设定绑定线程id
Expand All @@ -277,6 +283,10 @@ class GElement : public GElementObject,
GElementParamMap local_params_; // 用于记录当前element的内部参数
GAspectManagerPtr aspect_manager_ { nullptr }; // 整体流程的切面管理类
UThreadPoolPtr thread_pool_ { nullptr }; // 用于执行的线程池信息
std::atomic<GElementState> cur_state_ { GElementState::CREATE }; // 当前执行状态

std::mutex yield_mutex_; // 控制停止执行的锁
std::condition_variable yield_cv_; // 控制停止执行的条件变量

friend class GNode;
friend class GCluster;
Expand Down
8 changes: 8 additions & 0 deletions src/GraphCtrl/GraphElement/GElementDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ enum class GElementType {
SINGLETON = 0x00040002, // 单例
};


enum class GElementState {
CREATE = 0x0000, // 创建状态(暂未init的情况)
NORMAL = 0x1000, // 正常执行状态
CANCEL = 0x1001, // 取消状态
YIELD = 0x1002, // 暂停状态
};

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GELEMENTDEFINE_H
4 changes: 4 additions & 0 deletions src/GraphCtrl/GraphElement/GElementManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ CStatus GElementManager::init() {
status = element->fatProcessor(CFunctionType::INIT);
CGRAPH_FUNCTION_CHECK_STATUS
element->is_init_ = true;
// 初始化了之后,就进入正常状态,可以执行了
element->cur_state_ = GElementState::NORMAL;
}

CGRAPH_FUNCTION_END
Expand All @@ -52,6 +54,8 @@ CStatus GElementManager::destroy() {
status = element->fatProcessor(CFunctionType::DESTROY);
CGRAPH_FUNCTION_CHECK_STATUS
element->is_init_ = false;
// 如果destroy成功,则恢复到刚刚创建的状态
element->cur_state_ = GElementState::CREATE;
}

CGRAPH_DELETE_PTR(engine_)
Expand Down
30 changes: 15 additions & 15 deletions src/GraphCtrl/GraphElement/GGroup/GCluster/GCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ class GCluster : public GGroup {
GCluster& operator=(const GCluster& cluster);

protected:
/**
* 获取element个数信息
* @return
*/
CSize getElementNum();

/**
* 获取绑定信息
* @return
*/
CIndex getBindingIndex() override;

private:
/**
* 线程池中的运行函数,依次执行beforeRun,run和afterRun方法,
* 其中有任何返回值问题,则直接返回
Expand All @@ -32,35 +45,22 @@ class GCluster : public GGroup {
*/
CStatus process(CBool isMock);

CStatus addElement(GElementPtr element) final;

CStatus run() final;

CStatus addElement(GElementPtr element) final;

CStatus beforeRun() final;

CStatus afterRun() final;

CVoid dump(std::ostream& oss) final;

/**
* 获取element个数信息
* @return
*/
CSize getElementNum();

/**
* 判断是否所有element均执行结束了
* @return
*/
CBool isDone();

/**
* 获取绑定信息
* @return
*/
CIndex getBindingIndex() override;


friend class GElementManager;
friend class GRegion;
friend class GPipeline;
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GGroup/GCondition/GCondition.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class GCondition : public GGroup {
*/
CSize getRange() const;

private:
CVoid dump(std::ostream& oss) final;

private:
CStatus run() override;

CStatus addElement(GElementPtr element) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ CGRAPH_NAMESPACE_BEGIN

template<GMultiConditionType type>
class GMultiCondition : public GCondition {
public:
private:
explicit GMultiCondition();

CStatus run() override;
CStatus run() final;

/**
* 串行执行
Expand All @@ -33,7 +33,6 @@ class GMultiCondition : public GCondition {
*/
CStatus parallelRun();

private:
CIndex choose() final;

friend class GPipeline;
Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class GRegion : public GGroup {
CStatus run() final;
CStatus destroy() final;

private:
CStatus addElement(GElementPtr element) final;

CVoid dump(std::ostream& oss) final;
Expand Down
Loading

0 comments on commit ab23535

Please sign in to comment.