C++多執行緒例項之臨界區同步
阿新 • • 發佈:2019-02-12
本篇是對上一篇
進行了重構,增加了windos下的臨界區鎖。臨界區的特點:非核心物件,只能在window下使用,linux下不能使用;只能在同一程序內的執行緒間使用,速度快。
互斥量特點:互斥量是核心物件,可以用於程序內也可以在程序間互斥,速度相對互斥量慢點,也可以解決某程序意外終止所造成的“遺棄”問題。臨界區和互斥量都是隻能互斥,不能同步,因為它們都有“執行緒所有權”,一旦擁有可以進行多次鎖定。可以通過核心物件(linux下是條件變數)設定無訊號阻塞當前執行緒,其它執行緒觸發訊號來啟動被阻塞的執行緒,來控制執行緒的進入,這樣可以做到同步。
MutexLock.h
#ifndef _PTHREADLOCK_H_ #define _PTHREADLOCK_H_ #ifdef WIN32 #define WINOS // 這裡可以註釋掉就是linux形式的互斥量 #else #undef WINOS #endif #ifdef WINOS #include <Windows.h> #include <process.h> #else #include <pthread.h> //來自ftp://sources.redhat.com/pub/pthreads-win32/ /* 獲取互斥量屬性物件在程序間共享與否的標誌 int pthread_mutexattr_getpshared (__const pthread_mutexattr_t *__restrict __attr, \ int *__restrict __pshared); 設定互斥量屬性物件,標識在程序間共享與否 int pthread_mutexattr_setpshared (pthread_mutexattr_t *__attr, int __pshared); */ /*參考文章:http://www.ibm.com/developerworks/cn/linux/l-cn-mthreadps/ http://blog.csdn.net/anonymalias/article/details/9093733 https://software.intel.com/zh-cn/blogs/2011/03/24/linux-windows */ #endif class CEvent { public: CEvent(); ~CEvent(); void init(); void release(); void waite();// 當獲取不到資料,那麼waite掛起執行緒,等待其它執行緒通知釋放 void unwaite();// 生產了資料那麼需要呼叫unwaite. private: #ifdef WINOS HANDLE m_event;//事件如果有訊號那麼可以正常執行,如果無訊號那麼只能等待 #else pthread_cond_t m_condition; #endif }; // 提供了等待機制和介面的鎖,子類只需要實現自己鎖的邏輯就可以了 class ILock { public: ILock(); virtual ~ILock(); virtual void init() = 0; virtual void release() = 0; virtual void lock() = 0; virtual void unlock() = 0; void waite(); void unwaite(); private: CEvent *m_pEvent;// 子類不用管鎖的事情 }; class CMutexLock: public ILock { public: CMutexLock(); ~CMutexLock(); void init(); void release(); void lock(); void unlock();// 設計的時候,不要unwaite放置到unlock裡面去,否則會導致職責不分明,如果有內部控制的還會導致無法喚醒。 private: #ifdef WINOS HANDLE m_mutex; #else pthread_mutex_t m_mutex; pthread_mutexattr_t m_mutexAttr; #endif }; class CCriticalLock: public ILock { public: CCriticalLock(); ~CCriticalLock(); void init(); void release(); void lock(); void unlock(); private: CRITICAL_SECTION m_critical; }; #endif
MutexLock.cpp
#include "stdafx.h" #include "MutexLock.h" /*參考文章: https://msdn.microsoft.com/en-us/library/windows/desktop/ms682411%28v=vs.85%29.aspx http://blog.csdn.net/anonymalias/article/details/9080881 http://blog.csdn.net/anonymalias/article/details/9174403 */ CEvent::CEvent() { #ifdef WINOS m_event = NULL;; #else m_condition = NULL; #endif } CEvent::~CEvent() { } void CEvent::init() { #ifdef WINOS //事件是有訊號執行緒不阻塞,無訊號阻塞執行緒睡眠。 // arg1是事件屬性。 // arg2是手動還是自動呼叫ResetEvent將事件設定為無訊號,SetEvent是將事件設定為有訊號 // ResetEvent是否手動設定為無訊號,WaitForSingleObject後如果是自動方式那麼會自動呼叫ResetEvent將事件設定為無訊號。 // arg3是初始狀態訊號,一般設定為FALSE無訊號,讓執行緒掛起阻塞。 // arg4是執行緒的名字。 m_event = CreateEvent(NULL, FALSE, FALSE, NULL); #else pthread_cond_init(&m_condition, NULL); #endif } void CEvent::release() { #ifdef WINOS CloseHandle(m_event); m_event = NULL; #else pthread_cond_destroy(&m_condition); m_condition = NULL; #endif } void CEvent::waite() { #ifdef WINOS WaitForSingleObject(m_event, INFINITE);// 等待的事件,和時間 #else //會自動呼叫pthread_mutex_unlock(&m_mutex)釋放互斥量,將當前執行緒掛起阻塞,等待對方執行緒pthread_cond_signal通知喚醒, // 喚醒後pthread_cond_wait會呼叫pthread_mutex_lock重新鎖定互斥量。 // pthread_cond_timedwait是阻塞一段時間。 pthread_cond_wait(&m_condition, &m_mutex); #endif } void CEvent::unwaite() { #ifdef WINOS SetEvent(m_event);//設定為有訊號,喚醒等待事件掛起的執行緒。 #else pthread_cond_signal(&m_condition);//pthread_cond_broadcast(pthread_cond_t * cond)喚醒在條件上等待的所有執行緒。 #endif } ILock::ILock() { m_pEvent = new CEvent(); m_pEvent->init(); } ILock::~ILock() { if(m_pEvent != NULL) { m_pEvent->release(); delete m_pEvent; m_pEvent = NULL; } } void ILock::waite() { m_pEvent->waite(); } void ILock::unwaite() { m_pEvent->unwaite(); } CMutexLock::CMutexLock() { m_mutex = NULL; #ifndef WINOS m_mutexAttr == NULL; #endif } CMutexLock::~CMutexLock() { release(); } void CMutexLock::release() { #ifdef WINOS if(m_mutex != NULL) { CloseHandle(m_mutex);//所有核心物件,或者用其它方式建立的,都可以用closeHandle將引用計數減1。 m_mutex = NULL; } #else if(m_mutexAttr != NULL) { pthread_mutexattr_destroy(&m_mutexAttr); m_mutexAttr = NULL; } if(m_mutex != NULL) { pthread_mutex_destroy(&m_mutex); m_mutex = NULL; } #endif } void CMutexLock::init() { #ifdef WINOS // arg1 是NULL,互斥量用預設的安全描述資訊,這個時候子程序不能繼承該互斥量. // arg2 是當前指明互斥量指向的執行緒為空,且被引用的次數是0,沒有執行緒/程序擁有該互斥量;否則當前執行緒擁有該互斥量。 // arg3 互斥量的名字 m_mutex = CreateMutex(NULL, FALSE, NULL); DWORD dwLastError = GetLastError(); if( dwLastError == ERROR_ALREADY_EXISTS) { CloseHandle(m_mutex); m_mutex = NULL; } #else // arg1是初始化的互斥量,arg2是pthread_mutexattr_t屬性指標,如果是NULL,那麼沒有執行緒擁有該初始化好的互斥量。 int nResult = pthread_mutex_init(&m_mutex, NULL); if(nResult == 0) { printf("pthread_mutex_init result OK.\n"); } else { printf("pthread_mutex_init result error:%d\n", nResult); } pthread_mutexattr_init(&m_mutexAttr); // 設定 recursive 屬性,使得linux下可以遞迴加鎖,避免遞迴加鎖死鎖。 pthread_mutexattr_settype(&m_mutexAttr,PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&m_mutex, &m_mutexAttr); #endif } void CMutexLock::lock() { #ifdef WINOS // arg2是等待毫秒時間,INFINITE是永遠等待,直到該核心物件被觸發可用;該函式是一個非同步呼叫函式,互斥量擁有執行緒id非0, // 那麼該函式將被掛起阻塞,釋放當前CPU擁有權,當被其它執行緒釋放互斥量擁有執行緒id為0,將會喚醒當前阻塞的執行緒重新獲取互斥量。 WaitForSingleObject(m_mutex, INFINITE); /*if(WaiteforSingleObject(m_hMutex, dwMilliSec) == WAIT_OBJECT_0) { return true; } return false; */ #else // 鎖定互斥鎖,如果該互斥鎖被其它執行緒擁有,那麼將被掛起阻塞,指定可用才回調返回; // 執行緒自己多次鎖定將會導致死鎖;兩個執行緒需要多個互斥鎖相互等待對方的互斥鎖,也會導致死鎖。 pthread_mutex_lock(&m_mutex); #endif } void CMutexLock::unlock() { #ifdef WINOS ReleaseMutex(m_mutex);// 將互斥量釋放,會通知到WaitForSingleObject. #else pthread_mutex_unlock(&m_mutex); #endif } CCriticalLock::CCriticalLock() { } CCriticalLock::~CCriticalLock() { } void CCriticalLock::init() { InitializeCriticalSection(&m_critical); } void CCriticalLock::release() { DeleteCriticalSection(&m_critical); } void CCriticalLock::lock() { EnterCriticalSection(&m_critical); } void CCriticalLock::unlock() { LeaveCriticalSection(&m_critical); }
main.cpp
/*多執行緒的一些總結: 一、互斥量是擁有執行緒ID的,如果互斥量沒有執行緒ID那麼當前執行緒可以獲得互斥量,互斥量函式非阻塞;否則互斥量函式將阻塞當前執行緒。 linux下條件變數初始化時是沒有繫結互斥量的(無訊號的),只要waite都會釋放當前互斥量,阻塞當前執行緒,直到有signal傳送過來才會喚醒。 window下的事件物件,事件物件無訊號情況下會阻塞當前執行緒,通過SetEvent(m_event)可以觸發事件(signal),讓當前阻塞的執行緒喚醒。 二、採用等待機制等有效的提高程式的CPU利用率,注意等待時需要先釋放所用擁有的鎖(儘管之前釋放過,),否則會導致死鎖。 s_mutexFileContent.unlock();// 不加這句linux的pthread庫會導致死鎖,linux不能遞迴加鎖否則會導致死鎖,windows下卻可以。 s_mutexRequent.unlock(); // 不加這句,windows下的WaitForSingleObject不會先釋放互斥量鎖,也會導致死鎖。 //s_mutexRequent.waite(); 三、pthread.h庫下預設建立的執行緒是可結合的,每個可結合線程都應該要麼被顯示地回收,即呼叫pthread_join; 要麼通過呼叫pthread_detach函式分離子執行緒,子執行緒被分離後不能再結合了。 pthread_detach(s_loadingThread); 四、window下的WaitForSingleObject執行緒未執行時候是未觸發的,當執行緒執行完那麼是觸發的,所以可以等到到執行緒。 //返回WAIT_OBJECT_0在指定時間內等到到,WAIT_TIMEOUT超時,WAIT_FAILED有錯誤。 HANDLE handle = CreateThread(NULL, 0, ThreadFun, NULL, 0, NULL); WaitForSingleObject(handle, INFINITE); 五、windows下眾多多執行緒函式的選擇;_beginthread是_beginthreadex的子集引數受限制,釋放會有差異,所以用_beginthreadex即可。 _beginthreadex內部會呼叫CreateThread(),會給C執行庫函式開闢堆資源,所以要用_endthreadex和CloseHandle來避免記憶體洩露。 CreateThread()沒有開闢堆資源,所以在C執行庫中可能導致多執行緒資料異常風險,但是在Win32/MFC C++執行庫中可以放心使用。 AfxBeginThread()是MFC中的多執行緒,分工作執行緒無訊息迴圈,介面執行緒有訊息迴圈,可以讓當前執行緒建立,掛起,喚醒,終止。 windows下執行緒常用函式:DWORD SuspendThread(HANDLE hThread);DWORD ResumeThread(HANDLE hThread);BOOL SetThreadPriority(HANDLE hThread,int nPriority); VOID ExitThread(DWORD dwExitCode); BOOL TerminateThread(HANDLE hThread,DWORD dwExitCode); 一般執行緒的掛起/喚醒都通過同步物件來實現。 如果在程式碼中有使用標準C執行庫中的函式時,儘量使用_beginthreadex()來代替CreateThread()。 window下的多執行緒底層都是對CreateThread的封裝。 如果在除主執行緒之外的任何執行緒中進行一下操作,你就應該使用多執行緒版本的C runtime library,並使用_beginthreadex和_endthreadex,CloseHandle: 1 使用malloc()和free(),或是new和delete 2 使用stdio.h或io.h裡面宣告的任何函式 3 使用浮點變數或浮點運算函式 4 呼叫任何一個使用了靜態緩衝區的runtime函式,比如:asctime(),strtok()或rand() 六、linux和window下互斥量和條件變數的區別 1.linux連續上鎖會死鎖,可以用 pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE_NP); 解決。windows連續上鎖不會死鎖。 2.SetEvent(m_event)後等待事件物件一直是有signal的,後面waite的不會阻塞;linux下pthread_cond_signal不會一直有訊號,後面waite的將會阻塞。 3.pthread_cond_wait()後不需要重新加鎖,WaitForSingleObject/SignalObjectAndWait後需要重新加鎖。 4.linux的pthread_cond_timedwait等待的是絕對時間1970-01-01 00:00:00 開始的時間,window的WaitForSingleObject是一個從當前開始的相對時間。 5.linux的執行緒釋放,非遊離的執行緒需要主執行緒呼叫pthread_join等待子執行緒結束並釋放子執行緒的棧暫存器,遊離的執行緒需要設定為屬性為遊離, 或者建立後用pthread_detach設定,子執行緒結束時候系統會回收子執行緒的資源。這樣才能避免記憶體洩露。 windows下的釋放:_beginthreadex建立的要呼叫_endthreadex和CloseHandle,其它方式建立的執行緒ExitThread或CloseHanle即可。 */ #include "stdafx.h" #include "MutexLock.h" #include <iostream> #include <deque> #include <string> using namespace std; #include <windows.h> // 非同步執行緒 #ifdef WINOS static HANDLE s_loadingThread = NULL; #else static pthread_t s_loadingThread; #endif // 非同步讀取檔案的互斥量 static /*CMutexLock*/CCriticalLock s_mutexRequent; // 非同步渲染的互斥量 static /*CMutexLock*/CCriticalLock s_mutexFileContent; // 根物件,派生Node class Object { public: Object() { m_dObjID = 0; } protected: double m_dObjID; }; // 非同步載入後的回撥函式 typedef void (* ObjectCallBack)(Object* ); typedef struct tagAsyncFileData { bool m_bLoadOK; Object *m_pTarget; ObjectCallBack m_pCallback; string strFilePath; tagAsyncFileData() { m_bLoadOK = false; m_pTarget = NULL; m_pCallback = NULL; strFilePath.clear(); } }AsyncFileData; static deque<AsyncFileData*> s_dataFileRequent; typedef struct tagFileBufferData { AsyncFileData *m_pAsyncFileData; char *m_pBuffer; tagFileBufferData() { m_pAsyncFileData = NULL; m_pBuffer = NULL; } }AsyncFileBufferData; static deque<AsyncFileBufferData*> s_dataFileContent; #ifdef WINOS unsigned __stdcall AsyncLoad(void *pParameter) #else static void* AsyncLoad(void *pParameter) #endif { while(1) { AsyncFileData *pFileData = NULL; s_mutexRequent.lock(); if(s_dataFileRequent.empty()) { // 如果沒有資料過來那麼釋放當前的鎖,掛起CPU等待 printf("子執行緒,因沒有請求的檔案而等待!\n"); s_mutexFileContent.unlock();// 不加這句linux的pthread庫會導致死鎖。 s_mutexRequent.unlock(); // 不加這句,windows下的WaitForSingleObject不會先釋放互斥量鎖,也會導致死鎖。 s_mutexRequent.waite(); continue; } else { pFileData = s_dataFileRequent.front(); s_dataFileRequent.pop_front(); } s_mutexRequent.unlock(); // 得到資料處理 if(pFileData != NULL) { // 非同步載入資料,此次mmap還是fread方式略去,直接設定載入OK //fopen(pFileData->strFilePath.c_str(), "rb"); Sleep(1000); pFileData->m_bLoadOK = true; //pFileData.m_pTarget AsyncFileBufferData *pAsyncBuffer = new AsyncFileBufferData; pAsyncBuffer->m_pAsyncFileData = pFileData; char *pContent = "data from pFileData's strFilePath..."; int nContenLen = strlen(pContent) + 1; pAsyncBuffer->m_pBuffer = new char[nContenLen]; strcpy_s(pAsyncBuffer->m_pBuffer, nContenLen, pContent); printf("子執行緒 讀取檔案: %s\n", pAsyncBuffer->m_pAsyncFileData->strFilePath.c_str()); // 非同步處理鎖 s_mutexFileContent.lock(); // 解析好的資料放置進來 s_dataFileContent.push_back(pAsyncBuffer); s_mutexFileContent.unlock(); s_mutexFileContent.unwaite(); } } #ifdef WINOS _endthreadex( 0 );// 釋放_beginthreadex分配的堆資源,且還要用CloseHandle釋放 return 0; #endif } int main(int argc, char* argv[]) { s_mutexRequent.init(); s_mutexFileContent.init(); #ifdef WINOS unsigned int uiThreadID; s_loadingThread = (HANDLE)_beginthreadex(NULL, 0, AsyncLoad, NULL, CREATE_SUSPENDED, &uiThreadID); /*_CRTIMP uintptr_t __cdecl _beginthreadex(_In_opt_ void * _Security, _In_ unsigned _StackSize, _In_ unsigned (__stdcall * _StartAddress) (void *), _In_opt_ void * _ArgList, _In_ unsigned _InitFlag, _In_opt_ unsigned * _ThrdAddr);*/ if(s_loadingThread == NULL) { printf("pthread_create error!"); return 0; } ResumeThread(s_loadingThread); #else if( pthread_create(&s_loadingThread, NULL, AsyncLoad, NULL) != 0) { printf("pthread_create error!"); return 0; } pthread_detach(s_loadingThread); #endif // 在任何一個時間點上,執行緒是可結合的(joinable)或者是分離的(detached)。一個可結合的執行緒能夠被其他執行緒收回其資源和殺死。 // 在被其他執行緒回收之前,它的儲存器資源(例如棧)是不釋放的。相反,一個分離的執行緒是不能被其他執行緒回收或殺死的, // 它的儲存器資源在它終止時由系統自動釋放。預設情況下,執行緒被建立成可結合的。為了避免儲存器洩漏, // 每個可結合線程都應該要麼被顯示地回收,即呼叫pthread_join;要麼通過呼叫pthread_detach函式被分離。 // pthread_detach(s_loadingThread);分離,執行結束後子執行緒會自動釋放自己資源,不需要pthread_join也可以完全釋放資源。 // void* ret = NULL; // pthread_join(_subThreadInstance, &ret);主執行緒一直等待直到等待的執行緒結束自己才結束,主執行緒可以清理其它執行緒的棧暫存器。 // pthread_self()獲取自身執行緒的id. // 執行緒的被動結束分為兩種,一種是非同步終結,另外一種是同步終結。非同步終結就是當其他執行緒呼叫 pthread_cancel的時候, // 執行緒就立刻被結束。而同步終結則不會立刻終結,它會繼續執行,直到到達下一個結束點(cancellation point)。 // 當一個執行緒被按照預設的建立方式建立,那麼它的屬性是同步終結。 static int fileCount = 0; while(1) { s_mutexRequent.lock(); AsyncFileData* m_pFileData = new AsyncFileData(); m_pFileData->m_bLoadOK = false; m_pFileData->m_pCallback = NULL; m_pFileData->m_pTarget = NULL; fileCount++; char szFileBuffer[256]; sprintf_s(szFileBuffer,"檔名 %d.", fileCount); m_pFileData->strFilePath = szFileBuffer; printf("主執行緒,請求讀取檔案: %s\n", m_pFileData->strFilePath.c_str()); s_dataFileRequent.push_back(m_pFileData); s_mutexRequent.unlock(); s_mutexRequent.unwaite(); // 其它邏輯 Sleep(1000); while(1) { AsyncFileBufferData *pAsyncBuffer = NULL; s_mutexFileContent.lock(); if(s_dataFileContent.empty()) { printf("主執行緒,因沒有解析好的資料等待!\n"); s_mutexRequent.unlock();// 請求鎖需要釋放,否則會導致問題 s_mutexFileContent.unlock(); s_mutexFileContent.waite();// 讀取執行緒還沒解析好等待 continue; } pAsyncBuffer = s_dataFileContent.front(); s_dataFileContent.pop_front(); s_mutexFileContent.unlock(); if(pAsyncBuffer != NULL) { printf("主執行緒,得到讀取執行緒解析後的檔案:%s, 資料: %s\n", pAsyncBuffer->m_pAsyncFileData->strFilePath.c_str(), pAsyncBuffer->m_pBuffer); delete pAsyncBuffer->m_pAsyncFileData; delete [] pAsyncBuffer->m_pBuffer; delete pAsyncBuffer; pAsyncBuffer = NULL; // 其它邏輯 Sleep(1000); break; } }// end while 2 } // end while 1 s_mutexRequent.release(); s_mutexFileContent.release(); #ifdef WINOS CloseHandle(s_loadingThread); #else // 設定了pthread_detach(s_loadingThread),退出時會自動釋放, // 否則需要pthread_join()等待可結合的執行緒終止被釋放它的棧暫存器資源. #endif return 0; }