1. 程式人生 > >(IOCP)-C#高性能Socket服務器的實現

(IOCP)-C#高性能Socket服務器的實現

地址 iar code real 比較 異步socket iou ogr 收集

C#高性能Socket服務器的實現(IOCP)

https://www.jianshu.com/p/c65c0eb59f22

引言

我一直在探尋一個高性能的Socket客戶端代碼。以前,我使用Socket類寫了一些基於傳統異步編程模型的代碼(BeginSend、BeginReceive,等等)也看過很多博客的知識,在linux中有poll和epoll來實現,在windows下面

微軟MSDN中也提供了SocketAsyncEventArgs這個類來實現IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx

NET Framework中的APM也稱為Begin/End模式。這是因為會調用Begin方法來啟動異步操作,然後返回一個IAsyncResult 對象。可以選擇將一個代理作為參數提供給Begin方法,異步操作完成時會調用該方法。或者,一個線程可以等待 IAsyncResult.AsyncWaitHandle。當回調被調用或發出等待信號時,就會調用End方法來獲取異步操作的結果。這種模式很靈活,使用相對簡單,在 .NET Framework 中非常常見。

但是,您必須註意,如果進行大量異步套接字操作,是要付出代價的。針對每次操作,都必須創建一個IAsyncResult對象,而且該對象不能被重復使用。由於大量使用對象分配和垃圾收集,這會影響性能。為了解決這個問題,新版本提供了另一個使用套接字上執行異步I/O的方法模式。這種新模式並不要求為每個套接字操作分配操作上下文對象。

代碼下載:http://download.csdn.net/detail/zhujunxxxxx/8431289這裏的代碼優化了的

目標

在上面微軟提供的例子我覺得不是很完整,沒有具體一個流程,只是受到客戶端消息後發送相同內容給客戶端,初學者不容易看懂流程,因為我花了一天的時間來實現一個功能齊全的IOCP服務器,

效果如下 技術分享圖片 代碼

首先是ICOPServer.cs 這個類是IOCP服務器的核心類,目前這個類是網絡上比較全的代碼,MSDN上面的例子都沒有我的全

using?System;??

using?System.Collections.Generic;??

using?System.Linq;??

using?System.Text;??

using?System.Net.Sockets;??

using?System.Net;??

using?System.Threading;??



namespace?ServerTest??

{??

///???

///?IOCP?SOCKET服務器??

///???

public?class?IOCPServer?:?IDisposable??

????{??

const?int?opsToPreAlloc?=?2;??

????????#region?Fields??

///???

///?服務器程序允許的最大客戶端連接數??

///???

private?int?_maxClient;??



///???

///?監聽Socket,用於接受客戶端的連接請求??

///???

private?Socket?_serverSock;??



///???

///?當前的連接的客戶端數??

///???

private?int?_clientCount;??



///???

///?用於每個I/O?Socket操作的緩沖區大小??

///???

private?int?_bufferSize?=?1024;??



///???

///?信號量??

///???

????????Semaphore?_maxAcceptedClients;??



///???

///?緩沖區管理??

///???

????????BufferManager?_bufferManager;??



///???

///?對象池??

///???

????????SocketAsyncEventArgsPool?_objectPool;??



private?bool?disposed?=?false;??

????????#endregion??

????????#region?Properties??



///???

///?服務器是否正在運行??

///???

public?bool?IsRunning?{?get;?private?set;?}??

///???

///?監聽的IP地址??

///???

public?IPAddress?Address?{?get;?private?set;?}??

///???

///?監聽的端口??

///???

public?int?Port?{?get;?private?set;?}??

///???

///?通信使用的編碼??

///???

public?Encoding?Encoding?{?get;?set;?}??

????????#endregion??

????????#region?Ctors??



///???

///?異步IOCP?SOCKET服務器??

///???

///?監聽的端口??

///?最大的客戶端數量??

public?IOCPServer(int?listenPort,int?maxClient)??

:this(IPAddress.Any,?listenPort,?maxClient)??

????????{??

????????}??



///???

///?異步Socket?TCP服務器??

///???

///?監聽的終結點??

///?最大客戶端數量??

public?IOCPServer(IPEndPoint?localEP,?int?maxClient)??

:this(localEP.Address,?localEP.Port,maxClient)??

????????{??

????????}??



///???

///?異步Socket?TCP服務器??

///???

///?監聽的IP地址??

///?監聽的端口??

///?最大客戶端數量??

public?IOCPServer(IPAddress?localIPAddress,?int?listenPort,?int?maxClient)??

????????{??

this.Address?=?localIPAddress;??

this.Port?=?listenPort;??

this.Encoding?=?Encoding.Default;??



????????????_maxClient?=?maxClient;??



_serverSock?=new?Socket(localIPAddress.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);??



_bufferManager?=new?BufferManager(_bufferSize?*?_maxClient?*?opsToPreAlloc,_bufferSize);??



_objectPool?=new?SocketAsyncEventArgsPool(_maxClient);??



_maxAcceptedClients?=new?Semaphore(_maxClient,?_maxClient);???

????????}??

????????#endregion??

????????#region?初始化??



///???

///?初始化函數??

///???

public?void?Init()??

????????{??

//?Allocates?one?large?byte?buffer?which?all?I/O?operations?use?a?piece?of.??This?gaurds???

//?against?memory?fragmentation??

????????????_bufferManager.InitBuffer();??



//?preallocate?pool?of?SocketAsyncEventArgs?objects??

????????????SocketAsyncEventArgs?readWriteEventArg;??



for?(int?i?=?0;?i?<?_maxClient;?i++)??

????????????{??

//Pre-allocate?a?set?of?reusable?SocketAsyncEventArgs??

readWriteEventArg?=new?SocketAsyncEventArgs();??

readWriteEventArg.Completed?+=new?EventHandler(OnIOCompleted);??

readWriteEventArg.UserToken?=null;??



//?assign?a?byte?buffer?from?the?buffer?pool?to?the?SocketAsyncEventArg?object??

????????????????_bufferManager.SetBuffer(readWriteEventArg);??



//?add?SocketAsyncEventArg?to?the?pool??

????????????????_objectPool.Push(readWriteEventArg);??

????????????}??



????????}??

????????#endregion??

????????#region?Start??

///???

///?啟動??

///???

public?void?Start()??

????????{??

if?(!IsRunning)??

????????????{??

????????????????Init();??

IsRunning?=true;??

IPEndPoint?localEndPoint?=new?IPEndPoint(Address,?Port);??

//?創建監聽socket??

_serverSock?=new?Socket(localEndPoint.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);??

//_serverSock.ReceiveBufferSize?=?_bufferSize;??

//_serverSock.SendBufferSize?=?_bufferSize;??

if?(localEndPoint.AddressFamily?==?AddressFamily.InterNetworkV6)??

????????????????{??

//?配置監聽socket為?dual-mode?(IPv4?&?IPv6)???

//?27?is?equivalent?to?IPV6_V6ONLY?socket?option?in?the?winsock?snippet?below,??

_serverSock.SetSocketOption(SocketOptionLevel.IPv6,?(SocketOptionName)27,false);??

_serverSock.Bind(new?IPEndPoint(IPAddress.IPv6Any,?localEndPoint.Port));??

????????????????}??

else??

????????????????{??

????????????????????_serverSock.Bind(localEndPoint);??

????????????????}??

//?開始監聽??

_serverSock.Listen(this._maxClient);??

//?在監聽Socket上投遞一個接受請求。??

StartAccept(null);??

????????????}??

????????}??

????????#endregion??

????????#region?Stop??



///???

///?停止服務??

///???

public?void?Stop()??

????????{??

if?(IsRunning)??

????????????{??

IsRunning?=false;??

????????????????_serverSock.Close();??

//TODO?關閉對所有客戶端的連接??



????????????}??

????????}??

????????#endregion??

????????#region?Accept??



///???

///?從客戶端開始接受一個連接操作??

///???

private?void?StartAccept(SocketAsyncEventArgs?asyniar)??

????????{??

if?(asyniar?==?null)??

????????????{??

asyniar?=new?SocketAsyncEventArgs();??

asyniar.Completed?+=new?EventHandler(OnAcceptCompleted);??

????????????}??

else??

????????????{??

//socket?must?be?cleared?since?the?context?object?is?being?reused??

asyniar.AcceptSocket?=null;??

????????????}??

????????????_maxAcceptedClients.WaitOne();??

if?(!_serverSock.AcceptAsync(asyniar))??

????????????{??

????????????????ProcessAccept(asyniar);??

//如果I/O掛起等待異步則觸發AcceptAsyn_Asyn_Completed事件??

//此時I/O操作同步完成,不會觸發Asyn_Completed事件,所以指定BeginAccept()方法??

????????????}??

????????}??



///???

///?accept?操作完成時回調函數??

///???

///?Object?who?raised?the?event.??

///?SocketAsyncEventArg?associated?with?the?completed?accept?operation.??

private?void?OnAcceptCompleted(object?sender,?SocketAsyncEventArgs?e)??

????????{??

????????????ProcessAccept(e);??

????????}??



///???

///?監聽Socket接受處理??

///???

///?SocketAsyncEventArg?associated?with?the?completed?accept?operation.??

private?void?ProcessAccept(SocketAsyncEventArgs?e)??

????????{??

if?(e.SocketError?==?SocketError.Success)??

????????????{??

Socket?s?=?e.AcceptSocket;//和客戶端關聯的socket??

if?(s.Connected)??

????????????????{??

try??

????????????????????{??



Interlocked.Increment(ref?_clientCount);//原子操作加1??

????????????????????????SocketAsyncEventArgs?asyniar?=?_objectPool.Pop();??

????????????????????????asyniar.UserToken?=?s;??



Log4Debug(String.Format("客戶?{0}?連入,?共有?{1}?個連接。",?s.RemoteEndPoint.ToString(),?_clientCount));??



if?(!s.ReceiveAsync(asyniar))//投遞接收請求??

????????????????????????{??

????????????????????????????ProcessReceive(asyniar);??

????????????????????????}??

????????????????????}??

catch?(SocketException?ex)??

????????????????????{??

Log4Debug(String.Format("接收客戶?{0}?數據出錯,?異常信息:?{1}?。",?s.RemoteEndPoint,?ex.ToString()));??

//TODO?異常處理??

????????????????????}??

//投遞下一個接受請求??

????????????????????StartAccept(e);??

????????????????}??

????????????}??

????????}??

????????#endregion??

????????#region?發送數據??



///???

///?異步的發送數據??

///???

///???

///???

public?void?Send(SocketAsyncEventArgs?e,?byte[]?data)??

????????{??

if?(e.SocketError?==?SocketError.Success)??

????????????{??

Socket?s?=?e.AcceptSocket;//和客戶端關聯的socket??

if?(s.Connected)??

????????????????{??

Array.Copy(data,?0,?e.Buffer,?0,?data.Length);//設置發送數據??



//e.SetBuffer(data,?0,?data.Length);?//設置發送數據??

if?(!s.SendAsync(e))//投遞發送請求,這個函數有可能同步發送出去,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件??

????????????????????{??

//?同步發送時處理發送完成事件??

????????????????????????ProcessSend(e);??

????????????????????}??

else??

????????????????????{??

????????????????????????CloseClientSocket(e);??

????????????????????}??

????????????????}??

????????????}??

????????}??



///???

///?同步的使用socket發送數據??

///???

///???

///???

///???

///???

///???

public?void?Send(Socket?socket,?byte[]?buffer,?int?offset,?int?size,?int?timeout)??

????????{??

????????????socket.SendTimeout?=?0;??

int?startTickCount?=?Environment.TickCount;??

int?sent?=?0;?//?how?many?bytes?is?already?sent??

do??

????????????{??

if?(Environment.TickCount?>?startTickCount?+?timeout)??

????????????????{??

//throw?new?Exception("Timeout.");??

????????????????}??

try??

????????????????{??

????????????????????sent?+=?socket.Send(buffer,?offset?+?sent,?size?-?sent,?SocketFlags.None);??

????????????????}??

catch?(SocketException?ex)??

????????????????{??

if?(ex.SocketErrorCode?==?SocketError.WouldBlock?||??

????????????????????ex.SocketErrorCode?==?SocketError.IOPending?||??

????????????????????ex.SocketErrorCode?==?SocketError.NoBufferSpaceAvailable)??

????????????????????{??

//?socket?buffer?is?probably?full,?wait?and?try?again??

????????????????????????Thread.Sleep(30);??

????????????????????}??

else??

????????????????????{??

throw?ex;?//?any?serious?error?occurr??

????????????????????}??

????????????????}??

}while?(sent?<?size);??

????????}??





///???

///?發送完成時處理函數??

///???

///?與發送完成操作相關聯的SocketAsyncEventArg對象??

private?void?ProcessSend(SocketAsyncEventArgs?e)??

????????{??

if?(e.SocketError?==?SocketError.Success)??

????????????{??

????????????????Socket?s?=?(Socket)e.UserToken;??



//TODO??

????????????}??

else??

????????????{??

????????????????CloseClientSocket(e);??

????????????}??

????????}??

????????#endregion??

????????#region?接收數據??





///???

///接收完成時處理函數??

///???

///?與接收完成操作相關聯的SocketAsyncEventArg對象??

private?void?ProcessReceive(SocketAsyncEventArgs?e)??

????????{??

if?(e.SocketError?==?SocketError.Success)//if?(e.BytesTransferred?>?0?&&?e.SocketError?==?SocketError.Success)??

????????????{??

//?檢查遠程主機是否關閉連接??

if?(e.BytesTransferred?>?0)??

????????????????{??

????????????????????Socket?s?=?(Socket)e.UserToken;??

//判斷所有需接收的數據是否已經完成??

if?(s.Available?==?0)??

????????????????????{??

//從偵聽者獲取接收到的消息。???

//String?received?=?Encoding.ASCII.GetString(e.Buffer,?e.Offset,?e.BytesTransferred);??

//echo?the?data?received?back?to?the?client??

//e.SetBuffer(e.Offset,?e.BytesTransferred);??



byte[]?data?=?new?byte[e.BytesTransferred];??

Array.Copy(e.Buffer,?e.Offset,?data,?0,?data.Length);//從e.Buffer塊中復制數據出來,保證它可重用??



string?info=Encoding.Default.GetString(data);??

Log4Debug(String.Format("收到?{0}?數據為?{1}",s.RemoteEndPoint.ToString(),info));??

//TODO?處理數據??



//增加服務器接收的總字節數。??

????????????????????}??



if?(!s.ReceiveAsync(e))//為接收下一段數據,投遞接收請求,這個函數有可能同步完成,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件??

????????????????????{??

//同步接收時處理接收完成事件??

????????????????????????ProcessReceive(e);??

????????????????????}??

????????????????}??

????????????}??

else??

????????????{??

????????????????CloseClientSocket(e);??

????????????}??

????????}??

????????#endregion??

????????#region?回調函數??



///???

///?當Socket上的發送或接收請求被完成時,調用此函數??

///???

///?激發事件的對象??

///?與發送或接收完成操作相關聯的SocketAsyncEventArg對象??

private?void?OnIOCompleted(object?sender,?SocketAsyncEventArgs?e)??

????????{??

//?Determine?which?type?of?operation?just?completed?and?call?the?associated?handler.??

switch?(e.LastOperation)??

????????????{??

case?SocketAsyncOperation.Accept:??

????????????????????ProcessAccept(e);??

break;??

case?SocketAsyncOperation.Receive:??

????????????????????ProcessReceive(e);??

break;??

default:??

throw?new?ArgumentException("The?last?operation?completed?on?the?socket?was?not?a?receive?or?send");??

????????????}??

????????}??

????????#endregion??

????????#region?Close??

///???

///?關閉socket連接??

///???

///?SocketAsyncEventArg?associated?with?the?completed?send/receive?operation.??

private?void?CloseClientSocket(SocketAsyncEventArgs?e)??

????????{??

Log4Debug(String.Format("客戶?{0}?斷開連接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));??

Socket?s?=?e.UserTokenas?Socket;??

????????????CloseClientSocket(s,?e);??

????????}??



///???

///?關閉socket連接??

///???

///???

///???

private?void?CloseClientSocket(Socket?s,?SocketAsyncEventArgs?e)??

????????{??

try??

????????????{??

????????????????s.Shutdown(SocketShutdown.Send);??

????????????}??

catch?(Exception)??

????????????{??

//?Throw?if?client?has?closed,?so?it?is?not?necessary?to?catch.??

????????????}??

finally??

????????????{??

????????????????s.Close();??

????????????}??

Interlocked.Decrement(ref?_clientCount);??

????????????_maxAcceptedClients.Release();??

_objectPool.Push(e);//SocketAsyncEventArg?對象被釋放,壓入可重用隊列。??

????????}??

????????#endregion??

????????#region?Dispose??

///???

///?Performs?application-defined?tasks?associated?with?freeing,???

///?releasing,?or?resetting?unmanaged?resources.??

///???

public?void?Dispose()??

????????{??

Dispose(true);??

GC.SuppressFinalize(this);??

????????}??



///???

///?Releases?unmanaged?and?-?optionally?-?managed?resources??

///???

///?true?to?release???

///?both?managed?and?unmanaged?resources;?false???

///?to?release?only?unmanaged?resources.??

protected?virtual?void?Dispose(bool?disposing)??

????????{??

if?(!this.disposed)??

????????????{??

if?(disposing)??

????????????????{??

try??

????????????????????{??

????????????????????????Stop();??

if?(_serverSock?!=?null)??

????????????????????????{??

_serverSock?=null;??

????????????????????????}??

????????????????????}??

catch?(SocketException?ex)??

????????????????????{??

//TODO?事件??

????????????????????}??

????????????????}??

disposed?=true;??

????????????}??

????????}??

????????#endregion??



public?void?Log4Debug(string?msg)??

????????{??

Console.WriteLine("notice:"+msg);??

????????}??



????}??

}


(IOCP)-C#高性能Socket服務器的實現