1. 程式人生 > >Socket編程模型之完畢port模型

Socket編程模型之完畢port模型

value result received 在那 void 系統性能 dcom 查詢 tails

轉載請註明來源:

viewmode=contents">http://blog.csdn.net/caoshiying?viewmode=contents

一、回想重疊IO模型


用完畢例程來實現重疊I/O比用事件通知簡單得多。在這個模型中,主線程僅僅用不停的接受連接就可以;輔助線程推斷有沒有新的client連接被建立,假設有。就為那個client套接字激活一個異步的WSARecv操作,然後調用SleepEx使線程處於一種可警告的等待狀態,以使得I/O完畢後CompletionROUTINE能夠被內核調用。假設輔助線程不調用SleepEx。則內核在完畢一次I/O操作後,無法調用完畢例程(由於完畢例程的執行應該和當初激活WSARecv異步操作的代碼在同一個線程之內)。

完畢例程內的實現代碼比較簡單,它取出接收到的數據,然後將數據原封不動的發送給client。最後又一次激活還有一個WSARecv異步操作。註意,在這裏用到了“跟隨數據”。我們在調用WSARecv的時候,參數lpOverlapped實際上指向一個比它大得多的結構PER_IO_OPERATION_DATA,這個結構除了WSAOVERLAPPED以外。還被我們附加了緩沖區的結構信息,另外還包括client套接字等重要的信息。這樣。在完畢例程中通過參數lpOverlapped拿到的不不過WSAOVERLAPPED結構,還有後邊跟隨的包括client套接字和接收數據緩沖區等重要信息。這種C語言技巧在我介紹完畢port的時候還會使用到。

二、完畢port模型


“完畢port”模型是迄今為止最為復雜的一種I/O模型。

然而,假若一個應用程序同一時候須要管理為數眾多的套接字,那麽採用這樣的模型,往往能夠達到最佳的系統性能!

但不幸的是。該模型僅僅適用於Windows NT和Windows 2000操作系統。

因其設計的復雜性,僅僅有在你的應用程序須要同一時候管理數百乃至上千個套接字的時候,並且希望隨著系統內安裝的CPU數量的增多,應用程序的性能也能夠線性提升。才應考慮採用“完畢port”模型。

要記住的一個基本準則是。假如要為Windows NT或Windows 2000開發高性能的server應用。同一時候希望為大量套接字I/O請求提供服務(Webserver便是這方面的典型樣例)。那麽I/O完畢port模型便是最佳選擇!



完畢port模型是我最喜愛的一種模型。盡管事實上現比較復雜(事實上我認為它的實現比用事件通知實現的重疊I/O簡單多了)。但其效率是驚人的。

我在T公司的時候以前幫同事寫過一個郵件server的性能測試程序,用的就是完畢port模型。

結果表明。完畢port模型在多連接(成千上萬)的情況下。只依靠一兩個輔助線程。就能夠達到很高的吞吐量。



三、關鍵函數


1、CreateIoCompletionPort


創建一個輸入/輸出(I / O)完畢port,並將其與一個指定的文件句柄關聯。或者創建一個尚未與文件句柄關聯的I / O完畢port,同意在稍後的時間關聯。將已打開的文件句柄的實例與一個I / O完畢port關聯,同意一個進程接收包括該文件句柄的異步I / O操作完畢的通知。註意:這裏所使用的術語文件句柄是指代表一個重疊的I / O端點的系統抽象,而不不過磁盤上的一個文件。不論什麽系統對象支持重疊I / o-such網絡端點,TCP套接字。命名管道、郵件槽能夠作為文件句柄。



函數原型:

HANDLE WINAPI CreateIoCompletionPort(
  _In_     HANDLE    FileHandle,
  _In_opt_ HANDLE    ExistingCompletionPort,
  _In_     ULONG_PTR CompletionKey,
  _In_     DWORD     NumberOfConcurrentThreads
);


函數參數:

FileHandle:一個打開的文件句柄或者INVALID_HANDLE_VALUE。這個文件句柄必須是支持重疊IO的object。

假設提供了句柄, 它必須是已經給重疊I/O模型完畢port打開的句柄。比如。假設您使用CreateFile函數獲取的句柄,那麽您在調用這個函數時必須在參數中指定FILE_FLAG_OVERLAPPED旗標。假設指定 INVALID_HANDLE_VALUE,那麽函數將創建一個沒有關聯文件句柄的IO完畢port模型,此外ExistingCompletionPort參數必須設為NULL,CompletionKey參數將被忽略。

ExistingCompletionPort:是已經存在的完畢port。

假設為NULL。則為新建一個IOCP。

CompletionKey:用戶定義的句柄包括的I/O完畢包信息。當FileHandle被設為INVALID_HANDLE_VALUE時此參數被忽略。



NumberOfConcurrentThreads:操作系統能夠同意同一時候處理I / O完畢端口的I / O完畢數據包的線程的最大數目。假設existingcompletionport參數不為空,則忽略此參數。假設這個參數為零,系統同意多個並發執行的線程。由於系統中有處理器。

返回值:

假設函數成功,返回值是一個I / O完畢port的句柄:假設ExistingCompletionPort參數為空。返回值是一個新的處理。假設ExistingCompletionPort參數是一個有效的I/O完畢port句柄,返回值是同樣的處理。假設文件句柄參數是一個有效的處理,文件處理是如今與返回的I/O完畢port。假設函數失敗,返回值為空。為了獲得很多其它的錯誤信息,調用GetLastError函數。


2、GetQueuedCompletionStatus



失望的是微軟官方MSDN沒有提供關於這個API的說明。下面參照一篇英文文檔進行翻譯。

文檔說這個函數試圖將一個I/O完畢包從指定的I/O完畢port。假設沒有完畢數據包隊列,則函數等待一個掛起的I / O操作與完畢port相關聯的完畢。


函數原型:

BOOL WINAPI GetQueuedCompletionStatus(
  _In_  HANDLE       CompletionPort,
  _Out_ LPDWORD      lpNumberOfBytes,
  _Out_ PULONG_PTR   lpCompletionKey,
  _Out_ LPOVERLAPPED *lpOverlapped,
  _In_  DWORD        dwMilliseconds
);


函數參數:

CompletionPort:完畢port的句柄。創建一個完畢port。使用CreateIoCompletionPort函數。

lpNumberOfBytes:指向已完畢的I / O操作期間傳輸的字節數的變量的指針。

lpCompletionKey:指向與文件句柄關聯的完畢鍵的變量的指針,該鍵的I / O操作已完畢。一個完畢的關鍵是每個文件的關鍵,是指定一個叫CreateIoCompletionPort。

lpOverlapped:一個指向一個變量的指針,該指針指向在已完畢的I / O操作開始時指定的重疊結構的地址的變量。

即使您已經通過了一個與完畢port相關聯的文件句柄和一個有效的重疊結構,應用程序也能夠防止完畢port通知。這是通過指定的重疊結構的hevent成員有效的事件處理完畢,並設置其低階位。

一個有效的事件句柄,其低階位設置將保持I / O完畢從被隊列到完畢port。



dwMilliseconds:調用方願意等待完畢數據包出如今完畢port的毫秒數。

假設一個完畢包沒有出如今指定的時間內,功能倍出。返回false。並設置*lpOverlapped為null。

假設該參數是無限的。函數將沒有時間了。

假設該參數為零,沒有I/O操作中出列,函數將取消等待時間,馬上操作。

返回值:

返回非零(真)。假設成功或零(假),否則。為了獲得很多其它的錯誤信息,調用GetLastError。

此功能將一個線程與指定的完畢port關聯。

一個線程能夠與至多一個完畢port相關聯的。假設由於完畢port句柄與它是封閉而調用調用GetQueuedCompletionStatus突出失敗。函數返回false。*lpOverlapped會是空的,GetLastError將返回error_abandoned_wait_0。

Windows Server 2003和Windows XP:關閉完畢port句柄,調用優秀不會導致之前的行為。該函數將繼續等待直到一項是從港口或直到發生超時刪除,假設指定以外的無限價值。

假設GetQueuedCompletionStatus函數調用成功,它出列完畢包一個成功的I/O操作完畢port和存儲信息的變量所指向的下列參數:lpNumberOfBytes。lpcompletionkey,和lpOverlapped。在失敗(返回值是錯誤的),這些同樣的參數能夠包括特定的值組合例如以下:
假設*lpOverlapped為空。功能沒有出列完畢包從完畢port。在這樣的情況下,函數不存儲信息在lpNumberOfBytes and lpCompletionKey所指向的參數中,其值是不確定的。


假設*lpOverlapped不空和功能按一個失敗的I/O操作的完畢port完畢包的功能。存儲信息有關失敗操作的變量所指向的lpcompletionkey lpOverlapped lpNumberOfBytes。為了獲得很多其它的錯誤信息。調用GetLastError。


3、PostQueuedCompletionStatus

將一個I / O完畢數據包發送到一個I / O完畢port。I/O完畢包將滿足一個優秀的調用GetQueuedCompletionStatus函數。

該函數返回三值傳遞的第二,第三,和第四個參數postqueuedcompletionstatus呼叫。

該系統不使用或驗證這些值。特別是。lpOverlapped參數不須要點的重疊結構。

函數原型:

BOOL WINAPI PostQueuedCompletionStatus(
  _In_     HANDLE       CompletionPort,
  _In_     DWORD        dwNumberOfBytesTransferred,
  _In_     ULONG_PTR    dwCompletionKey,
  _In_opt_ LPOVERLAPPED lpOverlapped
);

參數:

CompletionPort:一個I / O完畢數據包的I / O完畢port的句柄。



dwNumberOfBytesTransferred:要通過lpnumberofbytestransferred參數GetQueuedCompletionStatus函數返回的值。0xFFFFFFFF表示處理全部跟隨數據。僅僅有準備關閉port的時候才這樣做。



dwCompletionKey:能夠通過GetQueuedCompletionStatus函數返回的值lpcompletionkey參數。

lpOverlapped:要通過lpOverlapped參數GetQueuedCompletionStatus函數返回的值。

返回值:

假設函數成功。返回值是非零的。假設函數失敗,返回值為零。為了獲得很多其它的錯誤信息,調用GetLastError。



四、完整的演示樣例程序


接著上面幾篇Socket文章寫,關於公共代碼與反射式client請參見:《Socket編程模型之簡單選擇模型》。以下是新建的overlapped_serverproject,新建了一個overlapped_server_manager類型,繼承自iserver_manager接口,頭文件完整代碼例如以下:

#pragma once

#define SOCKET_MESSAGE_SIZE 1024

#include <WinSock2.h>
#include <common_callback.h>

typedef enum
{
	RECV_POSTED
}OPERATION_TYPE;

typedef struct
{
	WSAOVERLAPPED overlap;
	WSABUF buffer;
	char message[SOCKET_MESSAGE_SIZE];
	DWORD received_count;
	DWORD flags;
	OPERATION_TYPE operation_type;
}PEERIO_OPERATION_DATA, *LPPEERIO_OPERATION_DATA;

class completeio_server_manager:
	public iserver_manager
{
private:
	int iport;
	int iaddr_size;
	common_callback callback;
	BOOL brunning;
	SOCKET server;
	WSADATA wsaData;
	HANDLE hcomplete_port;
	SYSTEM_INFO system_info;
	LPPEERIO_OPERATION_DATA peer_data;
	bool bdisposed;

protected:
	bool accept_by_crt();
	bool accept_by_winapi();

public:
	void receive();
	void shutdown();
	void start_receive();
	void start_accept();

public:
	completeio_server_manager();
	virtual ~completeio_server_manager();
};


實現文件完整代碼例如以下:

#include "completeio_server_manager.h"
#include <stdio.h>
#include <tchar.h>

completeio_server_manager::completeio_server_manager()
{
	iport = 5150;
	iaddr_size = sizeof(SOCKADDR_IN);
	brunning = FALSE;
	GetSystemInfo(&system_info);
	callback.set_manager(this);
	callback.set_receive_thread_coount(system_info.dwNumberOfProcessors);
	hcomplete_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	bdisposed = false;
}


completeio_server_manager::~completeio_server_manager()
{
	if (bdisposed)
		shutdown();
}

bool completeio_server_manager::accept_by_crt()
{

	return true;
}

bool completeio_server_manager::accept_by_winapi()
{
	SOCKADDR_IN server_addr;
	SOCKADDR_IN client_addr;
	SOCKET client;
	LPPEERIO_OPERATION_DATA peer_data;
	int iresult = -1;

	WSAStartup(MAKEWORD(2, 2), &wsaData);
	server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	server_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(iport);
	do
	{
		iresult = bind(server, (struct sockaddr*)&server_addr, iaddr_size);
		if (iresult == SOCKET_ERROR)
		{
			iport++;
			server_addr.sin_port = htons(iport);
		}
	} while (iresult == -1);
	listen(server, 3);
	printf("基於完畢端口模型的Socket服務器啟動成功。監聽端口是:%d\n", iport);
	while (brunning)
	{
		printf("開始監聽請求。\n");
		client = accept(server, (struct sockaddr*)&client_addr, &iaddr_size);
		if (client == SOCKET_ERROR)
			continue;
		printf("新客戶端連接:%s:%d\n", inet_ntoa(client_addr.sin_addr), htons(client_addr.sin_port));
		CreateIoCompletionPort((HANDLE)client, hcomplete_port, (DWORD)client, 0);
		peer_data = (LPPEERIO_OPERATION_DATA)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PEERIO_OPERATION_DATA));
		peer_data->buffer.len = SOCKET_MESSAGE_SIZE;
		peer_data->buffer.buf = peer_data->message;
		peer_data->operation_type = RECV_POSTED;
		printf("開始接收客戶端傳送數據。\n");
		WSARecv(client, &peer_data->buffer, 1, &peer_data->received_count, &peer_data->flags, &peer_data->overlap, NULL);
		printf("收到客戶端數據。\n");
	}
	return true;
}

void completeio_server_manager::receive()
{
	DWORD dwtransfered = 0;
	SOCKET client;
	LPPEERIO_OPERATION_DATA peer = nullptr;

	while (brunning)
	{
		printf("線程:%d,查詢端口狀態信息。

\n",GetCurrentThreadId()); GetQueuedCompletionStatus(hcomplete_port, &dwtransfered, (PULONG_PTR)&client, (LPOVERLAPPED*)&peer, INFINITE); printf("獲得端口信息。\n"); if (dwtransfered == 0xFFFFFFFF) return; if (peer->operation_type == RECV_POSTED) { if (dwtransfered == 0) { closesocket(client); printf("有客戶端退出了。\n"); HeapFree(GetProcessHeap(), 0, peer); } else { peer->message[dwtransfered] = 0; send(client, peer->message, dwtransfered, 0); memset(peer, 0, sizeof(PEERIO_OPERATION_DATA)); peer->buffer.len = SOCKET_MESSAGE_SIZE; peer->buffer.buf = peer->message; peer->operation_type = RECV_POSTED; WSARecv(client, &peer->buffer, 1, &peer->received_count, &peer->flags, &peer->overlap, nullptr); } } } } void completeio_server_manager::shutdown() { PostQueuedCompletionStatus(hcomplete_port, 0xFFFFFFFF, 0, NULL);//端口跟隨數據。

brunning = FALSE; callback.shutdown();//清掃 CloseHandle(hcomplete_port); closesocket(server); WSACleanup(); bdisposed = true; } void completeio_server_manager::start_accept() { brunning = TRUE; bdisposed = false; callback.start_accept_by_winapi(); } void completeio_server_manager::start_receive() { brunning = TRUE; bdisposed = false; callback.start_receive(); } int main() { completeio_server_manager csm; csm.start_accept(); csm.start_receive(); printf("服務器啟動成功。按隨意鍵關閉服務器並退出程序。\n"); getchar(); csm.shutdown(); return 0; }

五、效果


技術分享圖片

六、心得體會


成功創建一個完畢port後,便可開始將套接字句柄與對象關聯到一起。但在關聯套接字之前,首先必須創建一個或多個“工作者線程”,以便在I/O請求投遞給完畢port對象後,為完畢port提供服務。在這個時候,大家也許會認為奇怪,究竟應創建多少個線程,以便為完畢port提供服務呢?這實際正是完畢port模型顯得頗為“復雜”的一個方面,由於服務I/O請求所需的數量取決於應用程序的整體設計情況。



在此要記住的一個重點在於。在我們調用CreateIoCompletionPort時指定的並發線程數量,與打算創建的工作者線程數量相比,它們代表的並不是同一件事情。早些時候。我們曾建議大家用CreateIoCompletionPort函數為每一個處理器都指定一個線程(處理器的數量有多少,便指定多少線程)以避免因為頻繁的線程“場景”交換活動,從而影響系統的總體性能。CreateIoCompletionPort函數的NumberOfConcurrentThreads參數明白指示系統:在一個完畢port上,一次僅僅同意n個工作者線程執行。假如在完畢port上創建的工作者線程數量超出n個。那麽在同一時刻。最多僅僅同意n個線程執行。

但實際上,在一段較短的時間內,系統有可能超過這個值,但非常快便會把它降低至事先在CreateIoCompletionPort函數中設定的值。那麽。為何實際創建的工作者線程數量有時要比CreateIoCompletionPort函數設定的多一些呢?這樣做有必要嗎?如先前所述。這主要取決於應用程序的整體設計情況。

假定我們的某個工作者線程調用了一個函數,比方Sleep或WaitForSingleObject,但卻進入了暫停(鎖定或掛起)狀態。那麽同意還有一個線程取代它的位置。換言之,我們希望隨時都能運行盡可能多的線程。當然,最大的線程數量是事先在CreateIoCompletionPort調用裏設定好的。



這樣一來。假如事先估計到自己的線程有可能臨時處於停頓狀態,那麽最好可以創建比CreateIoCompletionPort的NumberOfConcurrentThreads參數的值多的線程。以便到時候充分發揮系統的潛力。

一旦在完畢port上擁有足夠多的工作者線程來為I/O請求提供服務,便可著手將套接字句柄同完畢port關聯到一起。這要求我們在一個現有的完畢port上,調用CreateIoCompletionPort函數,同一時候為前三個參數——FileHandle,ExistingCompletionPort和CompletionKey——提供套接字的信息。當中, FileHandle參數指定一個要同完畢port關聯在一起的套接字句柄。

ExistingCompletionPort參數指定的是一個現有的完畢port。

CompletionKey(完畢鍵)參數則指定要與某個特定套接字句柄關聯在一起的“單句柄數據”。在這個參數中。應用程序可保存與一個套接字相應的隨意類型的信息。之所以把它叫作“單句柄數據”。是因為它僅僅相應著與那個套接字句柄關聯在一起的數據。

可將其作為指向一個數據結構的指針,來保存套接字句柄;在那個結構中。同一時候包括了套接字的句柄。以及與那個套接字有關的其它信息。

Socket編程模型之完畢port模型