1. 程式人生 > >c語言通過cond和mutex實現一個高效能執行緒池

c語言通過cond和mutex實現一個高效能執行緒池

首先說明一下這篇文章其實和我另一篇文章很像,只是另一篇只有一個執行緒,只要講述的是cond和mutex的使用,上面是基於c++實現的,封裝的稍微好一些,路徑:

https://blog.csdn.net/FlayHigherGT/article/details/83830956

而下面是基於c實現的執行緒池,模仿大神寫的,沒用任何c++的特性以及庫,通過連結串列裝載任務,程式碼中的註釋以及很明確了,歡迎大家一起來學習。

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>

/*採用連結串列裝載任務*/
typedef struct STask
{
	struct STask *pNext;
	int nValue;
}*TASK;

struct SThreadInfo 
{
	int bThreadRunning;
	int nThreadNum;
	int nTaskNum;
	TASK pTask;

	/*這個指標裡面儲存n個執行緒控制代碼(n為執行緒池大小)*/
	pthread_t *pThreadId;

	pthread_mutex_t hMutex;
	pthread_cond_t hCond;
};

struct SThreadInfo g_sThreadInfo;

/*初始化執行緒池*/
static void ThreadPoolInit(int nThreadNum);
/*執行緒池執行緒的執行函式*/
static void *ThreadPoolRoutine(void *ThreadParam);
/*執行緒池做連結串列中的某一個任務*/
static void ThreadPoolDoTask(TASK pTask);
/*取出連結串列中的頭一個任務*/
static TASK ThreadPoolRetrieveTask();
/*銷燬執行緒池*/
static void ThreadPoolDestroy();

static void ThreadPoolInit(int nThreadNum)
{
	/*執行緒池預設是個執行緒*/
	if (nThreadNum <= 0)
		nThreadNum = 10;

	pthread_mutex_init(&g_sThreadInfo.hMutex, NULL);
	pthread_cond_init(&g_sThreadInfo.hCond, NULL);

	g_sThreadInfo.nThreadNum = nThreadNum;
	g_sThreadInfo.bThreadRunning = 1;
	g_sThreadInfo.nTaskNum = 0;

	/*給每個handle分配空間*/
	g_sThreadInfo.pThreadId = (pthread_t*)malloc(sizeof(pthread_t*) * nThreadNum);

	/*建立n個執行緒*/
	for (int i = 0; i < nThreadNum; ++i)
		pthread_create(&g_sThreadInfo.pThreadId[i], NULL, ThreadPoolRoutine, NULL);
}

/*取任務,取連結串列頭*/
static TASK ThreadPoolRetrieveTask()
{
	TASK pHead = g_sThreadInfo.pTask;
	if (pHead)
	{
		g_sThreadInfo.pTask = pHead->pNext;
		--g_sThreadInfo.nTaskNum;
		printf("retrieve a task, task value is [%d]\n", pHead->nValue);
		return pHead;
	}

	printf("no task\n");
	return NULL;
}

static void ThreadPoolDoTask(TASK pTask)
{
	if (!pTask)
		return;

	/*此時可以做任務,睡一秒模擬做任務*/
	sleep(1);

	printf("task value is [%d]\n", pTask->nValue);

	free(pTask);
}

static void ThreadPoolAddTask(TASK pTask)
{
	if (!pTask)
		return;

	pthread_mutex_lock(&g_sThreadInfo.hMutex);

	TASK pHead = g_sThreadInfo.pTask;

	if (!pHead)
	{
		g_sThreadInfo.pTask = pTask;
	} else
	{
		while (pHead->pNext != NULL)
		{
			pHead = pHead->pNext;
		}
		pHead->pNext = pTask;
	}

	++g_sThreadInfo.nTaskNum;

	/*關於先解鎖還是先發訊號,貌似先發訊號的說法多一點,這裡採用先發訊號*/
	pthread_cond_signal(&g_sThreadInfo.hCond);
	pthread_mutex_unlock(&g_sThreadInfo.hMutex);
}

static void *ThreadPoolRoutine(void *ThreadParam)
{
	printf("thread NO.%d start\n", (int)pthread_self());

	while (g_sThreadInfo.bThreadRunning)
	{
		TASK pCurrentTask = NULL;

		pthread_mutex_lock(&g_sThreadInfo.hMutex);

		while (g_sThreadInfo.nTaskNum <= 0)
		{
			pthread_cond_wait(&g_sThreadInfo.hCond, &g_sThreadInfo.hMutex);

			/*收到signal訊號之後,第一個能執行下去的也即搶到鎖的,
			 *搶到鎖之後其他執行緒會在此處卡死,本執行緒執行如果執行緒
			 *池收到結束訊號,本執行緒會退出拿任務執行,並結束。第
			 *一個拿到鎖的本執行緒nTaskNum肯定是大於0的。解鎖之後,
			 *其他執行緒誰先搶到鎖誰就退出,但是貌似會執行空任務。
			 */
			if (!g_sThreadInfo.bThreadRunning)
				break;
		}

		/*本執行緒第一個搶到鎖,正常執行任務解鎖,第二個搶到鎖的執行緒
		 *發現nTaskNum是空的,那當然還是在while迴圈裡面,繼續wait,
		 *這邊就能體現為啥一定要用while而不是if的真正原因了。
		 */
		if (g_sThreadInfo.nTaskNum > 0)
		{
			pCurrentTask = ThreadPoolRetrieveTask();

			pthread_mutex_unlock(&g_sThreadInfo.hMutex);

			ThreadPoolDoTask(pCurrentTask);
		} else
		{
			pthread_mutex_unlock(&g_sThreadInfo.hMutex);
		}
	}

	printf("thread NO.%d exit\n", (int)pthread_self());
}


static void ThreadPoolDestroy()
{
	g_sThreadInfo.bThreadRunning = 0;

	/*這個不需要加鎖,因為這個一旦執行,所有的wait都不會等待了,
	 *即使執行的時候不在wait,之後繼續wait也不會等待了
	 */
	pthread_cond_broadcast(&g_sThreadInfo.hCond);

	for (int i = 0; i < g_sThreadInfo.nThreadNum; ++i)
		pthread_join(g_sThreadInfo.pThreadId[i], NULL);

	free(g_sThreadInfo.pThreadId);

	pthread_mutex_destroy(&g_sThreadInfo.hMutex);
	pthread_cond_destroy(&g_sThreadInfo.hCond);
}

int main(int argc, char* argv[])
{
	/*初始化執行緒池*/
	ThreadPoolInit(10);
	
	/*新增任務*/
	TASK pTask = NULL;
	for (int i = 0; i < 100; ++i)
	{
		pTask = (TASK)malloc(sizeof(struct STask));
		pTask->nValue = i + 1;
		pTask->pNext = NULL;
		printf("add task, task value [%d]\n", pTask->nValue);
		ThreadPoolAddTask(pTask);
	}

	/*銷燬執行緒池*/
	ThreadPoolDestroy();

	return 0;
}