Kestrel.Transport.Sockets分析與使用
相信大家都清楚asp core有著非常出色的效能,它出色的效能也源於網路服務模組Kestrel;在techempower測試中Kestrel基礎核心達到了700萬級別的RPS吞吐能力,具備這樣的能力那對應的Kestrel.Transport.Sockets也應有著不錯的效能。接下來簡單地分析一下 Kestrel.Transport.Sockets
的設計和使用,並進行簡單的併發處理能力測試。
async/await
async/await的使用這幾年時間裡大放異彩,現有新功能的IO操作方式無一不支援它,畢竟可以同步的程式碼方式來實現非同步處理功能,不管是開發,除錯還是維護都帶來的極大的便利性;既然這樣 Kestrel.Transport.Sockets
也在基礎的socket非同步基礎功能上引入了async/await設計,大大簡化了上層應用編寫的複雜度;下面看一下針對SocketAsyncEventArgs封裝的Awaitable。
public class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion { private static readonly Action _callbackCompleted = () => { }; private readonly PipeScheduler _ioScheduler; private Action _callback; public SocketAwaitableEventArgs(PipeScheduler ioScheduler) { _ioScheduler = ioScheduler; } public SocketAwaitableEventArgs GetAwaiter() => this; public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted); public int GetResult() { Debug.Assert(ReferenceEquals(_callback, _callbackCompleted)); _callback = null; if (SocketError != SocketError.Success) { ThrowSocketException(SocketError); } return BytesTransferred; void ThrowSocketException(SocketError e) { throw new SocketException((int)e); } } public void OnCompleted(Action continuation) { if (ReferenceEquals(_callback, _callbackCompleted) || ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted)) { Task.Run(continuation); } } public void UnsafeOnCompleted(Action continuation) { OnCompleted(continuation); } public void Complete() { OnCompleted(this); } protected override void OnCompleted(SocketAsyncEventArgs _) { var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted); if (continuation != null) { _ioScheduler.Schedule(state => ((Action)state)(), continuation); } } }
這個Awaitable的設計得非常好,它沒沒有引用新的物件,而是直接在 SocketAsyncEventArgs
的基礎派生下來實現,這樣在高併吞吐的情況可以更好地降低新物件的開銷;這即能使用await的同時也無需增加物件的開銷,不過PipeScheduler的呼叫設計竟然使用了匿名函式的方式帶入,這樣會增加了物件的開銷;還有就是 SocketAsyncEventArgs
完成後還投遞給一個執行緒排程去完成後面的工作,如果協議分析的工作量不大的情況個人感覺這個處理有些重了,不過使用都可以實現自己的PipeScheduler或直接改成執行continuation,最好是根據情況來配置最佳。
引入System.IO.Pipelines
在之前的文章已經說過Pipe,它是一個Buffer讀寫物件,其重要作用是可以把不連續的資料記憶體塊連線起來處理起來,這樣可以使普通開發人員避開Buffer的建立和回收的繁瑣工作(畢竟這一塊工作要做好還是有點難度的)。Pipe不緊緊提供了不連續資料Buffer的讀寫,它還具備一套await狀態機制可以讓使用人員從socket的receive和send工作分離出來。每個連線會分配兩個Pipe物件,主要負責Socket的receive和send工作;其工作原理如下:
基於Pipe使用者只需要關心應用協議處理處理即可,而這個處理會變得非常簡單;只需要關注Pipe的Writer和Reader即可。雖然這樣做帶來了便利性,但經過Pipe多了兩層狀態通訊多多少少會有效能上的影響,但這些影響相對Buffer開銷,GC和處理來說則還是有比較好的回報的。這裡還是要重吐嘲一下MS,為什麼Writer和Reader不按BinaryReader和BinaryWriter的基準作為設計,其實Pipe對普通使用者來說還是不怎友好的!
使用
Kestrel.Transport.Sockets
的使用還真有點讓人頭痛,首先它沒有完善的文件,還有設計整合度也比較高。要搞清楚怎麼用對於新手來說還真不怎容易,出於研究它的設計和對比查看了一段時間原始碼才總結出來如何用;最終發現要用得好還需真要再做一層封裝才能更好的用於實限應用中;下面講解一下如何簡單地使用它吧,首先你要在Nuget中引用它。
構建
Kestrel.Transport.Sockets
的使用入口是 SocketTransportFactory
,只要能構建這個物件那接下工作就簡單很多,首先看一下這個物件的建構函式
public SocketTransportFactory(IOptions<SocketTransportOptions> options, IApplicationLifetime applicationLifetime, ILoggerFactory loggerFactory);
三個引數都是介面……沒有文件的情況還真有點頭痛。ILoggerFactory引用Microsoft.Extensions.Logging可以得到,剩下兩個簡單地實現一下即可。
IOptions<SocketTransportOptions>
public class SocketOpetion : IOptions<SocketTransportOptions> { public SocketTransportOptions Value => new SocketTransportOptions(); }
IApplicationLifetime
public class ApplicationLifetime : IApplicationLifetime { public ApplicationLifetime() : this(new CancellationToken(), new CancellationToken(), new CancellationToken()) { } public ApplicationLifetime(CancellationToken started, CancellationToken stopping, CancellationToken stoped) { ApplicationStarted = started; ApplicationStopping = stopping; ApplicationStopped = stoped; } public CancellationToken ApplicationStarted { get; set; } public CancellationToken ApplicationStopping { get; set; } public CancellationToken ApplicationStopped { get; set; } public virtual void StopApplication() { } }
建立服務
以上介面的實現都有了,接下來就可以建立 SocketTransportFactory
物件了
private static async void ListenSocket(int prot) { var loggerFactory = new LoggerFactory(); ApplicationLifetime applicationLifetime = new ApplicationLifetime(); var server = new SocketTransportFactory(new SocketOpetion(), applicationLifetime, loggerFactory); await server.Create(new AnyEndPointInformation(prot), new Program()).BindAsync(); }
同樣 SocketTransportFactory
的Create方法也需要兩個介面引數,一個是監聽型別和地址描述,一個連線排程器。這裡只需要IP埠監聽所以實現起來比較簡單:
public class AnyEndPointInformation : IEndPointInformation { public AnyEndPointInformation(int port) { IPEndPoint = new IPEndPoint(IPAddress.Any, port); } public ListenType Type => ListenType.IPEndPoint; public IPEndPoint IPEndPoint { get; set; } public string SocketPath => throw new NotImplementedException(); public ulong FileHandle => throw new NotImplementedException(); public FileHandleType HandleType { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } public bool NoDelay => true; }
接下來的工作就在 IConnectionDispatcher
介面的OnConnection方法下處理連線
public void OnConnection(TransportConnection connection) { Session session = new Session(connection); Task.Run(session.StartRecive); }
剛開始以為有了 TransportConnection
就可以進行資料接收和傳送,但事情是我想得太簡單了!其實TransportConnection並不具備資料處理能力,因為裡面兩路的Pipe是空的……使用者需要自己定義對應的Pipe並設定給它,以上程式碼的Session是需要自己實現的,名稱隨自己喜歡定義;實現介面 IDuplexPipe
,設定兩路的Pipe物件,然後設定到 TransportConnection.Application
屬性上。實現 IDuplexPipe
後就可以進行資料接收和傳送功能了,以下是實現了一個簡單的 StartRecive
後回發資料,有收有發才便於下面測試的工作。
public async Task StartRecive() { while (true) { var data = await Receiver.ReadAsync(); if (data.IsCompleted) { this.Dispose(); break; } var buffers = data.Buffer; var end = buffers.End; if (buffers.IsSingleSegment) { ReadOnlyMemory<byte> b = buffers.First; var sbuf = Sender.GetMemory(b.Length); b.CopyTo(sbuf); Sender.Advance(b.Length); } else { foreach (var b in buffers) { var sbuf = Sender.GetMemory(b.Length); b.CopyTo(sbuf); Sender.Advance(b.Length); } } var flush = await Sender.FlushAsync(); Receiver.AdvanceTo(end); } }
測試
既然研究它自然就會關心它的效能情況,針對以上最簡單接收後返回的功能進行了一個壓力測試。測試結果總體上來說還算不錯,但算不上非常出色;最終測結果在一臺E3 1230V2的PC機上測試結果是:10000連線,接近20萬rps。