1. 程式人生 > >Windows C++ 執行緒池

Windows C++ 執行緒池

        Windows 平臺C++ 沒什麼比較好的官方的執行緒池,boost裡倒是有很方便的執行緒池,可是為了一個執行緒池,使用龐大的boost顯然不太合適,為此,我在網上找了個demo,然後用C++模板做了些改造,基本上實現了類似boost的threadpool的功能。

首先需要解釋的是,以何種方式將執行緒函式加入佇列(我這裡用的佇列是 std::list)。可以利用類模板、函式模板以及函式指標建立包含該函式的物件,執行時只需要呼叫物件中的函式指標即可。這裡說明一下:因為函式模板在例項化之前是沒有任何函式的,自然就不存在指向函式模板的指標。但是可以以另一種方式建立函式模板的指標,即引入類模板,因為類模板例項化之後,其中的函式模板就有意義了。不過這也帶來一個問題:類模板中的函式模板無法作為函式指標在類外呼叫,例如在該標頭檔案對應的cpp檔案中呼叫。因此,不得已需要直接在標頭檔案中定義函式體。

#pragma once

#include "Task.h"
#include <typeinfo>
// Placeholder type used as the default for every parameter slot of CMyTask:
// a slot whose type is ParamTem means "this parameter is not used".
class ParamTem
{
protected:
	// Protected so that only classes derived from ParamTem can construct one;
	// unrelated code cannot create a placeholder object.
	ParamTem() {}
};


/*
 * CTask implementation that binds a free function taking 0 to 8 parameters
 * to the argument values it should later be called with.
 *
 * ReturnType is the function's return type (the returned value is discarded
 * by Run()). P0..P7 are the parameter types; slots that are not used keep
 * the default ParamTem placeholder.
 *
 * Each constructor stores the function pointer and copies of its arguments,
 * and records how many arguments were supplied. Run() uses that count to
 * pick the matching call. The count is stored explicitly because the text
 * returned by typeid(T).name() is implementation-defined and therefore
 * cannot be compared against a fixed string to detect unused slots.
 *
 * NOTE(review): unused slots are still default-constructed as ParamTem
 * members, which relies on this class deriving from ParamTem to reach its
 * protected constructor - confirm your compiler accepts this.
 */
template <class ReturnType,class P0=ParamTem,class P1=ParamTem,
class P2=ParamTem,class P3=ParamTem, class P4=ParamTem,
class P5=ParamTem,class P6=ParamTem, class P7=ParamTem>
class CMyTask: public CTask,public ParamTem
{
private:
	// One function-pointer type per supported arity (0..8 parameters).
	typedef ReturnType (*FuncTem)();
	typedef ReturnType (*FuncTem0)(P0 p0);
	typedef ReturnType (*FuncTem1)(P0 p0, P1 p1);
	typedef ReturnType (*FuncTem2)(P0 p0, P1 p1, P2 p2);
	typedef ReturnType (*FuncTem3)(P0 p0, P1 p1, P2 p2,P3 p3);
	typedef ReturnType (*FuncTem4)(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4);
	typedef ReturnType (*FuncTem5)(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5);
	typedef ReturnType (*FuncTem6)(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6);
	typedef ReturnType (*FuncTem7)(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6, P7 p7);

	// Only the pointer matching m_nArgCount is ever set; the others stay null.
	FuncTem ft;
	FuncTem0 ft0;
	FuncTem1 ft1;
	FuncTem2 ft2;
	FuncTem3 ft3;
	FuncTem4 ft4;
	FuncTem5 ft5;
	FuncTem6 ft6;
	FuncTem7 ft7;

	// Stored copies of the call arguments; only the first m_nArgCount are meaningful.
	P0 p0;
	P1 p1;
	P2 p2;
	P3 p3;
	P4 p4;
	P5 p5;
	P6 p6;
	P7 p7;

	// Number of arguments the bound function takes (0..8).
	int m_nArgCount;

	// Nulls every function pointer and records the arity chosen by the
	// calling constructor, so no pointer member is left uninitialised.
	void InitDispatch(int nArgCount)
	{
		this->ft = 0;
		this->ft0 = 0;
		this->ft1 = 0;
		this->ft2 = 0;
		this->ft3 = 0;
		this->ft4 = 0;
		this->ft5 = 0;
		this->ft6 = 0;
		this->ft7 = 0;
		this->m_nArgCount = nArgCount;
	}
public:
	// Binds a function that takes no parameters.
	CMyTask(FuncTem ft)
	{
		InitDispatch(0);
		this->ft = ft;
	}
	// Binds a one-parameter function and its argument.
	CMyTask(FuncTem0 ft,P0 p0)
	{
		InitDispatch(1);
		this->ft0 = ft;
		this->p0 = p0;
	}
	// Binds a two-parameter function and its arguments.
	CMyTask(FuncTem1 ft,P0 p0, P1 p1)
	{
		InitDispatch(2);
		this->ft1 = ft;
		this->p0 = p0;
		this->p1 = p1;
	}
	// Binds a three-parameter function and its arguments.
	CMyTask(FuncTem2 ft,P0 p0, P1 p1, P2 p2)
	{
		InitDispatch(3);
		this->ft2 = ft;
		this->p0 = p0;
		this->p1 = p1;
		this->p2 = p2;
	}
	// Binds a four-parameter function and its arguments.
	CMyTask(FuncTem3 ft,P0 p0, P1 p1, P2 p2, P3 p3)
	{
		InitDispatch(4);
		this->ft3 = ft;
		this->p0 = p0;
		this->p1 = p1;
		this->p2 = p2;
		this->p3 = p3;
	}
	// Binds a five-parameter function and its arguments.
	CMyTask(FuncTem4 ft,P0 p0, P1 p1, P2 p2, P3 p3,P4 p4)
	{
		InitDispatch(5);
		this->ft4 = ft;
		this->p0 = p0;
		this->p1 = p1;
		this->p2 = p2;
		this->p3 = p3;
		this->p4 = p4;
	}
	// Binds a six-parameter function and its arguments.
	CMyTask(FuncTem5 ft,P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5)
	{
		InitDispatch(6);
		this->ft5 = ft;
		this->p0 = p0;
		this->p1 = p1;
		this->p2 = p2;
		this->p3 = p3;
		this->p4 = p4;
		this->p5 = p5;
	}
	// Binds a seven-parameter function and its arguments.
	CMyTask(FuncTem6 ft,P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6)
	{
		InitDispatch(7);
		this->ft6 = ft;
		this->p0 = p0;
		this->p1 = p1;
		this->p2 = p2;
		this->p3 = p3;
		this->p4 = p4;
		this->p5 = p5;
		this->p6 = p6;
	}
	// Binds an eight-parameter function and its arguments.
	CMyTask(FuncTem7 ft,P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6, P7 p7)
	{
		InitDispatch(8);
		this->ft7 = ft;
		this->p0 = p0;
		this->p1 = p1;
		this->p2 = p2;
		this->p3 = p3;
		this->p4 = p4;
		this->p5 = p5;
		this->p6 = p6;
		this->p7 = p7;
	}

	// Calls the bound function with the stored arguments. The function's
	// return value, if any, is discarded.
	virtual void Run()
	{
		switch (this->m_nArgCount)
		{
		case 0:
			this->ft();
			break;
		case 1:
			this->ft0(this->p0);
			break;
		case 2:
			this->ft1(this->p0,this->p1);
			break;
		case 3:
			this->ft2(this->p0,this->p1,this->p2);
			break;
		case 4:
			this->ft3(this->p0,this->p1,this->p2,this->p3);
			break;
		case 5:
			this->ft4(this->p0,this->p1,this->p2,this->p3,this->p4);
			break;
		case 6:
			this->ft5(this->p0,this->p1,this->p2,this->p3,this->p4,this->p5);
			break;
		case 7:
			this->ft6(this->p0,this->p1,this->p2,this->p3,this->p4,this->p5,
				this->p6);
			break;
		case 8:
			this->ft7(this->p0,this->p1,this->p2,this->p3,this->p4,this->p5,
				this->p6,this->p7);
			break;
		default:
			// Unreachable: every constructor sets a count between 0 and 8.
			break;
		}
	}
};
如上只定義了到含8個引數的函式模板,如果你們需要執行的函式的引數可能更多,只需要依葫蘆畫瓢,新增相應的函式模板即可。

另外定義這個類模板是因為在將函式加入佇列貌似不太好辦,我也不確定辦不了,但是我是沒搞出來,所以需要藉助物件來儲存函式。
這裡說明兩個地方:首先,為什麼要定義一個父類,而且父類的建構函式是protected許可權的?這是為了避免其他地方用這個類生成物件,因為它只是CMyTask的預設模板引數。其次,使用預設模板引數是為了避免引數較少時依然要填滿所有的模板引數宣告。

#include "StdAfx.h"
#include "MyThreadPool.h"

/*
 * Creates the pool's synchronisation objects and queues, then starts up to
 * nThreadCount worker threads running ThreadFunc.
 *
 * nThreadCount: number of workers requested (the declaration defaults it to 5).
 *
 * A worker whose thread could not be started is not added to the thread
 * list, so the list only ever holds items with a valid thread handle.
 * After construction m_nThreadCount holds the number of workers that were
 * actually started.
 */
CMyThreadPool::CMyThreadPool(int nThreadCount/* =5 */)
{
	this->m_nThreadCount = nThreadCount;
	m_bthreadflag = TRUE;
	// Counting semaphore: one count per queued task, at most 5000 outstanding.
	m_hsemaphore = CreateSemaphore(NULL, 0, 5000, NULL);
	InitializeCriticalSection(&m_csThreadQueue);
	InitializeCriticalSection(&m_csTaskQueue);
	m_pThreadItemList = new std::list<ThreadItem*>;
	m_pTaskList = new std::list<CTask*>;

	EnterCriticalSection(&m_csThreadQueue);
	for (int i = 0; i < nThreadCount; i++)
	{
		ThreadItem* pItem = new ThreadItem();
		// These two fields are read by the worker, so set them before it starts.
		pItem->pThreadPool = this;
		pItem->bThreadFlag = TRUE;
		pItem->hThreadHandle = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, (LPVOID)pItem, 0, &pItem->dwThreadID);
		if (pItem->hThreadHandle == NULL)
		{
			// _beginthreadex returns 0 on failure: no thread is using pItem,
			// so it is safe to discard it instead of tracking a dead worker.
			printf("建立執行緒失敗\n");
			delete pItem;
			continue;
		}
		printf("建立執行緒 threadid:%u\n", pItem->dwThreadID);
		m_pThreadItemList->push_back(pItem);
	}
	// Keep the recorded size in step with the workers that really exist.
	m_nThreadCount = (int)m_pThreadItemList->size();
	LeaveCriticalSection(&m_csThreadQueue);
}

UINT WINAPI CMyThreadPool::ThreadFunc(LPVOID lpParameter /* = NULL */)
{
	//CMyThreadPool* pmyThreadPool = (CMyThreadPool*) lpParameter;
	ThreadItem *pItem = (ThreadItem*) lpParameter;
	CTask * m_pTask = NULL;
	while(pItem->bThreadFlag)
	{
		WaitForSingleObject(pItem->pThreadPool->m_hsemaphore, INFINITE);
		EnterCriticalSection(&pItem->pThreadPool->m_csTaskQueue);
		m_pTask = pItem->pThreadPool->m_pTaskList->front();
		if(m_pTask)
		{
			pItem->pThreadPool->m_pTaskList->pop_front();
		}
		LeaveCriticalSection(&pItem->pThreadPool->m_csTaskQueue);
		m_pTask->Run();
		delete m_pTask;
	}
	return NULL;
}

void CMyThreadPool::StopAllThread()
{
	EnterCriticalSection(&m_csThreadQueue);
	while (!m_pThreadItemList->empty())
	{		
		ThreadItem* pThreadItem=m_pThreadItemList->front();
		if (pThreadItem)
		{
			//TRACE("結束執行緒 %d\n",pThreadItem->dwThreadID);
			printf("結束執行緒 %d\n",pThreadItem->dwThreadID);
			pThreadItem->bThreadFlag=FALSE;
			WaitForSingleObject(pThreadItem->hThreadHandle,INFINITE);
			m_pThreadItemList->pop_front();
			delete(pThreadItem);			
		}		
	}
	LeaveCriticalSection(&m_csThreadQueue);
}


/*
 * Appends a task to the back of the task queue and signals the semaphore so
 * that one waiting worker picks it up.
 *
 * pTask: the task to run. A worker deletes it after running it, so the
 * caller must not use or delete it after this call. A null pointer is
 * ignored, because a worker would otherwise dereference it.
 *
 * The semaphore is released only when a task was really queued, so workers
 * are not woken up for nothing.
 */
void CMyThreadPool::schedule(CTask* pTask)
{
	if (pTask == NULL)
	{
		return;
	}

	EnterCriticalSection(&m_csTaskQueue);
	if (m_pTaskList)
	{
		m_pTaskList->push_back(pTask);
		ReleaseSemaphore(this->m_hsemaphore, 1, NULL);
	}
	LeaveCriticalSection(&m_csTaskQueue);
}

/*
 * Stops the workers and releases everything the pool owns: tasks that were
 * queued but never run, the semaphore, both critical sections and the two
 * lists.
 */
CMyThreadPool::~CMyThreadPool()
{
	// The workers use the queue, the semaphore and the locks, so they have
	// to be gone before any of those are destroyed.
	StopAllThread();

	// Workers delete each task after running it; tasks that never reached a
	// worker would otherwise be leaked.
	if (m_pTaskList)
	{
		for (std::list<CTask*>::iterator it = m_pTaskList->begin(); it != m_pTaskList->end(); ++it)
		{
			delete *it;
		}
		m_pTaskList->clear();
	}

	// The semaphore created in the constructor is a kernel handle and has to
	// be closed explicitly.
	if (m_hsemaphore)
	{
		CloseHandle(m_hsemaphore);
		m_hsemaphore = NULL;
	}

	DeleteCriticalSection(&m_csThreadQueue);
	DeleteCriticalSection(&m_csTaskQueue);
	delete m_pTaskList;
	delete m_pThreadItemList;
}
這是執行緒池的原始碼。正如我在上面所說到的函式模板的問題,其另一組以模板定義的schedule函式需要直接在標頭檔案中定義。
#pragma once
#include "MyTask.h"
#include <list>
#include <process.h>
class CMyThreadPool;
struct ThreadItem
{
	HANDLE hThreadHandle;       //執行緒控制代碼
	UINT  dwThreadID;          //執行緒ID
	BOOL   bThreadFlag;         //執行緒執行標識
	CMyThreadPool* pThreadPool;   //屬於哪個執行緒池
	ThreadItem()
	{
		hThreadHandle=NULL;
		dwThreadID=0;
		bThreadFlag=FALSE;
		pThreadPool=NULL;
	}
};

/*
 * Pool of worker threads that run CTask objects taken from a FIFO queue.
 *
 * Tasks are handed over either as a ready-made CTask* or as a free function
 * plus its arguments; in the second case the schedule() templates wrap the
 * call in a CMyTask. A worker deletes each task after running it, so the
 * pool owns every task that has been scheduled.
 *
 * The pool owns raw resources (threads, a semaphore, two critical sections
 * and two heap-allocated lists), so copying it is disabled: a copy would
 * release the same resources a second time.
 */
class CMyThreadPool
{
public:
	//CMyThreadPool();
	// Starts nThreadCount worker threads.
	CMyThreadPool(int nThreadCount=5);
	virtual ~CMyThreadPool();
	// Worker thread entry point; lpParameter is the worker's ThreadItem.
	static UINT WINAPI ThreadFunc(LPVOID lpParameter = NULL);
public:
	// Queues an existing task and signals one worker.
	void schedule(CTask* pTask);
	// Direct access to the internal lists; no locking is done here.
	std::list<ThreadItem*>* GetThreadItemList() {return m_pThreadItemList;}
	std::list<CTask*>*      GetTaskList() {return m_pTaskList;}
	void               StopAllThread();
	void               AdjustSize(int nThreadCount);  // dynamically resize the pool
protected:
	//static 
private:
	// Not copyable: declared but intentionally never defined.
	CMyThreadPool(const CMyThreadPool&);
	CMyThreadPool& operator=(const CMyThreadPool&);

	BOOL m_bthreadflag;    // set to TRUE by the constructor
	int m_nThreadCount;    // number of threads in the pool
	HANDLE m_hsemaphore;   // signalled once for every queued task
	std::list<ThreadItem*>*  m_pThreadItemList;
	std::list<CTask*>*       m_pTaskList;
	CRITICAL_SECTION    m_csThreadQueue;   // held while the thread list is changed
	CRITICAL_SECTION    m_csTaskQueue;     // held while the task list is changed

	// schedule() overloads for plain functions.
	//
	// Each overload wraps Func and copies of its arguments in a CMyTask and
	// queues it through schedule(CTask*), so the queueing logic lives in one
	// place. They are defined in the header because they are templates.
	// Every parameter type is deduced both from Func's signature and from
	// the matching argument, so the argument types must match exactly.

public:
	// Schedules a function that takes no parameters.
	template<typename ReturnType>
	void schedule(ReturnType Func())
	{
		CTask * pTask = new CMyTask<ReturnType>(Func);
		schedule(pTask);
	}
public:
	// Schedules a function with one parameter.
	template <typename ReturnType, typename P0>
	void schedule(ReturnType Func(P0 p0),P0 p0)
	{
		CTask * pTask = new CMyTask<ReturnType,P0>(Func, p0);
		schedule(pTask);
	}
	// Schedules a function with two parameters.
	template <typename ReturnType, typename P0, typename P1>
	void schedule(ReturnType Func(P0 p0, P1 p1),P0 p0, P1 p1)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1>(Func, p0, p1);
		schedule(pTask);
	}
	// Schedules a function with three parameters.
	template <typename ReturnType, typename P0, typename P1,typename P2>
	void schedule(ReturnType Func(P0 p0, P1 p1, P2 p2),P0 p0, P1 p1, P2 p2)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1,P2>(Func, p0, p1, p2);
		schedule(pTask);
	}
	// Schedules a function with four parameters.
	template <typename ReturnType, typename P0, typename P1,typename P2, typename P3>
	void schedule(ReturnType Func(P0 p0, P1 p1, P2 p2, P3 p3),P0 p0, P1 p1, P2 p2, P3 p3)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1,P2,P3>(Func, p0, p1, p2, p3);
		schedule(pTask);
	}
	// Schedules a function with five parameters.
	template <typename ReturnType, typename P0, typename P1,typename P2, typename P3,
		typename P4>
	void schedule(ReturnType Func(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4),
		P0 p0, P1 p1, P2 p2, P3 p3,P4 p4)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1,P2,P3,P4>(Func, p0, p1, p2, p3, p4);
		schedule(pTask);
	}
	// Schedules a function with six parameters.
	template <typename ReturnType, typename P0, typename P1,typename P2, typename P3,
		typename P4, typename P5>
	void schedule(ReturnType Func(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5),
		P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1,P2,P3,P4,P5>(Func, p0, p1, p2, p3, p4, p5);
		schedule(pTask);
	}
	// Schedules a function with seven parameters.
	template <typename ReturnType, typename P0, typename P1,typename P2, typename P3,
		typename P4, typename P5, typename P6>
	void schedule(ReturnType Func(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6),
		P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1,P2,P3,P4,P5,P6>(Func, p0, p1, p2, p3, p4, p5, p6);
		schedule(pTask);
	}
	// Schedules a function with eight parameters.
	template <typename ReturnType, typename P0, typename P1,typename P2, typename P3,
		typename P4, typename P5, typename P6,typename P7>
	void schedule(ReturnType Func(P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6, P7 p7),
		P0 p0, P1 p1, P2 p2, P3 p3,P4 p4, P5 p5, P6 p6, P7 p7)
	{
		CTask * pTask = new CMyTask<ReturnType,P0,P1,P2,P3,P4,P5,P6,P7>(Func, p0, p1, p2, p3, p4, p5, p6,p7);
		schedule(pTask);
	}
};

這裡我用到了一個訊號量和兩個臨界區來保證執行緒安全。訊號量的作用是計數:每有一個待執行的物件加入佇列就 +1,每有一個執行緒取走物件就 -1,工作執行緒則利用 WaitForSingleObject 在訊號量上等待。臨界區 m_csTaskQueue 用來保證任務物件佇列的執行緒安全,m_csThreadQueue 則用來保護執行緒清單。

最後附上執行緒池原始碼下載地址http://download.csdn.net/detail/yuguanquan1990/8109379