1. 程式人生 > >非同步IO、APC、IO完成埠、執行緒池與高效能伺服器之三 IO完成埠

非同步IO、APC、IO完成埠、執行緒池與高效能伺服器之三 IO完成埠

IO完成埠下面摘抄於MSDNI/O Completion Ports》,smallfool翻譯,原文請參考CSDN文件中心文章《I/O Completion Ports》,。
I/O
完成埠是一種機制,通過這個機制,應用程式在啟動時會首先建立一個執行緒池,然後該應用程式使用執行緒池處理非同步I/O請求。這些執行緒被建立的唯一目的就是 用於處理I/O請求。對於處理大量併發非同步I/O請求的應用程式來說,相比於在I/O請求發生時建立執行緒來說,使用完成埠(s)它就可以做的更快且更有 效率。
CreateIoCompletionPort
函式會使一個I/O完成埠與一個或多個檔案控制代碼發生關聯。當與一個完成埠相關的檔案控制代碼上啟動的非同步I/O

操作完成時,一個I/O完成包就會進入到該完成埠的佇列中。對於多個檔案控制代碼來說,這種機制可以用來把多檔案控制代碼的同步點放在單個物件中。(言下之意,如果我們需要對每個控制代碼檔案進行同步,一般而言我們需要多個物件(如:Event來同步),而我們使用IO Complete Port 來實現非同步操作,我們可以同多個檔案相關聯,每當一個檔案中的非同步操作完成,就會把一個complete package放到佇列中,這樣我們就可以使用這個來完成所有檔案控制代碼的同步)呼叫GetQueuedCompletionStatus函式,某 個執行緒就會等待一個完成包進入到完成埠的佇列中,而不是直接等待非同步I/O
請求完成。執行緒(們)就會阻塞於它們的執行在完成埠(按照後進先出佇列順序 的被釋放)。這就意味著當一個完成包進入到完成埠的佇列中時,系統會釋放最近被阻塞在該完成埠的執行緒。
呼叫GetQueuedCompletionStatus,執行緒就會將會與某個指定的完成埠建立聯絡,一直延續其該執行緒的存在週期,或被指定了不同的完成埠,或者釋放了與完成埠的聯絡。一個執行緒只能與最多不超過一個的完成埠發生聯絡。完 成埠最重要的特性就是併發量。完成埠的併發量可以在建立該完成埠時指定。該併發量限制了與該完成埠相關聯的可執行執行緒的數目。當與該完成埠相關聯的可執行執行緒的總數目達到了該併發量,系統就會阻塞任何與該完成埠相關聯的後續執行緒的執行,直到與該完成埠相關聯的可執行執行緒數目下降到小於該併發 量為止。最有效的假想是發生在有完成包在佇列中等待,而沒有等待被滿足,因為此時完成埠達到了其併發量的極限。此時,一個正在執行中的執行緒呼叫 GetQueuedCompletionStatus
時,它就會立刻從佇列中取走該完成包。這樣就不存在著環境的切換,因為該處於執行中的執行緒就會連續不斷地從佇列中取走完成包,而其他的執行緒就不能運行了。
對於併發量最好的挑選值就是您計算機中CPU的數目。如果您的事務處理需要一個漫長的計算時間,一個比較大的併發量可以允許更多執行緒來執行。雖然完成每個事務處理需要花費更長的時間,但更多的事務可以同時被處理。對於應用程式來說,很容易通過測試併發量來獲得最好的效果。
PostQueuedCompletionStatus
函式允許應用程式可以針對自定義的專用I/O完成包進行排隊,而無需啟動一個非同步I/O操作。這點對於通知外部事件的工作者執行緒來說很有用。在沒有更多的引用針對某個完成埠時,需要釋放該完成埠。該完成埠控制代碼以及與該完成埠相關聯的所有檔案控制代碼都需要被釋放。呼叫CloseHandle可以釋放完成埠的控制代碼。下面的程式碼利用IO完成埠做了一個簡單的執行緒池。

/************************************************************************/
/* Test IOCompletePort.                                                 */
/************************************************************************/

static HANDLE CompleteEvent = NULL;

DWORD UserProc1(LPVOID UserParam)
{
 printf("enter User Proc. /n");

 SetEvent(CompleteEvent);

 return 0;
}

typedef DWORD (*WORK_ITEM_PROC)(LPVOID) ;

DWORD WINAPI IOCPWorkThread(PVOID pParam)
{
 HANDLE CompletePort = (HANDLE)pParam;
 PVOID UserParam;
 WORK_ITEM_PROC UserProc;
 LPOVERLAPPED pOverlapped;

 for(;;)
 {
  BOOL bRet = GetQueuedCompletionStatus(
   CompletePort,
   (LPDWORD)&UserParam,
   (LPDWORD)&UserProc,
   &pOverlapped,
   INFINITE);

  //ASSERT(bRet);

  if(UserProc == NULL) // Quit signal.
   break;

  // execute user's proc.       
  UserProc(UserParam);       
 }

 return 0;
}

void TestIOCompletePort(BOOL bWaitMode, LONG ThreadNum)
{
 HANDLE CompletePort;
 OVERLAPPED Overlapped = { 0, 0, 0, 0, NULL };

 CompletePort = CreateIoCompletionPort(
  INVALID_HANDLE_VALUE,
  NULL,
  NULL,
  0);

 // Create threads.
 for(int i=0; i<ThreadNum; i++)
 {
  HANDLE hThread = CreateThread(NULL,
   0,
   IOCPWorkThread,
   CompletePort,
   0,
   NULL);

  CloseHandle(hThread);
 }


  CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);

 for(int i=0; i<20; i++)
 {
  PostQueuedCompletionStatus(
   CompletePort,
   (DWORD)bWaitMode,
   (DWORD)UserProc1,
   &Overlapped);
 }

 WaitForSingleObject(CompleteEvent, INFINITE);
 CloseHandle(CompleteEvent);


 // Destroy all threads.
 for(int i=0; i<ThreadNum; i++)
 {
  PostQueuedCompletionStatus(
   CompletePort,
   NULL,
   NULL,
   &Overlapped);
 }

 Sleep(10); // wait all thread exit.

 CloseHandle(CompletePort);

}