corefx 原始碼學習:NetworkStream.ReadAsync 是如何從 Socket 非同步讀取資料的
最近遇到 NetworkStream.ReadAsync 在 Linux 上高併發讀取資料的問題,由此激發了閱讀 corefx 中 System.Net.Sockets 實現原始碼(基於corefx 2.2 )的興趣。
這篇隨筆是閱讀 NetworkStream.ReadAsync 相關原始碼的簡單筆記,基於在 Linux 上執行的場景。
NetworkStream 繼承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法簽名是
public Task<int> ReadAsync(byte[] buffer, int offset, int count);
實際呼叫的是
public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
上面的的方法被 NetworkStream 重寫(override),呼叫的是 Socket 的 ReceiveAsync 方法
return _streamSocket.ReceiveAsync( new Memory<byte>(buffer, offset, size), SocketFlags.None, fromNetworkStream: true, cancellationToken).AsTask();
Socket.ReceiveAsync 的方法簽名
internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
主要實現程式碼
AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref LazyInitializer.EnsureInitialized(ref _cachedTaskEventArgs).ValueTaskReceive); if (saea.Reserve()) { saea.SetBuffer(buffer); saea.SocketFlags = socketFlags; saea.WrapExceptionsInIOExceptions = fromNetworkStream; var result = saea.ReceiveAsync(this); return result; } else { // We couldn't get a cached instance, due to a concurrent receive operation on the socket. // Fall back to wrapping APM. return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags)); }
通常情況下都會使用 AwaitableSocketAsyncEventArgs 非同步讀取資料,所以我們這裡只從 saea.ReceiveAsync 往下看。
saea.ReceiveAsync 呼叫的是Socket.ReceiveAsync(SocketAsyncEventArgs e) 方法,而後者呼叫的是 SocketAsyncEventArgs.DoOperationReceive(SafeCloseSocket handle) 。
在 Linux 上 DoOperationReceive 的實現在 SocketAsyncEventArgs.Unix.cs 中,主要程式碼如下
internal unsafe SocketError DoOperationReceive(SafeCloseSocket handle) { //... if (_bufferList == null) { errorCode = handle.AsyncContext.ReceiveAsync(_buffer.Slice(_offset, _count), _socketFlags, out bytesReceived, out flags, TransferCompletionCallback); } else { errorCode = handle.AsyncContext.ReceiveAsync(_bufferListInternal, _socketFlags, out bytesReceived, out flags, TransferCompletionCallback); } if (errorCode != SocketError.IOPending) { CompleteTransferOperation(bytesReceived, null, 0, flags, errorCode); FinishOperationSync(errorCode, bytesReceived, flags); } return errorCode; }
handle.AsyncContext.ReceiveAsync 對應的 Linux 實現在 SocketAsyncContext.Unix.cs 中,呼叫的是 SocketAsyncContext 的 ReceiveFrom 方法,ReceiveFrom 的主要實現程式碼如下
public SocketError ReceiveFromAsync(Memory<byte> buffer,SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[], int, SocketFlags, SocketError> callback) { SetNonBlocking(); SocketError errorCode; int observedSequenceNumber; if (_receiveQueue.IsReady(this, out observedSequenceNumber) && SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode)) { return errorCode; } BufferMemoryReceiveOperation operation = RentBufferMemoryReceiveOperation(); operation.Callback = callback; operation.Buffer = buffer; operation.Flags = flags; operation.SocketAddress = socketAddress; operation.SocketAddressLen = socketAddressLen; if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber)) { receivedFlags = operation.ReceivedFlags; bytesReceived = operation.BytesTransferred; errorCode = operation.ErrorCode; ReturnOperation(operation); return errorCode; } bytesReceived = 0; receivedFlags = SocketFlags.None; return SocketError.IOPending; }
SocketPal.TryCompleteReceiveFrom 的實現程式碼在 SocketPal.Unix.cs 中,所呼叫的另一個 TryCompleteReceiveFrom 方法的簽名是
public static unsafe bool TryCompleteReceiveFrom(SafeCloseSocket socket, Span<byte> buffer, IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, out SocketError errorCode)
該方法呼叫的是 Receive 方法
private static unsafe int Receive(SafeCloseSocket socket, SocketFlags flags, IList<ArraySegment<byte>> buffers, byte[] socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)
在 Receive 方法中呼叫了
errno = Interop.Sys.ReceiveMessage( socket.DangerousGetHandle(), &messageHeader, flags, &received);
Interop.Sys.ReceiveMessage 對應的是 Linux 本地庫中的方法
internal static partial class Sys { [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReceiveMessage")] internal static extern unsafe Error ReceiveMessage(IntPtr socket, MessageHeader* messageHeader, SocketFlags flags, long* received); }
Libraries.SystemNative 對應的是哪個庫呢?
它就是 System.Native.so
$ find /usr/share/dotnet/ -name System.Native.so /usr/share/dotnet/shared/Microsoft.NETCore.App/2.2.0/System.Native.so
接下來根據 SocketError.IOPending 的情況閱讀原始碼。
SocketAsyncEventArgs 在 DoOperationReceive 方法中呼叫 SocketAsyncContext.ReceiveFrom 方法時(handle.AsyncContext.ReceiveAsync)傳遞了 TransferCompletionCallback 引數值,在非同步操作時是通過這個 callback 讀取 socket 資料的,對應的方法是 TransferCompletionCallbackCore 。
private void TransferCompletionCallbackCore(int bytesTransferred, byte[] socketAddress, int socketAddressSize, SocketFlags receivedFlags, SocketError socketError) { CompleteTransferOperation(bytesTransferred, socketAddress, socketAddressSize, receivedFlags, socketError); CompletionCallback(bytesTransferred, receivedFlags, socketError); }
TransferCompletionCallbackCore 中進一步呼叫 CompletionCallback
private void CompletionCallback(int bytesTransferred, SocketFlags flags, SocketError socketError) { if (socketError == SocketError.Success) { FinishOperationAsyncSuccess(bytesTransferred, flags); } else { if (_currentSocket.CleanedUp) { socketError = SocketError.OperationAborted; } FinishOperationAsyncFailure(socketError, bytesTransferred, flags); } }
在 CompletionCallback 中當 SocketError.Success 時進一步呼叫 FinishOperationAsyncSuccess
internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flags) { FinishOperationSyncSuccess(bytesTransferred, flags); // Raise completion event. if (_context == null) { OnCompleted(this); } else { ExecutionContext.Run(_context, s_executionCallback, this); } }
從上面的程式碼可以看出實際呼叫的也是 FinishOperationSyncSuccess ,非同步與同步讀取資料最終呼叫的是同一個方法。