1. 程式人生 > >多執行緒應用例子2—消費者,生產者,多個緩衝區

多執行緒應用例子2—消費者,生產者,多個緩衝區

將消費者改成2個,緩衝池改成擁有4個緩衝區的大緩衝池。

用二個訊號量就可以解決這種緩衝池有多個緩衝區的情況——用一個訊號量A來記錄為空的緩衝區個數,另一個訊號量B記錄非空的緩衝區個數,然後生產者等待訊號量A,消費者等待訊號量B就可以了。因此可以仿照上面的程式碼來實現複雜生產者消費者問題,示例程式碼如下:

1、臨界區:通過對多執行緒的序列化來訪問公共資源或一段程式碼,速度快,適合控制資料訪問。在任意時刻只允許一個執行緒對共享資源進行訪問,如果有多個執行緒試圖訪問公共資源,那麼在有一個執行緒進入後,其他試圖訪問公共資源的執行緒將被掛起,並一直等到進入臨界區的執行緒離開,臨界區在被釋放後,其他執行緒才可以搶佔。 
2、互斥量:採用互斥物件機制。 只有擁有互斥物件的執行緒才有訪問公共資源的許可權,因為互斥物件只有一個,所以能保證公共資源不會同時被多個執行緒訪問。互斥不僅能實現同一應用程式的公共資源安全共享,還能實現不同應用程式的公共資源安全共享 
3、訊號量:
它允許多個執行緒在同一時刻訪問同一資源,但是需要限制在同一時刻訪問此資源的最大執行緒數目 4、事 件: 通過通知操作的方式來保持執行緒的同步,還可以方便實現對多個執行緒的優先順序比較的操作


//1生產者 2消費者 4緩衝區 

#include <stdio.h>
#include <process.h>
#include <windows.h>

//<利用訊號量和關鍵段進行處理
CRITICAL_SECTION g_cs;
HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull;  
const int END_PRODUCE_NUMBER = 10;   //<生產產品個數  
const int BUFFER_SIZE = 4;          //<緩衝區個數  
int g_Buffer[BUFFER_SIZE];          //<緩衝池  
int g_i,g_j;

unsigned int _stdcall ProducerThread(PVOID pM)
{
	for (int i = 1; i<= END_PRODUCE_NUMBER; i++)
	{
		//<等待有空的緩衝區出現  
		WaitForSingleObject(g_hSemaphoreBufferEmpty,INFINITE);  //<4個緩衝區,等待緩衝區為空,只有等到相應的事件觸發了之後,才能進行處理

		//<互斥的訪問緩衝區  
		EnterCriticalSection(&g_cs);
		g_Buffer[g_i] = i;  
		printf("生產者在緩衝池第%d個緩衝區中投放資料%d\n", g_i, g_Buffer[g_i]);
		g_i = (g_i + 1) % BUFFER_SIZE; //<4個緩衝區中迴圈放置  

		LeaveCriticalSection(&g_cs);

		//<通知消費者有新資料了  
		ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL); 
	}
	printf("生產者完成任務,執行緒結束執行\n");  
	return 0;
}

unsigned int _stdcall ConsumerThread(PVOID pM) 
{
	while (true)
	{
		//<等待非空的緩衝區出現
		WaitForSingleObject(g_hSemaphoreBufferFull,INFINITE);

		EnterCriticalSection(&g_cs);
		printf("  編號為%d的消費者從緩衝池中第%d個緩衝區取出資料%d\n", GetCurrentThreadId(), g_j, g_Buffer[g_j]);
		if (g_Buffer[g_j] == END_PRODUCE_NUMBER)//<結束標誌  
		{  
			LeaveCriticalSection(&g_cs);  
			//<通知其它消費者有新資料了(結束標誌)  
			ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);  
			break;  
		}  
		g_j = (g_j + 1) % BUFFER_SIZE;  
		LeaveCriticalSection(&g_cs);  

		Sleep(50); //some other work to do  

		ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL);  
	}
	printf("  編號為%d的消費者收到通知,執行緒結束執行\n", GetCurrentThreadId());  
	return 0;
}
int main()
{
	printf("  生產者消費者問題   1生產者 2消費者 4緩衝區\n");   

	InitializeCriticalSection(&g_cs);  
	//初始化訊號量,一個記錄有產品的緩衝區個數,另一個記錄空緩衝區個數.  
	g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL);  
	g_hSemaphoreBufferFull  = CreateSemaphore(NULL, 0, 4, NULL);  
	g_i = 0;  
	g_j = 0;  
	memset(g_Buffer, 0, sizeof(g_Buffer));  

	const int THREADNUM = 3;  
	HANDLE hThread[THREADNUM];  
	//生產者執行緒  
	hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThread, NULL, 0, NULL);  
	//消費者執行緒  
	hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThread, NULL, 0, NULL);  
	hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThread, NULL, 0, NULL);  
	WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);  
	for (int i = 0; i < THREADNUM; i++)  
		CloseHandle(hThread[i]);  

	//銷燬訊號量和關鍵段  
	CloseHandle(g_hSemaphoreBufferEmpty);  
	CloseHandle(g_hSemaphoreBufferFull);  
	DeleteCriticalSection(&g_cs);  
	return 0;  
}