-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_pool.cpp
106 lines (85 loc) · 2.77 KB
/
thread_pool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include "thread_pool.h"
#include <cstdio>
void CTask::setData(void* data) {
m_ptrData = data;
}
// Definitions of CThreadPool's static data members.
// Being static, these are shared by every CThreadPool instance in the process.

// Queue of pending tasks; workers take tasks from the front (see ThreadFunc).
vector<CTask*> CThreadPool::m_vecTaskList;
// Set to true by StopAll() to tell the workers to exit; never reset in this file.
bool CThreadPool::shutdown = false;
// Mutex locked around accesses to m_vecTaskList in ThreadFunc and AddTask.
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable workers wait on; signalled by AddTask, broadcast by StopAll.
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
// Thread pool constructor.
// Stores the requested worker count, logs it, and starts the workers
// through Create(). The return value of Create() is not inspected.
CThreadPool::CThreadPool(int threadNum) {
    m_iThreadNum = threadNum;
    printf("I will create %d threads.\n", threadNum);
    Create();
}
// Worker thread entry point (the start routine handed to pthread_create).
//
// Each worker repeats the same cycle until shutdown is observed:
//   1. lock m_pthreadMutex and wait on m_pthreadCond until the task queue is
//      non-empty or shutdown is set;
//   2. remove the task at the front of the queue while still holding the mutex;
//   3. release the mutex and run the task.
//
// threadData: not used by this function.
// Returns NULL when the worker exits because shutdown was set.
void* CThreadPool::ThreadFunc(void* threadData) {
    const pthread_t tid = pthread_self();
    for (;;)
    {
        pthread_mutex_lock(&m_pthreadMutex);
        // Sleep until there is work or a shutdown request. The predicate is
        // re-tested in a loop because a condition wait may return spuriously.
        while (m_vecTaskList.empty() && !shutdown)
            pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);

        // Shutdown takes priority over queued tasks; leave the loop with the
        // mutex still held so it is released exactly once below.
        if (shutdown)
            break;

        printf("[tid: %lu]\trun: ", tid);

        // The wait loop above guarantees the queue is non-empty here, so
        // taking the front element is safe.
        CTask* task = m_vecTaskList.front();
        m_vecTaskList.erase(m_vecTaskList.begin());
        pthread_mutex_unlock(&m_pthreadMutex);

        // Run the task outside the lock so other workers can dequeue meanwhile.
        // A NULL entry is skipped instead of being dereferenced.
        if (task != NULL)
            task->Run();
        printf("[tid: %lu]\tidle\n", tid);
    }

    pthread_mutex_unlock(&m_pthreadMutex);
    printf("[tid: %lu]\texit\n", tid);
    // Returning from the start routine terminates the thread with this value,
    // the same result pthread_exit(NULL) would give.
    return NULL;
}
// Appends a task to the shared queue and signals the condition variable so
// that a waiting worker can pick it up.
//
// task: pointer stored in the queue as-is; it is not checked or copied here.
// Returns 0 in all cases.
int CThreadPool::AddTask(CTask *task) {
    // Only the queue mutation itself is done under the mutex.
    pthread_mutex_lock(&m_pthreadMutex);
    m_vecTaskList.push_back(task);
    pthread_mutex_unlock(&m_pthreadMutex);

    // The wake-up is sent after the mutex has been released.
    pthread_cond_signal(&m_pthreadCond);

    return 0;
}
// Creates the worker threads.
//
// Allocates an array of m_iThreadNum thread ids and starts one thread per
// slot, each running ThreadFunc with a NULL argument.
//
// Returns 0 when every thread was started. If pthread_create fails, the
// failure is reported on stderr, m_iThreadNum is reduced to the number of
// threads that were actually started (so StopAll() only joins valid ids),
// and -1 is returned.
int CThreadPool::Create() {
    pthread_id = new pthread_t[m_iThreadNum];
    for (int i = 0; i < m_iThreadNum; i++)
    {
        const int rc = pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_create failed for thread %d (error %d)\n", i, rc);
            // Slots from i onwards hold no valid thread id; forget them.
            m_iThreadNum = i;
            return -1;
        }
    }
    return 0;
}
// Stops every worker thread and releases the pool's resources.
//
// Sets shutdown, wakes all waiting workers, joins the m_iThreadNum threads
// recorded in pthread_id, frees that array, and destroys the mutex and the
// condition variable.
//
// Returns 0 on the first call and -1 if shutdown was already set.
// Not safe to call from several threads at the same time.
int CThreadPool::StopAll() {
    // Guard against repeated calls. This check deliberately does not take the
    // mutex: on a repeated call the mutex has already been destroyed below.
    if (shutdown)
        return -1;
    printf("Now I will end all threads!\n\n");

    // Set the flag and wake the workers while holding the mutex. A worker
    // tests `shutdown` under this mutex right before it starts waiting; if the
    // flag were changed without the lock, the broadcast could be sent between
    // that test and the wait, and the worker would sleep forever.
    pthread_mutex_lock(&m_pthreadMutex);
    shutdown = true;
    pthread_cond_broadcast(&m_pthreadCond);
    pthread_mutex_unlock(&m_pthreadMutex);

    // Join every worker so that no finished thread is left unreaped.
    for (int i = 0; i < m_iThreadNum; i++)
        pthread_join(pthread_id[i], NULL);
    delete[] pthread_id;
    pthread_id = NULL;

    // Destroy the mutex and the condition variable.
    pthread_mutex_destroy(&m_pthreadMutex);
    pthread_cond_destroy(&m_pthreadCond);
    return 0;
}
// Returns the number of tasks currently held in the queue.
// NOTE(review): the size is read without locking m_pthreadMutex, so the value
// may be stale while workers or AddTask() are running — confirm callers only
// use it as a hint.
int CThreadPool::getTaskSize() {
    return static_cast<int>(m_vecTaskList.size());
}