1. 程式人生 > >C++多執行緒例項之臨界區同步

C++多執行緒例項之臨界區同步


本篇是對上一篇

進行了重構,增加了windos下的臨界區鎖。

臨界區的特點:非核心物件,只能在window下使用,linux下不能使用;只能在同一程序內的執行緒間使用,速度快。

互斥量特點:互斥量是核心物件,可以用於程序內也可以在程序間互斥,速度相對互斥量慢點,也可以解決某程序意外終止所造成的“遺棄”問題。

臨界區和互斥量都是隻能互斥,不能同步,因為它們都有“執行緒所有權”,一旦擁有可以進行多次鎖定。可以通過核心物件(linux下是條件變數)設定無訊號阻塞當前執行緒,其它執行緒觸發訊號來啟動被阻塞的執行緒,來控制執行緒的進入,這樣可以做到同步。

MutexLock.h

#ifndef _PTHREADLOCK_H_
#define _PTHREADLOCK_H_

#ifdef WIN32
#define WINOS // 這裡可以註釋掉就是linux形式的互斥量
#else
#undef  WINOS
#endif

#ifdef WINOS
#include <Windows.h>
#include <process.h> 
#else
#include <pthread.h>
//來自ftp://sources.redhat.com/pub/pthreads-win32/
/* 獲取互斥量屬性物件在程序間共享與否的標誌 
int pthread_mutexattr_getpshared (__const pthread_mutexattr_t *__restrict __attr, \  
								  int *__restrict __pshared);  
 設定互斥量屬性物件,標識在程序間共享與否
int pthread_mutexattr_setpshared (pthread_mutexattr_t *__attr, int __pshared); 
*/
/*參考文章:http://www.ibm.com/developerworks/cn/linux/l-cn-mthreadps/
http://blog.csdn.net/anonymalias/article/details/9093733
https://software.intel.com/zh-cn/blogs/2011/03/24/linux-windows
*/
#endif

class CEvent
{
public:
	CEvent();
	~CEvent();
	void init();
	void release();
	void waite();// 當獲取不到資料,那麼waite掛起執行緒,等待其它執行緒通知釋放
	void unwaite();// 生產了資料那麼需要呼叫unwaite.
private:
#ifdef WINOS
	HANDLE m_event;//事件如果有訊號那麼可以正常執行,如果無訊號那麼只能等待
#else
	pthread_cond_t m_condition;
#endif
};

// 提供了等待機制和介面的鎖,子類只需要實現自己鎖的邏輯就可以了
class ILock
{
public:
	ILock();
	virtual ~ILock();
	virtual void init() = 0;
	virtual void release() = 0;
	virtual void lock() = 0;
	virtual void unlock() = 0;
	void waite();
	void unwaite();
private:
	CEvent *m_pEvent;// 子類不用管鎖的事情
};

class CMutexLock: public ILock
{
public:
	CMutexLock();
	~CMutexLock();
	void init();
	void release();
	void lock();
	void unlock();// 設計的時候,不要unwaite放置到unlock裡面去,否則會導致職責不分明,如果有內部控制的還會導致無法喚醒。
private:
	#ifdef WINOS
		HANDLE m_mutex;
	#else
		pthread_mutex_t m_mutex;
		pthread_mutexattr_t m_mutexAttr;
	#endif
};

class CCriticalLock: public ILock
{
public:
	CCriticalLock();
	~CCriticalLock();
	void init();
	void release();
	void lock();
	void unlock();
private:
	CRITICAL_SECTION m_critical;  
};
#endif

MutexLock.cpp
#include "stdafx.h"
#include "MutexLock.h"
/*參考文章:
https://msdn.microsoft.com/en-us/library/windows/desktop/ms682411%28v=vs.85%29.aspx
http://blog.csdn.net/anonymalias/article/details/9080881
http://blog.csdn.net/anonymalias/article/details/9174403
*/

CEvent::CEvent()
{
#ifdef WINOS
	m_event = NULL;;
#else
	m_condition = NULL;
#endif
}

CEvent::~CEvent()
{

}

void CEvent::init()
{
#ifdef WINOS
	//事件是有訊號執行緒不阻塞,無訊號阻塞執行緒睡眠。
	// arg1是事件屬性。
	// arg2是手動還是自動呼叫ResetEvent將事件設定為無訊號,SetEvent是將事件設定為有訊號
	// ResetEvent是否手動設定為無訊號,WaitForSingleObject後如果是自動方式那麼會自動呼叫ResetEvent將事件設定為無訊號。
	// arg3是初始狀態訊號,一般設定為FALSE無訊號,讓執行緒掛起阻塞。
	// arg4是執行緒的名字。
	m_event = CreateEvent(NULL, FALSE, FALSE, NULL);
#else
	pthread_cond_init(&m_condition, NULL);
#endif
}

void CEvent::release()
{
#ifdef WINOS
	CloseHandle(m_event);
	m_event = NULL;
#else
	pthread_cond_destroy(&m_condition);
	m_condition = NULL;
#endif
}

void  CEvent::waite()
{
#ifdef WINOS
	WaitForSingleObject(m_event, INFINITE);// 等待的事件,和時間
#else
	//會自動呼叫pthread_mutex_unlock(&m_mutex)釋放互斥量,將當前執行緒掛起阻塞,等待對方執行緒pthread_cond_signal通知喚醒,
	// 喚醒後pthread_cond_wait會呼叫pthread_mutex_lock重新鎖定互斥量。
	// pthread_cond_timedwait是阻塞一段時間。
	pthread_cond_wait(&m_condition, &m_mutex);
#endif
}

void CEvent::unwaite()
{
#ifdef WINOS
	SetEvent(m_event);//設定為有訊號,喚醒等待事件掛起的執行緒。
#else
	pthread_cond_signal(&m_condition);//pthread_cond_broadcast(pthread_cond_t * cond)喚醒在條件上等待的所有執行緒。
#endif
}

ILock::ILock()
{
	m_pEvent = new CEvent();
	m_pEvent->init();
}
ILock::~ILock()
{
	if(m_pEvent != NULL)
	{
		m_pEvent->release();
		delete m_pEvent;
		m_pEvent = NULL;
	}
}

void ILock::waite()
{
	m_pEvent->waite();
}
void ILock::unwaite()
{
	m_pEvent->unwaite();
}


CMutexLock::CMutexLock()
{
	m_mutex = NULL;
#ifndef WINOS
	m_mutexAttr == NULL;
#endif
}

CMutexLock::~CMutexLock()
{
	release();
}
void CMutexLock::release()
{
#ifdef WINOS
	if(m_mutex != NULL)
	{
		CloseHandle(m_mutex);//所有核心物件,或者用其它方式建立的,都可以用closeHandle將引用計數減1。
		m_mutex = NULL;
	}

#else
	if(m_mutexAttr != NULL)
	{
		pthread_mutexattr_destroy(&m_mutexAttr);
		m_mutexAttr = NULL;
	}

	if(m_mutex != NULL)
	{
		pthread_mutex_destroy(&m_mutex);
		m_mutex = NULL;
	}
	
#endif
}

void CMutexLock::init()
{
#ifdef WINOS
	// arg1 是NULL,互斥量用預設的安全描述資訊,這個時候子程序不能繼承該互斥量.
	// arg2 是當前指明互斥量指向的執行緒為空,且被引用的次數是0,沒有執行緒/程序擁有該互斥量;否則當前執行緒擁有該互斥量。
	// arg3 互斥量的名字
	m_mutex = CreateMutex(NULL, FALSE, NULL);
	DWORD dwLastError = GetLastError();
	if( dwLastError == ERROR_ALREADY_EXISTS)
	{
		CloseHandle(m_mutex);
		m_mutex = NULL;
	}
#else
	// arg1是初始化的互斥量,arg2是pthread_mutexattr_t屬性指標,如果是NULL,那麼沒有執行緒擁有該初始化好的互斥量。
	int nResult = pthread_mutex_init(&m_mutex, NULL);
	if(nResult == 0)
	 {
		 printf("pthread_mutex_init result OK.\n");
	 }
	 else
	 {
		 printf("pthread_mutex_init result error:%d\n", nResult);
	 }
	pthread_mutexattr_init(&m_mutexAttr); 
	// 設定 recursive 屬性,使得linux下可以遞迴加鎖,避免遞迴加鎖死鎖。
	pthread_mutexattr_settype(&m_mutexAttr,PTHREAD_MUTEX_RECURSIVE_NP); 
	pthread_mutex_init(&m_mutex, &m_mutexAttr);
#endif
}

void CMutexLock::lock()
{
#ifdef WINOS
	// arg2是等待毫秒時間,INFINITE是永遠等待,直到該核心物件被觸發可用;該函式是一個非同步呼叫函式,互斥量擁有執行緒id非0,
	// 那麼該函式將被掛起阻塞,釋放當前CPU擁有權,當被其它執行緒釋放互斥量擁有執行緒id為0,將會喚醒當前阻塞的執行緒重新獲取互斥量。
	WaitForSingleObject(m_mutex, INFINITE);
	/*if(WaiteforSingleObject(m_hMutex, dwMilliSec) == WAIT_OBJECT_0)
	{
		return true;
	}
	return false;
	*/
#else
	// 鎖定互斥鎖,如果該互斥鎖被其它執行緒擁有,那麼將被掛起阻塞,指定可用才回調返回;
	// 執行緒自己多次鎖定將會導致死鎖;兩個執行緒需要多個互斥鎖相互等待對方的互斥鎖,也會導致死鎖。
	pthread_mutex_lock(&m_mutex); 
#endif
}

void CMutexLock::unlock()
{
#ifdef WINOS
	ReleaseMutex(m_mutex);// 將互斥量釋放,會通知到WaitForSingleObject.
#else
	pthread_mutex_unlock(&m_mutex);
#endif
}

CCriticalLock::CCriticalLock()
{
}

CCriticalLock::~CCriticalLock()
{
}

void CCriticalLock::init()
{
	InitializeCriticalSection(&m_critical);  
}

void CCriticalLock::release()
{
	DeleteCriticalSection(&m_critical); 
}


void CCriticalLock::lock()
{
	EnterCriticalSection(&m_critical); 
}

void CCriticalLock::unlock()
{
	LeaveCriticalSection(&m_critical); 
}

main.cpp
/*多執行緒的一些總結:
一、互斥量是擁有執行緒ID的,如果互斥量沒有執行緒ID那麼當前執行緒可以獲得互斥量,互斥量函式非阻塞;否則互斥量函式將阻塞當前執行緒。
    linux下條件變數初始化時是沒有繫結互斥量的(無訊號的),只要waite都會釋放當前互斥量,阻塞當前執行緒,直到有signal傳送過來才會喚醒。
	window下的事件物件,事件物件無訊號情況下會阻塞當前執行緒,通過SetEvent(m_event)可以觸發事件(signal),讓當前阻塞的執行緒喚醒。
二、採用等待機制等有效的提高程式的CPU利用率,注意等待時需要先釋放所用擁有的鎖(儘管之前釋放過,),否則會導致死鎖。
s_mutexFileContent.unlock();// 不加這句linux的pthread庫會導致死鎖,linux不能遞迴加鎖否則會導致死鎖,windows下卻可以。
s_mutexRequent.unlock(); // 不加這句,windows下的WaitForSingleObject不會先釋放互斥量鎖,也會導致死鎖。

//s_mutexRequent.waite();
三、pthread.h庫下預設建立的執行緒是可結合的,每個可結合線程都應該要麼被顯示地回收,即呼叫pthread_join;
要麼通過呼叫pthread_detach函式分離子執行緒,子執行緒被分離後不能再結合了。
pthread_detach(s_loadingThread);

四、window下的WaitForSingleObject執行緒未執行時候是未觸發的,當執行緒執行完那麼是觸發的,所以可以等到到執行緒。
//返回WAIT_OBJECT_0在指定時間內等到到,WAIT_TIMEOUT超時,WAIT_FAILED有錯誤。
HANDLE handle = CreateThread(NULL, 0, ThreadFun, NULL, 0, NULL);  
WaitForSingleObject(handle, INFINITE);
五、windows下眾多多執行緒函式的選擇;_beginthread是_beginthreadex的子集引數受限制,釋放會有差異,所以用_beginthreadex即可。
_beginthreadex內部會呼叫CreateThread(),會給C執行庫函式開闢堆資源,所以要用_endthreadex和CloseHandle來避免記憶體洩露。
CreateThread()沒有開闢堆資源,所以在C執行庫中可能導致多執行緒資料異常風險,但是在Win32/MFC C++執行庫中可以放心使用。
AfxBeginThread()是MFC中的多執行緒,分工作執行緒無訊息迴圈,介面執行緒有訊息迴圈,可以讓當前執行緒建立,掛起,喚醒,終止。
windows下執行緒常用函式:DWORD SuspendThread(HANDLE hThread);DWORD ResumeThread(HANDLE hThread);BOOL SetThreadPriority(HANDLE hThread,int nPriority);
VOID ExitThread(DWORD dwExitCode); BOOL TerminateThread(HANDLE hThread,DWORD dwExitCode); 
一般執行緒的掛起/喚醒都通過同步物件來實現。

如果在程式碼中有使用標準C執行庫中的函式時,儘量使用_beginthreadex()來代替CreateThread()。
window下的多執行緒底層都是對CreateThread的封裝。
如果在除主執行緒之外的任何執行緒中進行一下操作,你就應該使用多執行緒版本的C runtime library,並使用_beginthreadex和_endthreadex,CloseHandle:
1 使用malloc()和free(),或是new和delete
2 使用stdio.h或io.h裡面宣告的任何函式
3 使用浮點變數或浮點運算函式
4 呼叫任何一個使用了靜態緩衝區的runtime函式,比如:asctime(),strtok()或rand()

六、linux和window下互斥量和條件變數的區別
1.linux連續上鎖會死鎖,可以用 pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE_NP); 解決。windows連續上鎖不會死鎖。
2.SetEvent(m_event)後等待事件物件一直是有signal的,後面waite的不會阻塞;linux下pthread_cond_signal不會一直有訊號,後面waite的將會阻塞。
3.pthread_cond_wait()後不需要重新加鎖,WaitForSingleObject/SignalObjectAndWait後需要重新加鎖。
4.linux的pthread_cond_timedwait等待的是絕對時間1970-01-01 00:00:00 開始的時間,window的WaitForSingleObject是一個從當前開始的相對時間。 
5.linux的執行緒釋放,非遊離的執行緒需要主執行緒呼叫pthread_join等待子執行緒結束並釋放子執行緒的棧暫存器,遊離的執行緒需要設定為屬性為遊離,
或者建立後用pthread_detach設定,子執行緒結束時候系統會回收子執行緒的資源。這樣才能避免記憶體洩露。
windows下的釋放:_beginthreadex建立的要呼叫_endthreadex和CloseHandle,其它方式建立的執行緒ExitThread或CloseHanle即可。
*/
#include "stdafx.h"
#include "MutexLock.h"
#include <iostream>
#include <deque>
#include <string>
using namespace std;
#include <windows.h>

// 非同步執行緒
#ifdef WINOS
static HANDLE s_loadingThread = NULL;
#else
static pthread_t s_loadingThread;
#endif
// 非同步讀取檔案的互斥量
static /*CMutexLock*/CCriticalLock s_mutexRequent;
// 非同步渲染的互斥量
static /*CMutexLock*/CCriticalLock s_mutexFileContent;

// 根物件,派生Node
class Object
{
public:
	Object()
	{
		m_dObjID = 0;
	}
protected:
	double m_dObjID;
};
// 非同步載入後的回撥函式
typedef  void (* ObjectCallBack)(Object* );

typedef struct tagAsyncFileData
{
	bool m_bLoadOK;
	Object *m_pTarget;
	ObjectCallBack m_pCallback;
	string strFilePath;
	tagAsyncFileData()
	{
		m_bLoadOK = false;
		m_pTarget = NULL;
		m_pCallback = NULL;
		strFilePath.clear();
	}
}AsyncFileData;

static deque<AsyncFileData*> s_dataFileRequent;

typedef struct tagFileBufferData
{
	AsyncFileData *m_pAsyncFileData;
	char *m_pBuffer;
	tagFileBufferData()
	{
		m_pAsyncFileData = NULL;
		m_pBuffer = NULL;
	}
}AsyncFileBufferData;

static deque<AsyncFileBufferData*> s_dataFileContent;

#ifdef WINOS
unsigned __stdcall AsyncLoad(void *pParameter)  
#else
static void* AsyncLoad(void *pParameter)
#endif
{
	while(1)
	{
		AsyncFileData *pFileData = NULL;
		s_mutexRequent.lock();
		if(s_dataFileRequent.empty())
		{
			// 如果沒有資料過來那麼釋放當前的鎖,掛起CPU等待
			printf("子執行緒,因沒有請求的檔案而等待!\n");
			s_mutexFileContent.unlock();// 不加這句linux的pthread庫會導致死鎖。
			s_mutexRequent.unlock(); // 不加這句,windows下的WaitForSingleObject不會先釋放互斥量鎖,也會導致死鎖。
			s_mutexRequent.waite();
			
			continue;
		}
		else
		{
			pFileData = s_dataFileRequent.front();
			s_dataFileRequent.pop_front();
		}
		s_mutexRequent.unlock();
		

		// 得到資料處理
		if(pFileData != NULL)
		{
			// 非同步載入資料,此次mmap還是fread方式略去,直接設定載入OK
			//fopen(pFileData->strFilePath.c_str(), "rb");
			Sleep(1000);
			pFileData->m_bLoadOK = true;
			//pFileData.m_pTarget
			AsyncFileBufferData *pAsyncBuffer  = new AsyncFileBufferData;
			pAsyncBuffer->m_pAsyncFileData = pFileData;
			char *pContent = "data from pFileData's strFilePath...";
			int nContenLen = strlen(pContent) + 1;
			pAsyncBuffer->m_pBuffer = new char[nContenLen];
			strcpy_s(pAsyncBuffer->m_pBuffer, nContenLen, pContent);
			printf("子執行緒 讀取檔案: %s\n", pAsyncBuffer->m_pAsyncFileData->strFilePath.c_str());
			
			// 非同步處理鎖
			s_mutexFileContent.lock();
			// 解析好的資料放置進來
			s_dataFileContent.push_back(pAsyncBuffer);
			s_mutexFileContent.unlock();
			s_mutexFileContent.unwaite();
		}
	}
#ifdef WINOS
	_endthreadex( 0 );// 釋放_beginthreadex分配的堆資源,且還要用CloseHandle釋放
	return 0;
#endif
}

int main(int argc, char* argv[])  
{ 
	s_mutexRequent.init();
	s_mutexFileContent.init();
#ifdef WINOS
	unsigned int uiThreadID;
	s_loadingThread = (HANDLE)_beginthreadex(NULL, 0, AsyncLoad, NULL, CREATE_SUSPENDED, &uiThreadID); 
	/*_CRTIMP uintptr_t __cdecl _beginthreadex(_In_opt_ void * _Security, _In_ unsigned _StackSize,
		_In_ unsigned (__stdcall * _StartAddress) (void *), _In_opt_ void * _ArgList, 
		_In_ unsigned _InitFlag, _In_opt_ unsigned * _ThrdAddr);*/
	if(s_loadingThread == NULL)
	{
		printf("pthread_create error!");
		return 0;
	}
	ResumeThread(s_loadingThread); 
#else
	if( pthread_create(&s_loadingThread, NULL, AsyncLoad, NULL) != 0)
	{
		printf("pthread_create error!");
		return 0;
	}
	pthread_detach(s_loadingThread);
#endif
	// 在任何一個時間點上,執行緒是可結合的(joinable)或者是分離的(detached)。一個可結合的執行緒能夠被其他執行緒收回其資源和殺死。
	// 在被其他執行緒回收之前,它的儲存器資源(例如棧)是不釋放的。相反,一個分離的執行緒是不能被其他執行緒回收或殺死的,
	// 它的儲存器資源在它終止時由系統自動釋放。預設情況下,執行緒被建立成可結合的。為了避免儲存器洩漏,
	// 每個可結合線程都應該要麼被顯示地回收,即呼叫pthread_join;要麼通過呼叫pthread_detach函式被分離。

	// pthread_detach(s_loadingThread);分離,執行結束後子執行緒會自動釋放自己資源,不需要pthread_join也可以完全釋放資源。
	//   void* ret = NULL;
	// pthread_join(_subThreadInstance, &ret);主執行緒一直等待直到等待的執行緒結束自己才結束,主執行緒可以清理其它執行緒的棧暫存器。
	// pthread_self()獲取自身執行緒的id.
	// 執行緒的被動結束分為兩種,一種是非同步終結,另外一種是同步終結。非同步終結就是當其他執行緒呼叫 pthread_cancel的時候,
	// 執行緒就立刻被結束。而同步終結則不會立刻終結,它會繼續執行,直到到達下一個結束點(cancellation point)。
	// 當一個執行緒被按照預設的建立方式建立,那麼它的屬性是同步終結。
	static int fileCount = 0;
	while(1)
	{
		s_mutexRequent.lock();
		AsyncFileData* m_pFileData = new AsyncFileData();
		m_pFileData->m_bLoadOK = false;
		m_pFileData->m_pCallback = NULL;
		m_pFileData->m_pTarget = NULL;
		fileCount++;
		char szFileBuffer[256];
		sprintf_s(szFileBuffer,"檔名 %d.", fileCount);
		m_pFileData->strFilePath = szFileBuffer;
		printf("主執行緒,請求讀取檔案: %s\n", m_pFileData->strFilePath.c_str());
		s_dataFileRequent.push_back(m_pFileData);
		s_mutexRequent.unlock();
		s_mutexRequent.unwaite();

		// 其它邏輯
		Sleep(1000);

		while(1)
		{
			AsyncFileBufferData *pAsyncBuffer = NULL;

			s_mutexFileContent.lock();
			if(s_dataFileContent.empty())
			{
				printf("主執行緒,因沒有解析好的資料等待!\n");
				s_mutexRequent.unlock();// 請求鎖需要釋放,否則會導致問題
				s_mutexFileContent.unlock();
				s_mutexFileContent.waite();// 讀取執行緒還沒解析好等待
				continue;
			}

			pAsyncBuffer = s_dataFileContent.front();
			s_dataFileContent.pop_front();
			s_mutexFileContent.unlock();

			if(pAsyncBuffer != NULL)
			{
				printf("主執行緒,得到讀取執行緒解析後的檔案:%s, 資料: %s\n", pAsyncBuffer->m_pAsyncFileData->strFilePath.c_str(), pAsyncBuffer->m_pBuffer);
				delete pAsyncBuffer->m_pAsyncFileData;
				delete [] pAsyncBuffer->m_pBuffer;
				delete pAsyncBuffer;
				pAsyncBuffer = NULL;

				// 其它邏輯
				Sleep(1000);
				break;
			}	
		}// end while 2
	
	} //  end while 1

	s_mutexRequent.release();
	s_mutexFileContent.release();
#ifdef WINOS
	CloseHandle(s_loadingThread);
#else
	// 設定了pthread_detach(s_loadingThread),退出時會自動釋放,
	// 否則需要pthread_join()等待可結合的執行緒終止被釋放它的棧暫存器資源.
#endif
	return 0;  
}