1. 程式人生 > >C++線程池實現

C++線程池實現

include iterator pop cal gin ppa 不能訪問 stream protect

面試有被問到怎麽實現線程池,網上找的可以用的代碼,在VS2010上測試通過,沒有用到C++11,信號量也是用WINDOWS的。

線程池為了節省開辟線程耗費的資源,提前創建一批線程處於信號量等待狀態,需要用的時候將任務加入隊列中,發送信號量,搶占到的線程執行該任務。具體代碼如下:

  1 #ifndef _ThreadPool_H_
  2 #define _ThreadPool_H_
  3 #pragma warning(disable: 4530)
  4 #pragma warning(disable: 4786)
  5 #include <cassert>
  6 #include <vector>
  7
#include <queue> 8 #include <windows.h> 9 #include "stdafx.h" 10 #include <iostream> 11 using namespace std; 12 13 14 class ThreadJob //工作基類 15 { 16 public: 17 //供線程池調用的虛函數 18 virtual void DoJob(void *pPara) = 0; 19 }; 20 class ThreadPool 21 { 22 public: 23 //
dwNum 線程池規模 24 ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 25 { 26 InitializeCriticalSection(&_csThreadVector); 27 InitializeCriticalSection(&_csWorkQueue); 28 _EventComplete = CreateEvent(0, false, false, NULL); 29 _EventEnd = CreateEvent(0
, true, false, NULL); 30 _SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL); 31 _SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL); 32 assert(_SemaphoreCall != INVALID_HANDLE_VALUE); 33 assert(_EventComplete != INVALID_HANDLE_VALUE); 34 assert(_EventEnd != INVALID_HANDLE_VALUE); 35 assert(_SemaphoreDel != INVALID_HANDLE_VALUE); 36 AdjustSize(dwNum <= 0 ? 4 : dwNum); 37 } 38 ~ThreadPool() 39 { 40 DeleteCriticalSection(&_csWorkQueue); 41 CloseHandle(_EventEnd); 42 CloseHandle(_EventComplete); 43 CloseHandle(_SemaphoreCall); 44 CloseHandle(_SemaphoreDel); 45 vector<ThreadItem*>::iterator iter; 46 for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++) 47 { 48 if(*iter) 49 delete *iter; 50 } 51 DeleteCriticalSection(&_csThreadVector); 52 } 53 // //調整線程池規模 54 int AdjustSize(int iNum) 55 { 56 if(iNum > 0) 57 { 58 ThreadItem *pNew; 59 EnterCriticalSection(&_csThreadVector); 60 for(int _i=0; _i<iNum; _i++) 61 { 62 _ThreadVector.push_back(pNew = new ThreadItem(this)); 63 assert(pNew); 64 pNew->_iTreadID=_i; 65 pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL); 66 // set priority 67 SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL); 68 assert(pNew->_Handle); 69 } 70 LeaveCriticalSection(&_csThreadVector); 71 } 72 else 73 { 74 iNum *= -1; 75 ReleaseSemaphore(_SemaphoreDel, iNum > _lThreadNum ? _lThreadNum : iNum, NULL); 76 } 77 return (int)_lThreadNum; 78 } 79 //調用線程池 80 void Call(void (*pFunc)(void *), void *pPara = NULL) 81 { 82 assert(pFunc); 83 EnterCriticalSection(&_csWorkQueue); 84 if(_ThreadVector.empty()) 85 { 86 printf("線程隊列空\n\r"); 87 } 88 else 89 { 90 printf("線程隊列不空\n\r"); 91 _JobQueue.push(new JobItem(pFunc, pPara)); 92 } 93 94 LeaveCriticalSection(&_csWorkQueue); 95 ReleaseSemaphore(_SemaphoreCall, 1, NULL); 96 } 97 //調用線程池 98 inline void Call(ThreadJob * p, void *pPara = NULL) 99 { 100 Call(CallProc, new CallProcPara(p, pPara)); 101 } 102 //結束線程池, 並同步等待 103 bool EndAndWait(DWORD dwWaitTime = INFINITE) 104 { 105 SetEvent(_EventEnd); 106 return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0; 107 } 108 //結束線程池 109 inline void End() 110 { 111 SetEvent(_EventEnd); 112 } 113 inline DWORD Size() 114 { 115 return (DWORD)_lThreadNum; 116 } 117 inline DWORD GetRunningSize() 118 { 119 return (DWORD)_lRunningNum; 120 } 121 bool IsRunning() 122 { 123 return _lRunningNum > 0; 124 } 125 protected: 126 // //工作線程 127 static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL) 128 { 129 ThreadItem *pNew; 130 pNew=(ThreadItem*)lpParameter; 131 printf("threadID=%d \n\r",pNew->_iTreadID); 132 ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter); 133 assert(pThread); 134 ThreadPool *pThreadPoolObj = pThread->_pThis; 135 assert(pThreadPoolObj); 136 InterlockedIncrement(&pThreadPoolObj->_lThreadNum);//InterLockedIncrement 能夠保證在一個線程訪問變量時其它線程不能訪問 137 HANDLE hWaitHandle[3]; 138 hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall; 139 hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel; 140 hWaitHandle[2] = pThreadPoolObj->_EventEnd; 141 JobItem *pJob; 142 bool fHasJob; 143 for(;;) 144 { 145 146 DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE); 147 printf("hh =%d\n\r",pNew->_iTreadID); 148 //響應刪除線程信號 149 if(wr == WAIT_OBJECT_0 + 1) 150 break; 151 //從隊列裏取得用戶作業 152 EnterCriticalSection(&pThreadPoolObj->_csWorkQueue); 153 if(fHasJob = !pThreadPoolObj->_JobQueue.empty()) 154 { 155 pJob = pThreadPoolObj->_JobQueue.front(); 156 pThreadPoolObj->_JobQueue.pop(); 157 assert(pJob); 158 } 159 LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue); 160 //受到結束線程信號 確定是否結束線程(結束線程信號 && 是否還有工作) 161 if(wr == WAIT_OBJECT_0 + 2 && !fHasJob) 162 { 163 printf("endthread =%d\n\r",pNew->_iTreadID); 164 break; 165 } 166 167 if(fHasJob && pJob) 168 { 169 InterlockedIncrement(&pThreadPoolObj->_lRunningNum); 170 pThread->_dwLastBeginTime = GetTickCount(); 171 pThread->_dwCount++; 172 pThread->_fIsRunning = true; 173 pJob->_pFunc(pJob->_pPara); //運行用戶作業 174 delete pJob; 175 pThread->_fIsRunning = false; 176 InterlockedDecrement(&pThreadPoolObj->_lRunningNum); 177 } 178 } 179 //刪除自身結構 180 EnterCriticalSection(&pThreadPoolObj->_csThreadVector); 181 pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread)); 182 LeaveCriticalSection(&pThreadPoolObj->_csThreadVector); 183 delete pThread; 184 InterlockedDecrement(&pThreadPoolObj->_lThreadNum); 185 if(!pThreadPoolObj->_lThreadNum) //所有線程結束 186 SetEvent(pThreadPoolObj->_EventComplete); 187 return 0; 188 } 189 //調用用戶對象虛函數 190 static void CallProc(void *pPara) 191 { 192 CallProcPara *cp = static_cast<CallProcPara *>(pPara); 193 assert(cp); 194 if(cp) 195 { 196 cp->_pObj->DoJob(cp->_pPara); 197 delete cp; 198 } 199 } 200 //用戶對象結構 201 struct CallProcPara 202 { 203 ThreadJob* _pObj;//用戶對象 204 void *_pPara;//用戶參數 205 CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { }; 206 }; 207 //用戶函數結構 208 struct JobItem 209 { 210 void (*_pFunc)(void *);//函數 211 void *_pPara; //參數 212 JobItem(void (*pFunc)(void *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { }; 213 }; 214 // //線程池中的線程結構 215 struct ThreadItem 216 { 217 int _iTreadID; 218 HANDLE _Handle; //線程句柄 219 ThreadPool *_pThis; //線程池的指針 220 DWORD _dwLastBeginTime; //最後一次運行開始時間 221 DWORD _dwCount; //運行次數 222 bool _fIsRunning; 223 ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { }; 224 ~ThreadItem() 225 { 226 if(_Handle) 227 { 228 CloseHandle(_Handle); 229 _Handle = NULL; 230 } 231 } 232 }; 233 std::queue<JobItem *> _JobQueue; //工作隊列 234 std::vector<ThreadItem *> _ThreadVector; //線程數據 235 CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作隊列臨界, 線程數據臨界 236 HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//結束通知, 完成事件, 工作信號, 刪除線程信號 237 long _lThreadNum, _lRunningNum; //線程數, 運行的線程數 238 }; 239 #endif //_ThreadPool_H_ 240 241 void threadfunc(void *p) 242 { 243 printf("*********\n\r"); 244 for (int i=0;i<10;i++) 245 { 246 for(int j=0;j<10;j++) 247 { 248 printf("%02d ",i-j); 249 } 250 printf("\n\r"); 251 } 252 printf("*********\n\r"); 253 //YourClass* yourObject = (YourClass*) p; 254 //... 255 } 256 257 void fun2(void *p) 258 { 259 260 printf("*********\n\r"); 261 for (int i=0;i<20;i++) 262 { 263 for(int j=0;j<20;j++) 264 { 265 printf("%02d ",i); 266 } 267 Sleep(30); 268 printf("\n\r"); 269 270 } 271 //YourClass* yourObject = (YourClass*) p; 272 //... 273 printf("*********\n\r"); 274 } 275 void main() 276 { 277 //ThreadPool tp; 278 //for(int i=0; i<100; i++) 279 // tp.Call(threadfunc); 280 ThreadPool tp(20);//20為初始線程池規模 281 Sleep(1000); 282 tp.Call(threadfunc, NULL); 283 tp.Call(fun2, NULL); 284 //tp.Call(fun2, NULL); 285 //tp.Call(fun2, NULL); 286 //tp.Call(fun2, NULL); 287 288 for(int i=0;i<20;i++) 289 { 290 tp.End(); 291 } 292 293 Sleep(2000); 294 printf("run\n\r"); 295 tp.Call(fun2, NULL); 296 Sleep(100000); 297 system("pause"); 298 return ; 299 }

C++線程池實現