(IOCP)-C#高性能Socket服務器的實現
C#高性能Socket服務器的實現(IOCP)
https://www.jianshu.com/p/c65c0eb59f22
引言
我一直在探尋一個高性能的Socket客戶端代碼。以前,我使用Socket類寫了一些基於傳統異步編程模型的代碼(BeginSend、BeginReceive,等等)也看過很多博客的知識,在linux中有poll和epoll來實現,在windows下面
微軟MSDN中也提供了SocketAsyncEventArgs這個類來實現IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
NET Framework中的APM也稱為Begin/End模式。這是因為會調用Begin方法來啟動異步操作,然後返回一個IAsyncResult 對象。可以選擇將一個代理作為參數提供給Begin方法,異步操作完成時會調用該方法。或者,一個線程可以等待 IAsyncResult.AsyncWaitHandle。當回調被調用或發出等待信號時,就會調用End方法來獲取異步操作的結果。這種模式很靈活,使用相對簡單,在 .NET Framework 中非常常見。
但是,您必須註意,如果進行大量異步套接字操作,是要付出代價的。針對每次操作,都必須創建一個IAsyncResult對象,而且該對象不能被重復使用。由於大量使用對象分配和垃圾收集,這會影響性能。為了解決這個問題,新版本提供了另一個使用套接字上執行異步I/O的方法模式。這種新模式並不要求為每個套接字操作分配操作上下文對象。
代碼下載:http://download.csdn.net/detail/zhujunxxxxx/8431289這裏的代碼優化了的
目標
在上面微軟提供的例子我覺得不是很完整,沒有具體一個流程,只是受到客戶端消息後發送相同內容給客戶端,初學者不容易看懂流程,因為我花了一天的時間來實現一個功能齊全的IOCP服務器,
效果如下 代碼
首先是ICOPServer.cs 這個類是IOCP服務器的核心類,目前這個類是網絡上比較全的代碼,MSDN上面的例子都沒有我的全
using?System;?? using?System.Collections.Generic;?? using?System.Linq;?? using?System.Text;?? using?System.Net.Sockets;?? using?System.Net;?? using?System.Threading;?? namespace?ServerTest?? {?? ///??? ///?IOCP?SOCKET服務器?? ///??? public?class?IOCPServer?:?IDisposable?? ????{?? const?int?opsToPreAlloc?=?2;?? ????????#region?Fields?? ///??? ///?服務器程序允許的最大客戶端連接數?? ///??? private?int?_maxClient;?? ///??? ///?監聽Socket,用於接受客戶端的連接請求?? ///??? private?Socket?_serverSock;?? ///??? ///?當前的連接的客戶端數?? ///??? private?int?_clientCount;?? ///??? ///?用於每個I/O?Socket操作的緩沖區大小?? ///??? private?int?_bufferSize?=?1024;?? ///??? ///?信號量?? ///??? ????????Semaphore?_maxAcceptedClients;?? ///??? ///?緩沖區管理?? ///??? ????????BufferManager?_bufferManager;?? ///??? ///?對象池?? ///??? ????????SocketAsyncEventArgsPool?_objectPool;?? private?bool?disposed?=?false;?? ????????#endregion?? ????????#region?Properties?? ///??? ///?服務器是否正在運行?? ///??? public?bool?IsRunning?{?get;?private?set;?}?? ///??? ///?監聽的IP地址?? ///??? public?IPAddress?Address?{?get;?private?set;?}?? ///??? ///?監聽的端口?? ///??? public?int?Port?{?get;?private?set;?}?? ///??? ///?通信使用的編碼?? ///??? public?Encoding?Encoding?{?get;?set;?}?? ????????#endregion?? ????????#region?Ctors?? ///??? ///?異步IOCP?SOCKET服務器?? ///??? ///?監聽的端口?? ///?最大的客戶端數量?? public?IOCPServer(int?listenPort,int?maxClient)?? :this(IPAddress.Any,?listenPort,?maxClient)?? ????????{?? ????????}?? ///??? ///?異步Socket?TCP服務器?? ///??? ///?監聽的終結點?? ///?最大客戶端數量?? public?IOCPServer(IPEndPoint?localEP,?int?maxClient)?? :this(localEP.Address,?localEP.Port,maxClient)?? ????????{?? ????????}?? ///??? ///?異步Socket?TCP服務器?? ///??? ///?監聽的IP地址?? ///?監聽的端口?? ///?最大客戶端數量?? public?IOCPServer(IPAddress?localIPAddress,?int?listenPort,?int?maxClient)?? ????????{?? this.Address?=?localIPAddress;?? this.Port?=?listenPort;?? this.Encoding?=?Encoding.Default;?? ????????????_maxClient?=?maxClient;?? _serverSock?=new?Socket(localIPAddress.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);?? _bufferManager?=new?BufferManager(_bufferSize?*?_maxClient?*?opsToPreAlloc,_bufferSize);?? _objectPool?=new?SocketAsyncEventArgsPool(_maxClient);?? _maxAcceptedClients?=new?Semaphore(_maxClient,?_maxClient);??? ????????}?? ????????#endregion?? ????????#region?初始化?? ///??? ///?初始化函數?? ///??? public?void?Init()?? ????????{?? //?Allocates?one?large?byte?buffer?which?all?I/O?operations?use?a?piece?of.??This?gaurds??? //?against?memory?fragmentation?? ????????????_bufferManager.InitBuffer();?? //?preallocate?pool?of?SocketAsyncEventArgs?objects?? ????????????SocketAsyncEventArgs?readWriteEventArg;?? for?(int?i?=?0;?i?<?_maxClient;?i++)?? ????????????{?? //Pre-allocate?a?set?of?reusable?SocketAsyncEventArgs?? readWriteEventArg?=new?SocketAsyncEventArgs();?? readWriteEventArg.Completed?+=new?EventHandler(OnIOCompleted);?? readWriteEventArg.UserToken?=null;?? //?assign?a?byte?buffer?from?the?buffer?pool?to?the?SocketAsyncEventArg?object?? ????????????????_bufferManager.SetBuffer(readWriteEventArg);?? //?add?SocketAsyncEventArg?to?the?pool?? ????????????????_objectPool.Push(readWriteEventArg);?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?Start?? ///??? ///?啟動?? ///??? public?void?Start()?? ????????{?? if?(!IsRunning)?? ????????????{?? ????????????????Init();?? IsRunning?=true;?? IPEndPoint?localEndPoint?=new?IPEndPoint(Address,?Port);?? //?創建監聽socket?? _serverSock?=new?Socket(localEndPoint.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);?? //_serverSock.ReceiveBufferSize?=?_bufferSize;?? //_serverSock.SendBufferSize?=?_bufferSize;?? if?(localEndPoint.AddressFamily?==?AddressFamily.InterNetworkV6)?? ????????????????{?? //?配置監聽socket為?dual-mode?(IPv4?&?IPv6)??? //?27?is?equivalent?to?IPV6_V6ONLY?socket?option?in?the?winsock?snippet?below,?? _serverSock.SetSocketOption(SocketOptionLevel.IPv6,?(SocketOptionName)27,false);?? _serverSock.Bind(new?IPEndPoint(IPAddress.IPv6Any,?localEndPoint.Port));?? ????????????????}?? else?? ????????????????{?? ????????????????????_serverSock.Bind(localEndPoint);?? ????????????????}?? //?開始監聽?? _serverSock.Listen(this._maxClient);?? //?在監聽Socket上投遞一個接受請求。?? StartAccept(null);?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?Stop?? ///??? ///?停止服務?? ///??? public?void?Stop()?? ????????{?? if?(IsRunning)?? ????????????{?? IsRunning?=false;?? ????????????????_serverSock.Close();?? //TODO?關閉對所有客戶端的連接?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?Accept?? ///??? ///?從客戶端開始接受一個連接操作?? ///??? private?void?StartAccept(SocketAsyncEventArgs?asyniar)?? ????????{?? if?(asyniar?==?null)?? ????????????{?? asyniar?=new?SocketAsyncEventArgs();?? asyniar.Completed?+=new?EventHandler(OnAcceptCompleted);?? ????????????}?? else?? ????????????{?? //socket?must?be?cleared?since?the?context?object?is?being?reused?? asyniar.AcceptSocket?=null;?? ????????????}?? ????????????_maxAcceptedClients.WaitOne();?? if?(!_serverSock.AcceptAsync(asyniar))?? ????????????{?? ????????????????ProcessAccept(asyniar);?? //如果I/O掛起等待異步則觸發AcceptAsyn_Asyn_Completed事件?? //此時I/O操作同步完成,不會觸發Asyn_Completed事件,所以指定BeginAccept()方法?? ????????????}?? ????????}?? ///??? ///?accept?操作完成時回調函數?? ///??? ///?Object?who?raised?the?event.?? ///?SocketAsyncEventArg?associated?with?the?completed?accept?operation.?? private?void?OnAcceptCompleted(object?sender,?SocketAsyncEventArgs?e)?? ????????{?? ????????????ProcessAccept(e);?? ????????}?? ///??? ///?監聽Socket接受處理?? ///??? ///?SocketAsyncEventArg?associated?with?the?completed?accept?operation.?? private?void?ProcessAccept(SocketAsyncEventArgs?e)?? ????????{?? if?(e.SocketError?==?SocketError.Success)?? ????????????{?? Socket?s?=?e.AcceptSocket;//和客戶端關聯的socket?? if?(s.Connected)?? ????????????????{?? try?? ????????????????????{?? Interlocked.Increment(ref?_clientCount);//原子操作加1?? ????????????????????????SocketAsyncEventArgs?asyniar?=?_objectPool.Pop();?? ????????????????????????asyniar.UserToken?=?s;?? Log4Debug(String.Format("客戶?{0}?連入,?共有?{1}?個連接。",?s.RemoteEndPoint.ToString(),?_clientCount));?? if?(!s.ReceiveAsync(asyniar))//投遞接收請求?? ????????????????????????{?? ????????????????????????????ProcessReceive(asyniar);?? ????????????????????????}?? ????????????????????}?? catch?(SocketException?ex)?? ????????????????????{?? Log4Debug(String.Format("接收客戶?{0}?數據出錯,?異常信息:?{1}?。",?s.RemoteEndPoint,?ex.ToString()));?? //TODO?異常處理?? ????????????????????}?? //投遞下一個接受請求?? ????????????????????StartAccept(e);?? ????????????????}?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?發送數據?? ///??? ///?異步的發送數據?? ///??? ///??? ///??? public?void?Send(SocketAsyncEventArgs?e,?byte[]?data)?? ????????{?? if?(e.SocketError?==?SocketError.Success)?? ????????????{?? Socket?s?=?e.AcceptSocket;//和客戶端關聯的socket?? if?(s.Connected)?? ????????????????{?? Array.Copy(data,?0,?e.Buffer,?0,?data.Length);//設置發送數據?? //e.SetBuffer(data,?0,?data.Length);?//設置發送數據?? if?(!s.SendAsync(e))//投遞發送請求,這個函數有可能同步發送出去,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件?? ????????????????????{?? //?同步發送時處理發送完成事件?? ????????????????????????ProcessSend(e);?? ????????????????????}?? else?? ????????????????????{?? ????????????????????????CloseClientSocket(e);?? ????????????????????}?? ????????????????}?? ????????????}?? ????????}?? ///??? ///?同步的使用socket發送數據?? ///??? ///??? ///??? ///??? ///??? ///??? public?void?Send(Socket?socket,?byte[]?buffer,?int?offset,?int?size,?int?timeout)?? ????????{?? ????????????socket.SendTimeout?=?0;?? int?startTickCount?=?Environment.TickCount;?? int?sent?=?0;?//?how?many?bytes?is?already?sent?? do?? ????????????{?? if?(Environment.TickCount?>?startTickCount?+?timeout)?? ????????????????{?? //throw?new?Exception("Timeout.");?? ????????????????}?? try?? ????????????????{?? ????????????????????sent?+=?socket.Send(buffer,?offset?+?sent,?size?-?sent,?SocketFlags.None);?? ????????????????}?? catch?(SocketException?ex)?? ????????????????{?? if?(ex.SocketErrorCode?==?SocketError.WouldBlock?||?? ????????????????????ex.SocketErrorCode?==?SocketError.IOPending?||?? ????????????????????ex.SocketErrorCode?==?SocketError.NoBufferSpaceAvailable)?? ????????????????????{?? //?socket?buffer?is?probably?full,?wait?and?try?again?? ????????????????????????Thread.Sleep(30);?? ????????????????????}?? else?? ????????????????????{?? throw?ex;?//?any?serious?error?occurr?? ????????????????????}?? ????????????????}?? }while?(sent?<?size);?? ????????}?? ///??? ///?發送完成時處理函數?? ///??? ///?與發送完成操作相關聯的SocketAsyncEventArg對象?? private?void?ProcessSend(SocketAsyncEventArgs?e)?? ????????{?? if?(e.SocketError?==?SocketError.Success)?? ????????????{?? ????????????????Socket?s?=?(Socket)e.UserToken;?? //TODO?? ????????????}?? else?? ????????????{?? ????????????????CloseClientSocket(e);?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?接收數據?? ///??? ///接收完成時處理函數?? ///??? ///?與接收完成操作相關聯的SocketAsyncEventArg對象?? private?void?ProcessReceive(SocketAsyncEventArgs?e)?? ????????{?? if?(e.SocketError?==?SocketError.Success)//if?(e.BytesTransferred?>?0?&&?e.SocketError?==?SocketError.Success)?? ????????????{?? //?檢查遠程主機是否關閉連接?? if?(e.BytesTransferred?>?0)?? ????????????????{?? ????????????????????Socket?s?=?(Socket)e.UserToken;?? //判斷所有需接收的數據是否已經完成?? if?(s.Available?==?0)?? ????????????????????{?? //從偵聽者獲取接收到的消息。??? //String?received?=?Encoding.ASCII.GetString(e.Buffer,?e.Offset,?e.BytesTransferred);?? //echo?the?data?received?back?to?the?client?? //e.SetBuffer(e.Offset,?e.BytesTransferred);?? byte[]?data?=?new?byte[e.BytesTransferred];?? Array.Copy(e.Buffer,?e.Offset,?data,?0,?data.Length);//從e.Buffer塊中復制數據出來,保證它可重用?? string?info=Encoding.Default.GetString(data);?? Log4Debug(String.Format("收到?{0}?數據為?{1}",s.RemoteEndPoint.ToString(),info));?? //TODO?處理數據?? //增加服務器接收的總字節數。?? ????????????????????}?? if?(!s.ReceiveAsync(e))//為接收下一段數據,投遞接收請求,這個函數有可能同步完成,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件?? ????????????????????{?? //同步接收時處理接收完成事件?? ????????????????????????ProcessReceive(e);?? ????????????????????}?? ????????????????}?? ????????????}?? else?? ????????????{?? ????????????????CloseClientSocket(e);?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?回調函數?? ///??? ///?當Socket上的發送或接收請求被完成時,調用此函數?? ///??? ///?激發事件的對象?? ///?與發送或接收完成操作相關聯的SocketAsyncEventArg對象?? private?void?OnIOCompleted(object?sender,?SocketAsyncEventArgs?e)?? ????????{?? //?Determine?which?type?of?operation?just?completed?and?call?the?associated?handler.?? switch?(e.LastOperation)?? ????????????{?? case?SocketAsyncOperation.Accept:?? ????????????????????ProcessAccept(e);?? break;?? case?SocketAsyncOperation.Receive:?? ????????????????????ProcessReceive(e);?? break;?? default:?? throw?new?ArgumentException("The?last?operation?completed?on?the?socket?was?not?a?receive?or?send");?? ????????????}?? ????????}?? ????????#endregion?? ????????#region?Close?? ///??? ///?關閉socket連接?? ///??? ///?SocketAsyncEventArg?associated?with?the?completed?send/receive?operation.?? private?void?CloseClientSocket(SocketAsyncEventArgs?e)?? ????????{?? Log4Debug(String.Format("客戶?{0}?斷開連接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));?? Socket?s?=?e.UserTokenas?Socket;?? ????????????CloseClientSocket(s,?e);?? ????????}?? ///??? ///?關閉socket連接?? ///??? ///??? ///??? private?void?CloseClientSocket(Socket?s,?SocketAsyncEventArgs?e)?? ????????{?? try?? ????????????{?? ????????????????s.Shutdown(SocketShutdown.Send);?? ????????????}?? catch?(Exception)?? ????????????{?? //?Throw?if?client?has?closed,?so?it?is?not?necessary?to?catch.?? ????????????}?? finally?? ????????????{?? ????????????????s.Close();?? ????????????}?? Interlocked.Decrement(ref?_clientCount);?? ????????????_maxAcceptedClients.Release();?? _objectPool.Push(e);//SocketAsyncEventArg?對象被釋放,壓入可重用隊列。?? ????????}?? ????????#endregion?? ????????#region?Dispose?? ///??? ///?Performs?application-defined?tasks?associated?with?freeing,??? ///?releasing,?or?resetting?unmanaged?resources.?? ///??? public?void?Dispose()?? ????????{?? Dispose(true);?? GC.SuppressFinalize(this);?? ????????}?? ///??? ///?Releases?unmanaged?and?-?optionally?-?managed?resources?? ///??? ///?true?to?release??? ///?both?managed?and?unmanaged?resources;?false??? ///?to?release?only?unmanaged?resources.?? protected?virtual?void?Dispose(bool?disposing)?? ????????{?? if?(!this.disposed)?? ????????????{?? if?(disposing)?? ????????????????{?? try?? ????????????????????{?? ????????????????????????Stop();?? if?(_serverSock?!=?null)?? ????????????????????????{?? _serverSock?=null;?? ????????????????????????}?? ????????????????????}?? catch?(SocketException?ex)?? ????????????????????{?? //TODO?事件?? ????????????????????}?? ????????????????}?? disposed?=true;?? ????????????}?? ????????}?? ????????#endregion?? public?void?Log4Debug(string?msg)?? ????????{?? Console.WriteLine("notice:"+msg);?? ????????}?? ????}?? }
(IOCP)-C#高性能Socket服務器的實現