1. 程式人生 > >在c#多執行緒使用IOCP(完成埠)的簡單示例

在c#多執行緒使用IOCP(完成埠)的簡單示例

在c#使用IOCP(完成埠)的簡單示例
上次給大家發了利用winsock原生的api來做一個同步的socket伺服器的例子,大致上只是貼了一些程式碼,相信大家這麼冰雪聰明,已經研究的差不多了。因為winsock的api使用在msdn或者google上都能很方便的查到,所以我沒太多羅嗦程式碼的原理。但是c#進行平臺呼叫方面是有一些經驗的,單靠google和msdn及社群的力量有時候不容易得到答案。這次給大家演示一下利用IOCP的線上程間傳遞資料的例子,順便打算講一些細節和注意的地方。

概述:這裡主要使用IOCP的三個 API,CreateIoCompletionPort,PostQueuedCompletionStatus,GetQueuedCompletionStatus,第一個是用來建立一個完成埠物件,第二個是向一個埠傳送資料,第三個是接受資料,基本上用著三個函式,就可以寫一個使用IOCP的簡單示例。

其中完成埠一個核心物件,所以建立的時候會耗費效能,CPU得切換到核心模式,而且一旦建立了核心物件,我們都要記著要不用的時候顯式的釋放它的控制代碼,釋放非託管資源的最佳實踐肯定是使用 Dispose模式,這個部落格園有人講過N次了。而一般要獲取一個核心物件的引用,最好用SafeHandle來引用它,這個類可以幫你管理引用計數,而且用它引用核心物件,程式碼更健壯,如果用指標引用核心物件,在建立成功核心物件並複製給指標這個時間段,如果拋了 ThreadAbortException,這個核心物件就洩漏了,而用SafeHandle去應用核心物件就不會在賦值的時候發生 ThreadAbortException。另外SafeHandle類繼承自CriticalFinalizerObject類,並實現了 IDispose介面,CLR對CriticalFinalizerObject及其子類有特殊照顧,比如說在編譯的時候優先編譯,在呼叫非 CriticalFinalizerObject類的Finalize方法後再呼叫CriticalFinalizerObject類的Finalize 類的Finalize方法等。在win32裡,一般一個控制代碼是-1或者0的時候表示這個控制代碼是無效的,所以.net有一個SafeHandle的派生類 SafeHandleZeroOrMinusOneIsInvalid ,但是這個類是一個抽象類,你要引用自己使用的核心物件或者非託管物件,要從這個類派生一個類並重寫Relseas方法。另外在.net框架裡它有兩個實現幾乎一模一樣的子類,一個是SafeFileHandle一個是SafeWaitHandle,前者表示檔案控制代碼,後者表示等待控制代碼,我們這裡為了方便就直接用SafeFileHandle來引用完成埠物件了。

CreateIoCompletionPort函式的原型如下
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);

FileHandle引數表示要繫結在完成埠上的控制代碼,比如說一個已經accept的socket控制代碼。

ExistingCompletionPort 引數表示一個已有的完成埠控制代碼,第一次建立完成埠的時候顯然隨便傳個值就行,所以這個引數直接定義成IntPtr型別了。當你建立了工作執行緒來為 I/O請求服務的時候,才會把控制代碼和完成埠關聯在一起,而之前第一次建立完成埠的時候這個引數傳一個zero指標就O了,而FileHandle引數傳一個-1的指標就行了。

CompletionKey是完成鍵的意思,它可以是任意想傳遞給工作執行緒的資料,學名叫做單控制代碼資料,就是說跟隨FileHandle引數走的一些狀態資料,一般在socket的iocp 程式裡是把socket傳進去,以便在工作執行緒裡拿到這個socket控制代碼,在收到非同步操作完成的通知及處理後繼續進行下一個非同步操作的投遞,如傳送和接受資料等。

NumberOfConcurrentThreads 引數表示在一個完成埠上同時允許執行的最大執行緒數量。如果傳0,就是說你有幾個CPU,就是允許最大有幾個執行緒,這也是最理想情況,因為一個CPU一個執行緒可以防止執行緒上下文切換。關於這個值要和建立工作執行緒的數量的關係,大家要理解清楚,不一定CPU有多少個,你的工作執行緒就建立多少個。因為你的工作執行緒有時候會阻塞或者等待,而如果你正好建立了CPU個數個工作執行緒,有一個等待的話,因為你分配了同時最多有CPU個數多個最大IOCP執行緒,這時候就不能效率最大化了。所以一般工作執行緒建立的要比CPU個數多一些,除非你保證你的工作執行緒不會阻塞。

PostQueuedCompletionStatus函式原型如下
 [DllImport("Kernel32", CharSet = CharSet.Auto)]
    private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
該方法用於給完成埠投遞自定義資訊,一般情況下如果把某個控制代碼和完成埠繫結後,當有資料收發操作完成時會自動同時工作執行緒,工作執行緒裡的 GetQueuedCompletionStatus就不會阻塞,而繼續往下走,來進行接收到IO操作完成通知的流程。而有時候我們需要手工向工作者執行緒投遞一些訊息,比如說我們主執行緒知道所有的socket控制代碼都關閉了,工作執行緒可以退出了,我們就可以給工作執行緒發一個自定義資料,工作執行緒收到後判斷是否是退出指令,然後退出。

CompletionPort引數表示向哪個完成埠物件投遞資訊,在這個完成埠上等待訊息的工作執行緒就會收到訊息了。
dwNumberOfBytesTransferred表示你投遞的資料有多大,我們一般投遞的是一個物件的指標,在32位系統裡,int指標就是4個位元組了,直接寫4就O了,要不就用sizeof你傳的資料,如sizeof(IntPtr)。

dwCompletionKey同CreateIoCompletionPort的解釋,是單控制代碼資料,本示例用不到,不細說,直接用IntPtr.Zero填充了事。

lpOverlapped引數,本意是一個win32的overlapped結構的指標,本示例中不用,所以不詳細講。它叫單IO資料,是相對單據並拘束CompletionKey來講的,前者是一個控制代碼的每次IO操作的上下文,比如單詞IO操作的資料、操作型別等,後者是整個控制代碼的上下文。但這裡我們表示你要投遞的資料,可以是任何型別的資料(誰讓它是個指標呢,所以傳啥都行),值得注意的一點就是,這個資料傳遞到工作執行緒的時候,中間這個資料走的是非託管程式碼。所以不能直接傳一個引用進去,這裡要使用到GCHandle類。先大致介紹一下這個類吧。它有個靜態方法Alloc來給把一個物件在GC控制代碼表裡註冊,GC控制代碼表示CLR為沒個應用程式域提供的一個表,它允許你來監視和管理物件的生命週期,你可以往裡加一個物件的引用,也可以從裡面移除一個物件,往裡加物件的時候,還可以指定一個標記來表示我們希望如何監視和控制這個物件。而加入一個條目的操作就是GCHandle的Alloc物件,它有兩個引數,第一個引數是物件,第二引數是 GCHandleType型別的列舉,第二個引數表示我們如何來監視和控制這個物件的生命週期。當這個引數是GCHandleType.Normal時,表示我們告訴垃圾收集器,及時託管程式碼裡沒有該物件的根,也不要回收該物件,但垃圾收集器可以移動它,一般我們向非託管程式碼傳遞一個物件,而又從非託管程式碼傳遞回來的時候用這個型別非常好,它不會讓垃圾收集器在非託管程式碼返回託管程式碼的時候回收掉該物件,還不怎麼影響GC的效能,因為GC還可以移動它。 dwCompletionKey就是我們在託管-非託管-託管之間傳遞的一個很典型的場景。所以這裡用它,另外還有 GCHandleType.Pinned,它和GCHandleType.Normal不同的一點就是GC除了在沒有根的時候不能回收這個物件外,還不能移動它,應用場景是給非託管程式碼傳遞一個byte[]的buffer,讓託管程式碼去填充,如果用GCHandleType.Normal有可能在非託管程式碼返回託管程式碼的時候寫錯記憶體位置,因為有可能GC移動了這個物件的記憶體地址。關於根、GC原理,大家可以參考相關資料。另外在你的資料從非託管程式碼傳遞會託管程式碼後,要呼叫GCHandle的例項方法free來在GC控制代碼表裡移除該物件,這時候你的託管程式碼還有個該物件的引用,也就是根,GC也不會給你回收的,當你用完了後,GC就給你回收了。GCHandle的Target屬性用來訪問GCHandle指向的物件。其它兩個GCHandleType的成員是關於弱引用的,和本文關係不大,就不介紹了。

GetQueuedCompletionStatus原型如下
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
  public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
      out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
      out IntPtr lpOverlapped, uint dwMilliseconds);
前幾個引數和PostQueuedCompletionStatus差不多,
CompletionPort表示在哪個完成埠上等待PostQueuedCompletionStatus發來的訊息,或者IO操作完成的通知,

lpNumberOfBytesTransferred 表示收到資料的大小,這個大小不是說CompletionKey的大小,而是在單次I/O操作完成後(WSASend或者WSAReceve),實際傳輸的位元組數,我在這裡理解的不是很透徹,我覺得如果是接受PostQueuedCompletionStatus的訊息的話,應該是收到 lpOverlapped的大小,因為它才是單IO資料嘛。

lpCompletionKey用來接收單據並資料,我們沒傳遞啥,後來也沒用,在socket程式裡,一般接socket控制代碼。

lpOverlapped用來接收單IO資料,或者我們的自定義訊息。

dwMilliseconds表示等待一個自定義訊息或者IO完成通知訊息在完成埠上出現的時間,傳遞INIFINITE(0xffffffff)表示無限等待下去。

好了,API大概介紹這麼多,下面介紹程式碼
1、主執行緒建立一個完成埠物件,不和任何控制代碼繫結,前幾個引數都寫0,NumberOfConcurrentThreads引數我們寫1,因為我們的示例就一個工作執行緒。
2、建立一個工作執行緒,把第一步建立的完成埠傳進去
3、建立兩個單IO資料,分別發投遞給第一步建立的完成埠
4、在工作執行緒裡執行一個死迴圈,迴圈在傳遞進來的完成埠上等待訊息,沒有訊息的時候GetQueuedCompletionStatus處於休息狀態,有訊息來的時候把指標轉換成物件,然後輸出
5、如果收到退出指令,就退出迴圈,從而結束工作者執行緒。
下面是完整程式碼,需要開啟不安全程式碼的編譯選項。

using System;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

[StructLayout(LayoutKind.Sequential)]
class PER_IO_DATA
{
    public string Data;
}

public class IOCPApiTest
{
    [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
    public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
    [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
    public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
        out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
        out IntPtr lpOverlapped, uint dwMilliseconds);
    [DllImport("Kernel32", CharSet = CharSet.Auto)]
    private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);

    public static unsafe void TestIOCPApi()
    {
        var CompletionPort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);
        if(CompletionPort.IsInvalid)
        {
            Console.WriteLine("CreateIoCompletionPort 出錯:{0}",Marshal.GetLastWin32Error());
        }
        var thread = new Thread(ThreadProc);
        thread.Start(CompletionPort);

        var PerIOData = new PER_IO_DATA() ;
        var gch = GCHandle.Alloc(PerIOData);
        PerIOData.Data = "hi,我是蛙蛙王子,你是誰?";
        Console.WriteLine("{0}-主執行緒傳送資料",Thread.CurrentThread.GetHashCode());
        PostQueuedCompletionStatus(CompletionPort, (uint)sizeof(IntPtr), IntPtr.Zero, (IntPtr)gch);

        var PerIOData2 = new PER_IO_DATA();
        var gch2 = GCHandle.Alloc(PerIOData2);
        PerIOData2.Data = "關閉工作執行緒吧";
        Console.WriteLine("{0}-主執行緒傳送資料", Thread.CurrentThread.GetHashCode());
        PostQueuedCompletionStatus(CompletionPort, 4, IntPtr.Zero, (IntPtr)gch2);
        Console.WriteLine("主執行緒執行完畢");
        Console.ReadKey();
    }
    static void ThreadProc(object CompletionPortID)
    {
        var CompletionPort = (SafeFileHandle)CompletionPortID;

        while (true)
        {
            uint BytesTransferred;
            IntPtr PerHandleData;
            IntPtr lpOverlapped;
            Console.WriteLine("{0}-工作執行緒準備接受資料",Thread.CurrentThread.GetHashCode());
            GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,
                                      out PerHandleData, out lpOverlapped, 0xffffffff);
            if(BytesTransferred <= 0)
                continue;
            GCHandle gch = GCHandle.FromIntPtr(lpOverlapped);
            var per_HANDLE_DATA = (PER_IO_DATA)gch.Target;
            Console.WriteLine("{0}-工作執行緒收到資料:{1}", Thread.CurrentThread.GetHashCode(), per_HANDLE_DATA.Data);
            gch.Free();
            if (per_HANDLE_DATA.Data != "關閉工作執行緒吧") continue;
            Console.WriteLine("收到退出指令,正在退出");
            CompletionPort.Dispose();
            break;
        }
    }

    public static int Main(String[] args)
    {
        TestIOCPApi();
        return 0;
    }
}
posted on 2008-07-12 19:46 蛙蛙池塘 閱讀(2678) 評論(19)  編輯 收藏 網摘


評論:
#1樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-12 19:53
好想把文字的灰底兒去了,可是不會去,我複製上來就成這了,汗。   回覆  引用  檢視   

#3樓   henry       Posted @ 2008-07-12 20:30
c#的socket非同步不是已經具備IOCP功能了嗎?   回覆  引用  檢視   

#4樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-12 21:02
@henry
非同步socket是封裝了iocp,呵呵。.net 2.0的有些問題,3.0的非同步提高了不少據說,不過本文主要是介紹底層原理及讓有興趣的朋友自己重新實現IOCP。
另外看你的部落格對emit和反射研很瞭解。   回覆  引用  檢視   

#5樓   airwolf2026       Posted @ 2008-07-12 22:21
c#的非同步socket真的封裝了IOCP了?俺對c#網路程式設計瞭解比較少.那是不是說
c#裡面的一些非同步操作.如果系統有提供IOCP功能的話,是不是也封裝了?感覺自己問的這個問題好傻?   回覆  引用  檢視   

#6樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-12 22:26
@airwolf2026
socket的非同步操作是用的IOCP實現的,一般來說IO密集型的非同步操作.NET都是用IOCP實現的,計算密集型的非同步操作都是用ThreadPool實現的,當然只是一般來說。   回覆  引用  檢視   

#7樓   airwolf2026       Posted @ 2008-07-12 22:28
呵呵.多謝樓主.   回覆  引用  檢視   

#8樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-12 22:43
@airwolf2026
:)   回覆  引用  檢視   

#9樓   黎叔 [未註冊使用者] Posted @ 2008-07-13 00:10
var只能讓程式碼更難讀懂。

有必要全篇使用麼   回覆  引用   

#10樓   黎叔 [未註冊使用者] Posted @ 2008-07-13 00:14
不過感謝樓主的好文章   回覆  引用   

#12樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-13 23:11
哥們折騰一天IOCP也沒寫出個例子來,汗了。
Win32的WSAReceive函式,如下
WSARecv([In] SafeSocketHandle socketHandle, [In, Out] ref WSABuffer buffer, [In] int bufferCount, out int bytesTransferred, [In, Out] ref SocketFlags socketFlags, [In] IntPtr overlapped, [In] IntPtr completionRoutine);

倒數第二個引數只能傳遞overlapped物件指標,我想傳遞一個自定義物件,不知道咋辦。
目前只能用以下方式呼叫
GCHandle gcHandle = GCHandle.Alloc(PerIoData.Overlapped, GCHandleType.Pinned);
WSARecv(Accept, ref PerIoData.DataBuf ,1, out RecvBytes, ref Flags, gcHandle.AddrOfPinnedObject(), IntPtr.Zero);
可是,我想把整個PerIoData傳進去,而不是光一個overlapped物件,PerIoData型別如下。
[StructLayout(LayoutKind.Sequential)]
class PER_IO_OPERATION_DATA
{
public unsafe NativeOverlapped Overlapped;
public WSABuffer DataBuf;
public readonly byte[] Buffer = new byte[DATA_BUFSIZE];
public uint BytesSEND;
public uint BytesRECV;
}

在c++裡很簡單,只要傳遞PerIoData物件的首地址,其實也就是型別的第一個成員Overlapped的地址進去就行了,然後就可以在工作執行緒裡用GetQueuedCompletionStatus取到了,程式碼大約如下
執行非同步IO操作
WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,&(PerIoData->Overlapped), NULL);
接受非同步IO完成通知
GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE)
灰常的簡單,直接就取出了PerHandleData資料。

c#就不知道咋搞了,我用Marshal.StructureToPtr,Marshal.AllocHGlobal,Marshal.Copy折騰半天也沒弄成。而直接用以下程式碼
GCHandle gcHandle2 = GCHandle.Alloc(PerIoData);
PerIoData.Overlapped = new NativeOverlapped();
WSARecv(Accept, ref PerIoData.DataBuf ,1, out RecvBytes, ref Flags, GCHandle.ToIntPtr(gcHandle2), IntPtr.Zero)
會說無效控制代碼的。
我用WSARecv(Accept, ref PerIoData.DataBuf ,1, out RecvBytes, ref Flags, new IntPtr(GCHandle.ToIntPtr(gcHandle2).ToInt32()+8), IntPtr.Zero)倒是可以,然後再工作執行緒裡用以下語句接受自定義物件
gcHandle_per_io_data = GCHandle.FromIntPtr(new IntPtr(intptr_per_io_data.ToInt32()-8));
PER_IO_OPERATION_DATA PerIoData = (PER_IO_OPERATION_DATA)gcHandle_per_io_data.Target;
確實是可以,但是當第二個請求進來之後,GCHandle.ToIntPtr(gcHandle2).ToInt32()+8就不是overlapped的地址了,我也不知道為啥,可能這只是一個巧合。

要怎麼才能在WSARecv裡向工作執行緒傳遞一個自定義資訊呢,我除了傳overlapped外,怎麼也得把非同步操作型別、快取等傳給工作執行緒吧。
  回覆  引用  檢視   

#13樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-13 23:12
除了這個問題,別的基本都解決了。   回覆  引用  檢視   

#14樓   曲濱*銘龘鶽       Posted @ 2008-07-14 00:14
HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle,
__in HANDLE ExistingCompletionPort,
__in ULONG_PTR CompletionKey,
__in DWORD NumberOfConcurrentThreads
);
*********************************
__in ULONG_PTR CompletionKey,
這個引數本來是一個 4 位元組的整數,也就是說可以放入一個win32
指標;
********************************

C++ 程式碼中是在第2處 CreateIoCompletionPort 呼叫處
把 PerHandleData 傳遞進去的的並不是
WSARecv

if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData,
0) == NULL)
{
printf("CreateIoCompletionPort failed with error %d/n", GetLastError());
return;
}

接收

if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,
(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
{
printf("GetQueuedCompletionStatus failed with error %d/n", GetLastError());
return 0;
}


WSARecv 本身引數是有型別大小限定的
亂傳遞也沒用的;
  回覆  引用  檢視   

#15樓 [樓主]  蛙蛙池塘       Posted @ 2008-07-14 09:37
@曲濱*銘龘鶽
這可咋辦,這可咋整,這可咋好,閃靈給搭救一下殘喘吧。難道在c#裡知道一個類的成員的記憶體地址,不能知道它所在類的記憶體地址嗎?   回覆  引用  檢視   

#16樓   曲濱*銘龘鶽       Posted @ 2008-07-14 10:23
--引用--------------------------------------------------
蛙蛙池塘:

@曲濱*銘龘鶽這可咋辦,這可咋整,這可咋好,閃靈給搭救一下殘喘吧。難道在c#裡知道一個類的成員的記憶體地址,不能知道它所在類的記憶體地址嗎?
--------------------------------------------------------

.net 的 物件好像和 C++ 不同不是一片連續的記憶體地址所以和C++ API 互動只能用人家能識別的東西值型別、結構體等;

類不加 [StructLayout(LayoutKind.Sequential)] 標籤的都不行好像;   回覆  引用  檢視   

#17樓   T.t.T!Ck.¢#       Posted @ 2008-07-14 10:35
笨蛙蛙
o(∩_∩)o   回覆  引用  檢視   

#18樓   overred       Posted @ 2009-01-05 16:35
...
.net 裡的非同步IO跟非阻塞 IOCP還是有區別的吧

比如在縣城切換以及阻塞上
我感覺非同步IO就是系統做完然後通知你(不能精確)
非阻塞的IOCP則是自己不停的看看我是可以做某個IO,只要是一個執行緒忙,在cpu上就不開其他的,如果阻塞則開啟新執行緒
while (true)
{GetQueuedCompletionStatus。。。}

。。。

iocp在 vc裡用得 多 。。。
蛙蛙今天沒來上班 扣你工資。。。。   回覆  引用  檢視   

#19樓 [樓主]  蛙蛙池塘       Posted @ 2009-01-05 20:30
@overred
我明天不去公司,後天也不去,大後天也不去,大大後天也不去。。。。   回覆  引用  檢視