使用高效能Pipelines構建.NET通訊程式
.NET Standard支援一組新的API,System.Span
Pipelines旨在解決.NET編寫Socket通訊程式時的很多困難 ,相信讀者也對此不勝其煩,使用stream模型進行程式設計,就算能夠解決,也是實在麻煩。
System.IO.Pipelines使用簡單的記憶體片段來管理資料,可以極大的簡化編寫程式的過程。關於Pipelines的詳細介紹,可以看看這裡。現在ASP.NET Core中使用的Kestrel已經在使用這個API。(話說這個東西貌似就是Kestrel團隊搞出來的。)
可能是直接需要用Socket場景有限(物聯網用的還挺多的),Pipelines相關的資料感覺不是很多。官方給出的示例是基於ASCII協議的,有固定結尾的協議,這裡我以物聯網裝置常用的BINARY二進位制自定義協議為例,講解基於Pipelines的程式套路。
System.IO.Pipelines
與基於Stream的方式不同,pipelines提供一個pipe,用於儲存資料,pipe中間儲存的資料有點連結串列的感覺,可以基於SequencePosition
進行slice操作,這樣就能得到一個ReadOnlySequence<T>
物件。reader可以進行自定義操作,並在操作完成之後告訴pipe已經處理了多少資料,整個過程是不需要進行記憶體複製操作的,因此效能得到了提升,還少了很多麻煩。可以簡單理解作為伺服器端,流程:
接受資料迴圈:接到資料->放pipe裡面->告訴pipe放了多少資料
處理資料迴圈:在pipe裡面找一條完整資料->交給處理流程->告訴pipe處理了多少資料
協議
有一款裝置,binary協議,資料包開頭0x75, 0xbd, 0x7e, 0x97一共4個位元組,隨後跟資料包長度2個位元組(固定2400位元組,不固定長度也可以參照),隨後是資料區。在裝置連線成功之後,資料主動從裝置傳送到PC。
關鍵程式碼
雖然是.NET Core平臺的,但是.NET FRAMEWORK 4.6.1上面也可以nuget安裝,直接
install-package system.io.pipelines
進行安裝就可以了。Socket相關處理的程式碼不再寫了,只列關鍵的。
程式碼第一步是宣告pipe。
private async void InitPipe(Socket socket) { Pipe pipe = new Pipe(); Task writing = FillPipeAsync(socket, pipe.Writer); Task reading = ReadPipeAsync(socket, pipe.Reader); await Task.WhenAll(reading, writing); }
pipe有reader還有一個writer,reader負責讀取pipe資料,主要用在資料處理迴圈,writer負責將資料寫入pipe,主要用在資料接受迴圈。
//寫入迴圈 private async Task FillPipeAsync(Socket socket, PipeWriter writer) { //資料流量比較大,用1M位元組作為buffer const int minimumBufferSize = 1024 * 1024; while (running) { try { //從writer中,獲得一段不少於指定大小的記憶體空間 Memory<byte> memory = writer.GetMemory(minimumBufferSize); //將記憶體空間變成ArraySegment,提供給socket使用 if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment)) { throw new InvalidOperationException("Buffer backed by array was expected"); } //接受資料 int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None); if (bytesRead == 0) { break; } //一次接受完畢,資料已經在pipe中,告訴pipe已經給它寫了多少資料。 writer.Advance(bytesRead); } catch { break; } // 提示reader可以進行讀取資料,reader可以繼續執行readAsync()方法 FlushResult result = await writer.FlushAsync(); if (result.IsCompleted) { break; } } // 告訴pipe完事了 writer.Complete(); } //讀取迴圈 private async Task ReadPipeAsync(Socket socket, PipeReader reader) { while (running) { //等待writer寫資料 ReadResult result = await reader.ReadAsync(); //獲得記憶體區域 ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition? position = null; do { //尋找head的第一個位元組所在的位置 position = buffer.PositionOf((byte)0x75); if (position != null) { //由於是連續四個位元組作為head,需要進行比對,我這裡直接使用了ToArray方法,還是有了記憶體拷貝動作,不是很理想,但是寫起來很方便。 //對效能有更高要求的場景,可以進行slice操作後的單獨比對,這樣不需要記憶體拷貝動作 var headtoCheck = buffer.Slice(position.Value, 4).ToArray(); //SequenceEqual需要引用System.Linq if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 })) { //到這裡,認為找到包開頭了(從position.value開始),接下來需要從開頭處擷取整包的長度,需要先判斷長度是否足夠 if (buffer.Slice(position.Value).Length >= 2400) { //長度足夠,那麼取出ReadOnlySequence,進行操作 var mes = buffer.Slice(position.Value, 2400); //這裡是資料處理的函式,可以參考官方文件對ReadOnlySequence進行操作,文件裡面使用了span,那樣效能會好一些。我這裡簡單實用ToArray()操作,這樣也有了記憶體拷貝的問題,但是處理的直接是byte陣列了。 await ProcessMessage(mes.ToArray()); //這一段就算是完成了,從開頭位置,一整個包的長度就算完成了 var next = buffer.GetPosition(2400, position.Value); //將buffer處理過的捨棄,替換為剩餘的buffer引用 buffer = buffer.Slice(next); } else { //長度不夠,說明資料包不完整,等下一波資料進來再拼接,跳出迴圈。 break; } } else { //第一個是0x75但是後面不匹配,可能有資料傳輸問題,那麼需要捨棄第一個,0x75後面的位元組開始再重新找0x75 var next = buffer.GetPosition(1, position.Value); buffer = buffer.Slice(next); } } } while (position != null); //資料處理完畢,告訴pipe還剩下多少資料沒有處理(資料包不完整的資料,找不到head) reader.AdvanceTo(buffer.Start, buffer.End); if (result.IsCompleted) { break; } } reader.Complete(); }
以上程式碼基本解決了以下問題:
- 資料接收不完整,找不到開頭結尾,導致資料大量丟棄,或者自己維護一個queue的程式碼複雜性
- 資料接收與處理的同步問題
- 一次性收到多條資料的情況
後記
本文只是解釋了pipeline處理的模式,對於茫茫多的ToArray方法,可以使用基於Span的操作進行優化(有時間就來填坑)。另外,如果在await ProcessMessage(mes.ToArray());
這裡,直接使用Task.Run(()=>ProcessMessage(mes);
代替的話,實測會出現莫名其妙的問題,很有可能是pipe執行快,在系統排程Task之前,已經將記憶體釋放導致的,如果需要優化這一塊的話,需要格外注意。