1. 程式人生 > >執行緒池實現處理多個連線

執行緒池實現處理多個連線

#include <stdlib.h>


#include <winsock2.h> // initsock.h 檔案
#pragma comment(lib, "WS2_32") // 連結到WS2_32.lib
class CInitSock
{
public:
CInitSock(BYTE minorVer = 2, BYTE majorVer = 2)

// 初始化WS2_32.dll
WSADATA wsaData;
WORD sockVersion = MAKEWORD(minorVer, majorVer);
if(::WSAStartup(sockVersion, &wsaData) != 0)

exit(0); 
}
}


~CInitSock()

::WSACleanup(); 
}
};


CInitSock initsock;




typedef struct _SOCKET_OBJ
{
SOCKET s; // 套接字控制代碼
HANDLE event; // 與此套接字相關聯的事件物件控制代碼
sockaddr_in addrRemote;// 客戶端地址資訊
_SOCKET_OBJ *pNext;// 指向下一個SOCKET_OBJ 物件,以連成一個表
} SOCKET_OBJ, *PSOCKET_OBJ;


PSOCKET_OBJ GetSocketObj(SOCKET s) // 申請一個套接字物件,初始化它的成員
{
PSOCKET_OBJ pSocket = (PSOCKET_OBJ)::GlobalAlloc(GPTR, sizeof(SOCKET_OBJ));
if(pSocket != NULL)

pSocket->s = s;
pSocket->event = ::WSACreateEvent();
}
return pSocket;
}


void FreeSocketObj(PSOCKET_OBJ pSocket) // 釋放一個套接字物件
{
::CloseHandle(pSocket->event);
if(pSocket->s != INVALID_SOCKET)

::closesocket(pSocket->s); 
}
::GlobalFree(pSocket);
}


typedef struct _THREAD_OBJ
{
HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];// 記錄當前執行緒要等待的事件物件的控制代碼
int nSocketCount;// 記錄當前執行緒處理的套接字的數量 <= WSA_MAXIMUM_WAIT_EVENTS
PSOCKET_OBJ pSockHeader;// 當前執行緒處理的套接字物件列表,pSockHeader 指向表頭
PSOCKET_OBJ pSockTail;// pSockTail 指向表尾
CRITICAL_SECTION cs;// 關鍵程式碼段變數,為的是同步對本結構的訪問
_THREAD_OBJ *pNext;// 指向下一個THREAD_OBJ 物件,為的是連成一個表
} THREAD_OBJ, *PTHREAD_OBJ;


PTHREAD_OBJ g_pThreadList;// 指向執行緒物件列表表頭
CRITICAL_SECTION g_cs;// 同步對此全域性變數的訪問


PTHREAD_OBJ GetThreadObj() // 申請一個執行緒物件,初始化它的成員,並將它新增到執行緒物件列表中
{
PTHREAD_OBJ pThread = (PTHREAD_OBJ)::GlobalAlloc(GPTR, sizeof(THREAD_OBJ));
if(pThread != NULL)

::InitializeCriticalSection(&pThread->cs);


// 建立一個事件物件,用於指示該執行緒的控制代碼陣列需要重建
pThread->events[0] = ::WSACreateEvent();
// 將新申請的執行緒物件新增到列表中
::EnterCriticalSection(&g_cs);


pThread->pNext = g_pThreadList;


g_pThreadList = pThread;
::LeaveCriticalSection(&g_cs);
}
return pThread;
}


void FreeThreadObj(PTHREAD_OBJ pThread) // 釋放一個執行緒物件,並將它從執行緒物件列表中移除
{
// 線上程物件列表中查詢pThread 所指的物件,如果找到就從中移除
::EnterCriticalSection(&g_cs);
PTHREAD_OBJ p = g_pThreadList;
if(p == pThread) // 是第一個?

g_pThreadList = p->pNext; 
}
else

while(p != NULL && p->pNext != pThread)

p = p->pNext; 
}
if(p != NULL)

// 此時,p 是pThread 的前一個,即“p->pNext == pThread”
p->pNext = pThread->pNext;
}
}
::LeaveCriticalSection(&g_cs);
// 釋放資源
::CloseHandle(pThread->events[0]);
::DeleteCriticalSection(&pThread->cs);
::GlobalFree(pThread);
}


void RebuildArray(PTHREAD_OBJ pThread) // 重新建立執行緒物件的events 陣列
{
::EnterCriticalSection(&pThread->cs);
PSOCKET_OBJ pSocket = pThread->pSockHeader;
int n = 1; // 從第1 個開始寫,第0 個用於指示需要重建了
while(pSocket != NULL)
{
pThread->events[n++] = pSocket->event;
pSocket = pSocket->pNext;
}
::LeaveCriticalSection(&pThread->cs);
}


LONG g_nTatolConnections;// 總共連線數量
LONG g_nCurrentConnections;// 當前連線數量


// 向一個執行緒的套接字列表中插入一個套接字
BOOL InsertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
BOOL bRet = FALSE;
::EnterCriticalSection(&pThread->cs);
if(pThread->nSocketCount < WSA_MAXIMUM_WAIT_EVENTS - 1)
{
if(pThread->pSockHeader == NULL)

pThread->pSockHeader = pThread->pSockTail = pSocket; 
}
else

pThread->pSockTail->pNext = pSocket;
pThread->pSockTail = pSocket;
}
pThread->nSocketCount ++;
bRet = TRUE;
}
::LeaveCriticalSection(&pThread->cs);


// 插入成功,說明成功處理了客戶的連線請求
if(bRet)

::InterlockedIncrement(&g_nTatolConnections);
::InterlockedIncrement(&g_nCurrentConnections);
}
return bRet;
}


PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex) // nIndex 從1 開始

// 在套接字列表中查詢
PSOCKET_OBJ pSocket = pThread->pSockHeader;
while(--nIndex)
{
if(pSocket == NULL)
return NULL;
pSocket = pSocket->pNext;
}
return pSocket;
}


void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket);


BOOL HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)

// 獲取具體發生的網路事件
WSANETWORKEVENTS event;
::WSAEnumNetworkEvents(pSocket->s, pSocket->event, &event);
do

if(event.lNetworkEvents & FD_READ) // 套接字可讀
{
if(event.iErrorCode[FD_READ_BIT] == 0)

char szText[256];
int nRecv = ::recv(pSocket->s, szText, strlen(szText), 0);
if(nRecv > 0)

szText[nRecv] = '\0';
printf("接收到資料:%s \n", szText);
}
}
else
break;
}
else if(event.lNetworkEvents & FD_CLOSE) // 套接字關閉

break; 
}
else if(event.lNetworkEvents & FD_WRITE) // 套接字可寫

if(event.iErrorCode[FD_WRITE_BIT] == 0)
{ }
else
break;
}
return TRUE;
}
while(FALSE);
// 套接字關閉,或者有錯誤發生,程式都會轉到這裡來執行
RemoveSocketObj(pThread, pSocket);
FreeSocketObj(pSocket);
return FALSE;
}


DWORD WINAPI ServerThread(LPVOID lpParam)
{
// 取得本執行緒物件的指標
PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
while(TRUE)

// 等待網路事件
int nIndex = ::WSAWaitForMultipleEvents(pThread->nSocketCount + 1, pThread->events, FALSE, WSA_INFINITE, FALSE);
nIndex = nIndex - WSA_WAIT_EVENT_0;


// 檢視受信的事件物件
for(int i=nIndex; i<pThread->nSocketCount + 1; i++)
{
nIndex = ::WSAWaitForMultipleEvents(1, &pThread->events[i], TRUE, 1000, FALSE);
if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT)

continue; 
}
else

if(i == 0) // events[0]受信,重建陣列

RebuildArray(pThread);
// 如果沒有客戶I/O 要處理了,則本執行緒退出
if(pThread->nSocketCount == 0)

FreeThreadObj(pThread);
return 0;
}
::WSAResetEvent(pThread->events[0]);
}
else // 處理網路事件

// 查詢對應的套接字物件指標,呼叫HandleIO 處理網路事件
PSOCKET_OBJ pSocket = (PSOCKET_OBJ)FindSocketObj(pThread, i);
if(pSocket != NULL)

if(!HandleIO(pThread, pSocket))
RebuildArray(pThread);
}
else
printf(" Unable to find socket object \n ");
}
}
}
}
return 0;
}


void AssignToFreeThread(PSOCKET_OBJ pSocket)

pSocket->pNext = NULL;
::EnterCriticalSection(&g_cs);
PTHREAD_OBJ pThread = g_pThreadList;
// 試圖插入到現存執行緒
while(pThread != NULL)

if(InsertSocketObj(pThread, pSocket)) break;//如果 這個地方 插入成功了會返回true的 否則會返回flase
pThread = pThread->pNext;
}
// 沒有空閒執行緒,為這個套接字建立新的執行緒
if(pThread == NULL)

pThread = GetThreadObj();
InsertSocketObj(pThread, pSocket);
::CreateThread(NULL, 0, ServerThread, pThread, 0, NULL);
}
::LeaveCriticalSection(&g_cs);
// 指示執行緒重建控制代碼陣列
::WSASetEvent(pThread->events[0]);
}


// 從給定執行緒的套接字物件列表中移除一個套接字物件
void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
{
::EnterCriticalSection(&pThread->cs);
// 在套接字物件列表中查詢指定的套接字物件,找到後將之移除
PSOCKET_OBJ pTest = pThread->pSockHeader;
if(pTest == pSocket)

if(pThread->pSockHeader == pThread->pSockTail)
pThread->pSockTail = pThread->pSockHeader = pTest->pNext;
else
pThread->pSockHeader = pTest->pNext;
}
else

while(pTest != NULL && pTest->pNext != pSocket)
pTest = pTest->pNext;
if(pTest != NULL)

if(pThread->pSockTail == pSocket) 
pThread->pSockTail = pTest;
pTest->pNext = pSocket->pNext;
}
}
pThread->nSocketCount --;
::LeaveCriticalSection(&pThread->cs);
::WSASetEvent(pThread->events[0]); // 指示執行緒重建控制代碼陣列
::InterlockedDecrement(&g_nCurrentConnections); // 說明一個連線中斷
}


int main()

USHORT nPort = 4567; // 此伺服器監聽的埠號
// 建立監聽套接字
SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(nPort);
sin.sin_addr.S_un.S_addr = INADDR_ANY;
if(::bind(sListen, (sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR)

printf(" Failed bind() \n");
return -1;
}
::listen(sListen, 200);
// 建立事件物件,並關聯到監聽的套接字
WSAEVENT event = ::WSACreateEvent();
::WSAEventSelect(sListen, event, FD_ACCEPT|FD_CLOSE);
::InitializeCriticalSection(&g_cs);
// 處理客戶連線請求,列印狀態資訊
while(TRUE)

int nRet = ::WaitForSingleObject(event, 5*1000);
if(nRet == WAIT_FAILED)

printf(" Failed WaitForSingleObject() \n");
break;
}
else if(nRet == WSA_WAIT_TIMEOUT) // 定時顯式狀態資訊

printf(" \n");
printf(" TatolConnections: %d \n", g_nTatolConnections);
printf(" CurrentConnections: %d \n", g_nCurrentConnections);
continue;
}
else // 有新的連線未決

::ResetEvent(event);
// 迴圈處理所有未決的連線請求
while(TRUE)

sockaddr_in si;
int nLen = sizeof(si);
SOCKET sNew = ::accept(sListen, (sockaddr*)&si, &nLen);
if(sNew == SOCKET_ERROR)
{
break;
}
PSOCKET_OBJ pSocket = GetSocketObj(sNew);
pSocket->addrRemote = si;


::WSAEventSelect(pSocket->s, pSocket->event, FD_READ|FD_CLOSE|FD_WRITE);


AssignToFreeThread(pSocket);
}
}
}
::DeleteCriticalSection(&g_cs);
return 0;
}