Skip to content

Commit

Permalink
[perf] change threadpool, local->steal->pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Sep 15, 2024
1 parent a793d04 commit 3a8f517
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/UtilsCtrl/ThreadPool/Task/UTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class UTask : public CStruct {

CVoid operator()() {
// impl_ 理论上不可能为空
impl_ ? impl_->call() : throw CException("UTask inner function is nullptr");
impl_->call();
}

UTask() = default;
Expand All @@ -67,7 +67,7 @@ class UTask : public CStruct {

private:
std::unique_ptr<TaskBased> impl_ = nullptr;
int priority_ = 0; // 任务的优先级信息
CInt priority_ = 0; // 任务的优先级信息
};


Expand Down
22 changes: 9 additions & 13 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,8 @@ class UThreadBase : public UThreadObject {
* 循环处理任务
* @return
*/
CStatus loopProcess() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_NOT_NULL(config_)

CVoid loopProcess() {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(config_)
if (config_->batch_task_enable_) {
while (done_) {
processTasks(); // 批量任务获取执行接口
Expand All @@ -148,8 +146,6 @@ class UThreadBase : public UThreadObject {
processTask(); // 单个任务获取执行接口
}
}

CGRAPH_FUNCTION_END
}


Expand Down Expand Up @@ -206,7 +202,7 @@ class UThreadBase : public UThreadObject {
* @param policy
* @return
*/
static int calcPolicy(int policy) {
static CInt calcPolicy(int policy) {
return (CGRAPH_THREAD_SCHED_OTHER == policy
|| CGRAPH_THREAD_SCHED_RR == policy
|| CGRAPH_THREAD_SCHED_FIFO == policy)
Expand All @@ -220,19 +216,19 @@ class UThreadBase : public UThreadObject {
* @param priority
* @return
*/
static int calcPriority(int priority) {
static CInt calcPriority(int priority) {
return (priority >= CGRAPH_THREAD_MIN_PRIORITY
&& priority <= CGRAPH_THREAD_MAX_PRIORITY)
? priority : CGRAPH_THREAD_MIN_PRIORITY;
}


protected:
bool done_; // 线程状态标记
bool is_init_; // 标记初始化状态
bool is_running_; // 是否正在执行
int type_ = 0; // 用于区分线程类型(主线程、辅助线程)
unsigned long total_task_num_ = 0; // 处理的任务的数字
CBool done_; // 线程状态标记
CBool is_init_; // 标记初始化状态
CBool is_running_; // 是否正在执行
CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程)
CULong total_task_num_ = 0; // 处理的任务的数字

UAtomicQueue<UTask>* pool_task_queue_; // 用于存放线程池中的普通任务
UAtomicPriorityQueue<UTask>* pool_priority_task_queue_; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行
Expand Down
12 changes: 6 additions & 6 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ class UThreadPrimary : public UThreadBase {
CGRAPH_RETURN_ERROR_STATUS("primary thread is null")
}

status = loopProcess();
loopProcess();
CGRAPH_FUNCTION_END
}


CVoid processTask() override {
UTask task;
if (popTask(task) || popPoolTask(task) || stealTask(task)) {
if (popTask(task) || stealTask(task) || popPoolTask(task)) {
runTask(task);
} else {
fatWait();
Expand All @@ -101,7 +101,7 @@ class UThreadPrimary : public UThreadBase {

CVoid processTasks() override {
UTaskArr tasks;
if (popTask(tasks) || popPoolTask(tasks) || stealTask(tasks)) {
if (popTask(tasks) || stealTask(tasks) || popPoolTask(tasks)) {
// 尝试从主线程中获取/盗取批量task,如果成功,则依次执行
runTasks(tasks);
} else {
Expand Down Expand Up @@ -288,12 +288,12 @@ class UThreadPrimary : public UThreadBase {
}

private:
int index_; // 线程index
int cur_empty_epoch_ = 0; // 当前空转的轮数信息
CInt index_; // 线程index
CInt cur_empty_epoch_ = 0; // 当前空转的轮数信息
UWorkStealingQueue<UTask> primary_queue_; // 内部队列信息
UWorkStealingQueue<UTask> secondary_queue_; // 第二个队列,用于减少触锁概率,提升性能
std::vector<UThreadPrimary *>* pool_threads_; // 用于存放线程池中的线程信息
std::vector<int> steal_targets_; // 被偷的目标信息
std::vector<CInt> steal_targets_; // 被偷的目标信息

friend class UThreadPool;
friend class UAllocator;
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class UThreadSecondary : public UThreadBase {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)

status = loopProcess();
loopProcess();
CGRAPH_FUNCTION_END
}

Expand Down

0 comments on commit 3a8f517

Please sign in to comment.