1. 程式人生 > >asp.net core mvc剖析:KestrelServer

asp.net core mvc剖析:KestrelServer

KestrelServer是基於Libuv開發的高效能web伺服器,那我們現在就來看一下它是如何工作的。在上一篇文章中提到了Program的Main方法,在這個方法裡Build了一個WebHost,我們再來看一下程式碼:

public static void Main(string[] args)
   {
       var host = new WebHostBuilder()
           .UseKestrel()
           .UseContentRoot(Directory.GetCurrentDirectory())
           .UseIISIntegration()
           .UseStartup<Startup>()
           .Build();
 
       host.Run();
   }

  裡面有一個UseKestrel方法呼叫,這個方法的作用就是使用KestrelServer作為web server來提供web服務。在WebHost啟動的時候,呼叫了IServer的Start方法啟動服務,由於我們使用KestrelServer作為web server,自然這裡呼叫的就是KestrelServer.Start方法,那我們來看下KestrelServer的Start方法裡主要程式碼:

 首先,我們發現在Start方法裡建立了一個KestrelEngine物件,具體程式碼如下:

var engine = new KestrelEngine(new ServiceContext
{
       FrameFactory = context =>
       {
           return new Frame<TContext>(application, context);
       },
       AppLifetime = _applicationLifetime,
       Log = trace,
       ThreadPool = new LoggingThreadPool(trace),
       DateHeaderValueManager = dateHeaderValueManager,
       ServerOptions = Options
 });

  KestrelEngine構造方法接受一個ServiceContext物件引數,ServiceContext裡包含一個FrameFactory,從名稱上很好理解,就是Frame得工廠,Frame是什麼?Frame是http請求處理物件,每個請求過來後,都會交給一個Frame物件進行受理,我們這裡先記住它的作用,後面還會看到它是怎麼例項化的。除了這個外,還有一個是AppLiftTime,它是一個IApplicationLifetime物件,它是整個應用生命週期的管理物件,前面沒有說到,這裡補充上。

public interface IApplicationLifetime
    {
        /// <summary>
        /// Triggered when the application host has fully started and is about to wait
        /// for a graceful shutdown.
        /// </summary>
        CancellationToken ApplicationStarted { get; }

        /// <summary>
        /// Triggered when the application host is performing a graceful shutdown.
        /// Requests may still be in flight. Shutdown will block until this event completes.
        /// </summary>
        CancellationToken ApplicationStopping { get; }

        /// <summary>
        /// Triggered when the application host is performing a graceful shutdown.
        /// All requests should be complete at this point. Shutdown will block
        /// until this event completes.
        /// </summary>
        CancellationToken ApplicationStopped { get; }

        /// <summary>
        /// Requests termination the current application.
        /// </summary>
        void StopApplication();
    }

  IApplicationLifetime中提供了三個時間點,

  1,ApplicationStarted:應用程式已啟動
  2,ApplicationStopping:應用程式正在停止
  3,ApplicationStopped:應用程式已停止

  我們可以通過CancellationToken.Register方法註冊回撥方法,在上面說到的三個時間點,執行我們特定的業務邏輯。IApplicationLifetime是在WebHost的Start方法裡建立的,如果想在我們自己的應用程式獲取這個物件,我們可以直接通過依賴注入的方式獲取即可。

 我們繼續回到ServiceContext物件,這裡面還包含了Log物件,用於跟蹤日誌,一般我們是用來看程式執行的過程,並可以通過它發現程式執行出現問題的地方。還包含一個ServerOptions,它是一個KestrelServerOptions,裡面包含跟服務相關的配置引數:

1,ThreadCount:服務執行緒數,表示服務啟動後,要開啟多少個服務執行緒,因為每個請求都會使用一個執行緒來進行處理,多執行緒會提高吞吐量,但是並不一定執行緒數越多越好,在系統裡預設值是跟CPU核心數相等。

2,ShutdownTimeout:The amount of time after the server begins shutting down before connections will be forcefully closed(在應用程式開始停止到強制關閉當前請求連線所等待的時間,在這個時間段內,應用程式會等待請求處理完,如果還沒處理完,將強制關閉)

3,Limits:KestrelServerLimits物件,裡面包含了服務限制引數,比如MaxRequestBufferSize,MaxResponseBufferSize

其他引數就不再一個一個說明了。

KestrelEngine物件建立好後,通過呼叫 engine.Start(threadCount),根據配置的threadcount進行服務執行緒KestrelThread例項化,程式碼如下:
     public void Start(int count)
        {
            for (var index = 0; index < count; index++)
            {
                Threads.Add(new KestrelThread(this));
            }

            foreach (var thread in Threads)
            {
                thread.StartAsync().Wait();
            }
        }

 上面的程式碼會建立指定數量的Thread物件,然後開始等待任務處理。KestrelThread是對libuv執行緒處理的封裝。

這些工作都準備好後,就開始啟動監聽服務了

 foreach (var endPoint in listenOptions)
                {
                    try
                    {
                        _disposables.Push(engine.CreateServer(endPoint));
                    }
                    catch (AggregateException ex)
                    {
                        if ((ex.InnerException as UvException)?.StatusCode == Constants.EADDRINUSE)
                        {
                            throw new IOException($"Failed to bind to address {endPoint}: address already in use.", ex);
                        }

                        throw;
                    }

                    // If requested port was "0", replace with assigned dynamic port.
                    _serverAddresses.Addresses.Add(endPoint.ToString());
                }

  上面紅色字型程式碼,就是建立監聽服務的方法,我們再詳細看下里面的詳細情況:

      public IDisposable CreateServer(ListenOptions listenOptions)
        {
            var listeners = new List<IAsyncDisposable>();

            try
            {
//如果前面建立的執行緒數量為1,直接建立listener物件,啟動監聽 if (Threads.Count == 1) { var listener = new Listener(ServiceContext); listeners.Add(listener); listener.StartAsync(listenOptions, Threads[0]).Wait(); } else {
            //如果執行緒數不為1的時候 var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); var pipeMessage = Guid.NewGuid().ToByteArray();            //先建立一個主監聽物件,這個Listenerprimary就是一個Listener,監聽socket就是在這裡面建立的 var listenerPrimary = new ListenerPrimary(ServiceContext); listeners.Add(listenerPrimary);
            //啟動監聽 listenerPrimary.StartAsync(pipeName, pipeMessage, listenOptions, Threads[0]).Wait(); //為剩餘的每個服務執行緒關聯一個ListenerSecondary物件,這個物件使用命名Pipe與主監聽物件通訊,在主監聽物件接收到請求後,通過pipe把接受的socket物件傳送給特定的執行緒處理 foreach (var thread in Threads.Skip(1)) { var listenerSecondary = new ListenerSecondary(ServiceContext); listeners.Add(listenerSecondary); listenerSecondary.StartAsync(pipeName, pipeMessage, listenOptions, thread).Wait(); } } return new Disposable(() => { DisposeListeners(listeners); }); } catch { DisposeListeners(listeners); throw; } }

  這個時候服務就開始接受http請求了,我們前面說到了,監聽socket在listener類中建立(ListenerPrimary也是一個Listener),下面是listener的start方法

      public Task StartAsync(
            ListenOptions listenOptions,
            KestrelThread thread)
        {
            ListenOptions = listenOptions;
            Thread = thread;

            var tcs = new TaskCompletionSource<int>(this);

            Thread.Post(state =>
            {
                var tcs2 = (TaskCompletionSource<int>) state;
                try
                {
                    var listener = ((Listener) tcs2.Task.AsyncState);
//建立監聽socket listener.ListenSocket = listener.CreateListenSocket();
//開始監聽,當有連線請求過來後,觸發ConnectionCallback方法 ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this); tcs2.SetResult(0); } catch (Exception ex) { tcs2.SetException(ex); } }, tcs); return tcs.Task; }

  ConnectionCallback:當連線請求過來後被觸發,在回撥方法裡,進行連線處理分發,連線分發程式碼如下:

     protected virtual void DispatchConnection(UvStreamHandle socket)
        {
            var connection = new Connection(this, socket);
            connection.Start();
        }

  這個是listener類中的實現,我們前面看到,只有在執行緒數為1的情況下,才建立Listener物件進行監聽,否則建立ListenerPrimary監聽,ListenerPrimay裡重寫了方法,它的實現如下:

     protected override void DispatchConnection(UvStreamHandle socket)
        {
//這裡採用輪詢的方式,把連線請求依次分發給不同的執行緒進行處理 var index = _dispatchIndex++ % (_dispatchPipes.Count + 1); if (index == _dispatchPipes.Count) {
   // base.DispatchConnection(socket); } else { DetachFromIOCP(socket); var dispatchPipe = _dispatchPipes[index];
//這裡就是通過命名pipe,傳遞socket給特定的執行緒 var write = new UvWriteReq(Log); write.Init(Thread.Loop); write.Write2( dispatchPipe, _dummyMessage, socket, (write2, status, error, state) => { write2.Dispose(); ((UvStreamHandle)state).Dispose(); }, socket); } }

  好了,連線請求找到處理執行緒後,後面就可以開始處理工作了。ListenerSecondary裡的程式碼比較複雜,其實最終都會呼叫下面的程式碼完成Connection物件的建立

 var connection = new Connection(this, socket);
connection.Start();

  Connection表示的就是當前連線,下面是它的構造方法

public Connection(ListenerContext context, UvStreamHandle socket) : base(context)
        {
            _socket = socket;
            _connectionAdapters = context.ListenOptions.ConnectionAdapters;
            socket.Connection = this;
            ConnectionControl = this;

            ConnectionId = GenerateConnectionId(Interlocked.Increment(ref _lastConnectionId));

            if (ServerOptions.Limits.MaxRequestBufferSize.HasValue)
            {
                _bufferSizeControl = new BufferSizeControl(ServerOptions.Limits.MaxRequestBufferSize.Value, this);
            }
        //建立輸入輸出socket流
            Input = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
            Output = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);

            var tcpHandle = _socket as UvTcpHandle;
            if (tcpHandle != null)
            {
                RemoteEndPoint = tcpHandle.GetPeerIPEndPoint();
                LocalEndPoint = tcpHandle.GetSockIPEndPoint();
            }
        //建立處理frame,這裡的framefactory就是前面建立KestrelEngine時建立的工廠
            _frame = FrameFactory(this);
            _lastTimestamp = Thread.Loop.Now();
        }

  然後呼叫Connection的Start方法開始進行處理,這裡面直接把處理任務交給Frame處理,Start方法實現:

public void Start()
        {
            Reset();
       //啟動了非同步處理任務開始進行處理 _requestProcessingTask = Task.Factory.StartNew( (o) => ((Frame)o).RequestProcessingAsync(),//具體的處理方法 this, default(CancellationToken), TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); _frameStartedTcs.SetResult(null); }
RequestProcessingAsync方法裡不再詳細介紹了,把主要的程式碼拿出來看一下:
。。。。。
//_application就是上一篇文章提到的HostApplication,首先呼叫CreateContext建立HttpContext物件 var context = _application.CreateContext(this); 。。。。。。
//進入處理管道 await _application.ProcessRequestAsync(context).ConfigureAwait(false); 。。。。。。
ProcessRequestAsync完成處理後,把結果輸出給客戶端,好到此介紹完畢。如果有問題,歡迎大家指點。