这是对pthread线程的一个简单应用
1. 实现了线程池的概念,线程可以重复使用。
2. 对信号量,互斥锁等进行封装,业务处理函数中只需写和业务相关的代码。
3. 移植性好。如果想把这个线程池代码应用到自己的实现中去,只要写自己的业务处理函数和改写工作队列数据的处理方法就可以了。
Sample代码主要包括一个主程序和两个线程实现类
ThreadTest.cpp:主程序
CThreadManager:线程管理Class,线程池的实现类
CThread:线程Class.
主程序实现方法。
1. 实现main函数和一个需要线程处理的业务函数(例子代码中业务函数是一个简单的计算函数Count)。在main函数中创建CThreadManager的实例,产生线程池。这个时候,把业务函数作为函数指针传到CThreadManager里面,最终会被线程调用。
2. 向工作队列中放入业务函数要处理的数据。
3. 设置信号量,唤醒线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// 线程要执行的函数 int Count( int nWork) { int nResult = nWork * nWork; printf ( "count result is %d\n" ,nResult); return 0; } int main() { // 创建线程管理类的实例,把要执行的线程函数和最大线程数传进去 CThreadManager* pManager = new CThreadManager(Count, 3); // 把要进行计算的数放到工作队列中 pManager->PushWorkQue(5); pManager->PushWorkQue(20); // 设置信号量,唤醒线程 pManager->PostSem(); pManager->PostSem(); // 等待子线程执行 sleep(1); return 0; } |
CThreadManager实现的方法
1. 把信号量和互斥锁等封装成自己的函数
2. 在new方法里,循环调用CThread的new方法,启动一定数量(可设定)的线程,产生线程池。
3. 这些线程启动后,就会执行CThreadManager中的ManageFuction函数。这个函数是无限循环的,保证了线程在整个程序的生命周期中不销毁。
4. 在循环处理里面,第一行代码就是等待一个信号量,这个信号量是由主程序进行设置的,这个信号信号量如果没有被设置(代表暂时没有需要处理的工作),所有线程都在这里阻塞着。
4. 一旦信号量被设置,根据Linux线程调度机制,在阻塞的线程队列中,其中一个线程被唤醒,可以执行后面的代码。
5. 从工作队列中取出要进行处理的数据(使用互斥锁进行排他)
6. 通过函数指针调用main函数传过来的业务函数,处理数据。
7. 业务函数执行完之后,线程进入下一个循环,等待新的信号量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
class CThreadManager { friend void * ManageFuction( void *); private : sem_t m_sem; // 信号量 pthread_mutex_t m_mutex; // 互斥锁 queue< int > m_queWork; // 工作队列 list<CThread*> m_lstThread; // 线程list int (*m_threadFuction)( int ); //函数指针,指向main函数传过来的线程执行函数 public : CThreadManager( int (*threadFuction)( int ), int nMaxThreadCnt); virtual ~CThreadManager(); int WaitSem(); int PostSem(); int LockMutex(); int UnlockMutex(); void PushWorkQue( int nWork); int PopWorkQue(); int RunThreadFunction( int nWork); }; // 线程执行函数,它只是个壳子,处理信号量和互斥锁等, // 最后调用main函数传过来的线程执行函数来实现业务处理 void * ManageFuction( void * argv) { CThreadManager* pManager = (CThreadManager*)argv; // 进行无限循环(意味着线程是不销毁的,重复利用) while ( true ) { // 线程开启后,就在这里阻塞着,直到main函数设置了信号量 pManager->WaitSem(); printf ( "thread wakeup.\n" ); // 从工作队列中取出要处理的数 pManager->LockMutex(); int nWork = pManager->PopWorkQue(); pManager->UnlockMutex(); printf ( "call Count function.\n" ); pManager->RunThreadFunction(nWork); } return 0; } // 构造方法 CThreadManager::CThreadManager( int (*threadFuction)( int ), int nMaxThreadCnt) { sem_init(&m_sem, 0, 0); pthread_mutex_init(&m_mutex, NULL); m_threadFuction = threadFuction; for ( int i=0; i<nMaxThreadCnt; i++) { CThread* pThread = new CThread(ManageFuction, this ); printf ( "thread started.\n" ); m_lstThread.push_back(pThread); } } |
CThread实现的方法
CThreadManager比较简单,封装了创建线程和join线程的函数。
1
2
3
4
5
6
7
8
|
CThread::CThread( void * (*threadFuction)( void *), void * threadArgv) { // 初始化线程属性 pthread_attr_t threadAttr; pthread_attr_init(&threadAttr); pthread_create(&m_thread, &threadAttr, threadFuction, threadArgv); } |
c++线程池,继承CDoit,实现其中的start和end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
/* * 多线程管理类 * */ #ifndef CTHREADPOOLMANAGE_H #define CTHREADPOOLMANAGE_H #include <iostream> #include <pthread.h> #include <unistd.h> #include <list> #include <vector> #include <time.h> #include <asm/errno.h> #define USLEEP_TIME 100 #define CHECK_TIME 1 using namespace std; class CDoit { public : virtual int start( void *){}; virtual int end(){}; }; class CthreadPoolManage { private : int _minThreads; //最少保留几个线程 int _maxThreads; //最多可以有几个线程 int _waitSec; //空闲多少秒后将线程关闭 class threadInfo{ public : threadInfo(){ isbusy = false ; doFlag = true ; } // pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond=PTHREAD_COND_INITIALIZER; bool isbusy; //是否空闲 bool doFlag; // time_t beginTime; //线程不工作开始时间 pthread_t cThreadPid; //线程id pthread_attr_t cThreadAttr; //线程属性 CDoit * doit; //任务类 void * value; //需要传递的值 }; //线程函数 static void * startThread( void *); //任务队列锁 pthread_mutex_t _duty_mutex; //任务队列 list<threadInfo*> _dutyList; //线程队列锁 pthread_mutex_t _thread_mutex; //线程队列 list<threadInfo*> _threadList; ///初始化,创建最小个数线程/// void initThread(); ///任务分配线程/// static void * taskAllocation( void *arg); pthread_t tasktPid; ///线程销毁、状态检查线程/// static void * checkThread( void * arg); pthread_t checktPid; bool checkrun; //线程异常退出清理 static void threadCleanUp( void * arg); // int addThread(list<threadInfo*> *plist,threadInfo* ptinfo); public : CthreadPoolManage(); /* 保留的最少线程,最多线程数,空闲多久销毁,保留几个线程的冗余 */ CthreadPoolManage( int min, int max, int waitSec); ~CthreadPoolManage(); int start(); //任务注入器 int putDuty(CDoit *, void *); int getNowThreadNum(); }; #endif // CTHREADPOOLMANAGE_H |
CPP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
|
/* * 线程池,线程管理类 * */ #include "cthreadpoolmanage.h" CthreadPoolManage::CthreadPoolManage() { _minThreads = 5; //最少保留几个线程 _maxThreads = 5; //最多可以有几个线程 _waitSec = 10; //空闲多少秒后将线程关闭 pthread_mutex_init(&_duty_mutex, NULL); pthread_mutex_init(&_thread_mutex, NULL); checkrun = true ; } CthreadPoolManage::CthreadPoolManage( int min, int max, int waitSec) { CthreadPoolManage(); _minThreads = min; //最少保留几个线程 _maxThreads = max; //最多可以有几个线程 _waitSec = waitSec; //空闲多少秒后将线程关闭 } CthreadPoolManage::~CthreadPoolManage() { } void CthreadPoolManage::threadCleanUp( void * arg) { threadInfo* tinfo = (threadInfo*)arg; tinfo->isbusy = false ; pthread_mutex_unlock(&tinfo->mtx); pthread_attr_destroy (&tinfo->cThreadAttr); delete tinfo; } void * CthreadPoolManage::startThread( void * arg) { cout<< "线程开始工作" <<endl; threadInfo* tinfo = (threadInfo*)arg; pthread_cleanup_push(threadCleanUp,arg); while (tinfo->doFlag){ pthread_mutex_lock(&tinfo->mtx); if (tinfo->doit == NULL) { cout<< "开始等待任务" <<endl; pthread_cond_wait(&tinfo->cond,&tinfo->mtx); cout<< "有任务了" <<endl; } tinfo->isbusy = true ; tinfo->doit->start(tinfo->value); tinfo->doit->end(); tinfo->doit=NULL; tinfo->isbusy = false ; time ( &tinfo->beginTime); pthread_mutex_unlock(&tinfo->mtx); } //0正常执行到这儿不执行清理函数,异常会执行 pthread_cleanup_pop(0); pthread_attr_destroy (&tinfo->cThreadAttr); delete tinfo; cout<< "线程结束" <<endl; } void CthreadPoolManage::initThread() { int i = 0; for (i = 0;i< this ->_minThreads;i++) { threadInfo *tinfo = new threadInfo; tinfo->doit = NULL; tinfo->value = NULL; tinfo->isbusy = false ; tinfo->doFlag = true ; // PTHREAD_CREATE_DETACHED (分离线程) 和 PTHREAD _CREATE_JOINABLE (非分离线程) pthread_attr_init(&tinfo->cThreadAttr); pthread_attr_setdetachstate(&tinfo->cThreadAttr,PTHREAD_CREATE_DETACHED ); cout<< "初始化了一个线程" <<endl; if (pthread_create(&tinfo->cThreadPid,&tinfo->cThreadAttr,startThread,( void *)tinfo) != 0) { cout<< "创建线程失败" <<endl; break ; } this ->_threadList.push_back(tinfo); } } int CthreadPoolManage::addThread(std::list< CthreadPoolManage::threadInfo* >* plist, CthreadPoolManage::threadInfo* ptinfo) { threadInfo *tinfo = new threadInfo; tinfo->doit = ptinfo->doit; tinfo->value = ptinfo->value; tinfo->isbusy = true ; if (pthread_create(&tinfo->cThreadPid,NULL,startThread,( void *)tinfo) != 0) { cout<< "创建线程失败" <<endl; return -1; } plist->push_back(tinfo); return 0; } int CthreadPoolManage::putDuty(CDoit* doit, void * value) { threadInfo *tinfo = new threadInfo; time ( &tinfo->beginTime); tinfo->doit= doit; tinfo->value = value; pthread_mutex_lock(&_duty_mutex); this ->_dutyList.push_back(tinfo); pthread_mutex_unlock(&_duty_mutex); return 0; } void * CthreadPoolManage::taskAllocation( void *arg) { CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg; int size_1 = 0; int size_2 = 0; int i_1 = 0; int i_2 = 0; bool a_1 = true ; bool a_2 = true ; threadInfo* ptinfo; threadInfo* ptinfoTmp; while ( true ){ size_1 = 0; size_2 = 0; pthread_mutex_lock(&ptmanage->_duty_mutex); pthread_mutex_lock(&ptmanage->_thread_mutex); size_1 = ptmanage->_dutyList.size(); size_2 =ptmanage->_threadList.size(); for (list<threadInfo*>::iterator itorti1 = ptmanage->_dutyList.begin();itorti1 !=ptmanage->_dutyList.end();) { ptinfo = *itorti1; a_1 = true ; for (list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();itorti2++){ ptinfoTmp = *itorti2; if (EBUSY == pthread_mutex_trylock(&ptinfoTmp->mtx)) { continue ; } if (!ptinfoTmp->isbusy) { ptinfoTmp->doit = ptinfo->doit; ptinfoTmp->value = ptinfo->value; ptinfoTmp->isbusy = true ; pthread_cond_signal(&ptinfoTmp->cond); pthread_mutex_unlock(&ptinfoTmp->mtx); a_1 = false ; delete ptinfo; break ; } pthread_mutex_unlock(&ptinfoTmp->mtx); } if (a_1){ if (ptmanage->_threadList.size()>ptmanage->_maxThreads||ptmanage->addThread(&ptmanage->_threadList,ptinfo)!=0) { itorti1++; continue ; } else { itorti1 = ptmanage->_dutyList.erase(itorti1); } delete ptinfo; } else { itorti1 = ptmanage->_dutyList.erase(itorti1); } } pthread_mutex_unlock(&ptmanage->_duty_mutex); pthread_mutex_unlock(&ptmanage->_thread_mutex); usleep(USLEEP_TIME); } return 0; } void * CthreadPoolManage::checkThread( void * arg) { CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg; threadInfo* ptinfo; time_t nowtime; while (ptmanage->checkrun){ sleep(CHECK_TIME); pthread_mutex_lock(&ptmanage->_thread_mutex); if (ptmanage->_threadList.size()<=ptmanage->_minThreads) { pthread_mutex_unlock(&ptmanage->_thread_mutex); continue ; } for (list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();){ ptinfo = *itorti2; if (EBUSY == pthread_mutex_trylock(&ptinfo->mtx)) { itorti2++; continue ; } time (&nowtime); if (ptinfo->isbusy == false && nowtime-ptinfo->beginTime>ptmanage->_waitSec) { ptinfo->doFlag = false ; itorti2 = ptmanage->_threadList.erase(itorti2); } else { itorti2++; } pthread_mutex_unlock(&ptinfo->mtx); } pthread_mutex_unlock(&ptmanage->_thread_mutex); } } int CthreadPoolManage::start() { //初始化 this ->initThread(); //启动任务分配线程 if (pthread_create(&tasktPid,NULL,taskAllocation,( void *) this ) != 0) { cout<< "创建任务分配线程失败" <<endl; return -1; } //创建现程状态分配管理线程 if (pthread_create(&checktPid,NULL,checkThread,( void *) this ) != 0) { cout<< "创建线程状态分配管理线程失败" <<endl; return -1; } return 0; } /////////////////////////////// int CthreadPoolManage::getNowThreadNum() { int num = 0; pthread_mutex_lock(& this ->_thread_mutex); num = this ->_threadList.size(); pthread_mutex_unlock(& this ->_thread_mutex); return num ; } |