1. 程式人生 > >【C#】C#執行緒_I/O限制的非同步操作

【C#】C#執行緒_I/O限制的非同步操作

目錄結構:

contents structure [+]
    1. 為什麼需要非同步IO操作
    2. C#的非同步函式
      1. async和await的使用
    3. 非同步函式的狀態機
      1. 非同步函式如何轉化為狀態機
      2. 如何擴充套件非同步函式
    4. FCL中的非同步IO操作
      1. FileStream類
    5. 非同步實現伺服器
    6. 如何取消非同步IO操作

在這篇文章中,筆者將會討論如何執行非同步的IO操作。上面一篇文章,筆者介紹瞭如何執行非同步計算操作。在讀完本文後,將非同步應用到IO操作中,能夠提供讀取的效率。

1.為什麼需要非同步IO操作

關於非同步操作,想必讀者已知道非同步IO操作,筆者在這裡展示FileStream類讀取本地檔案的過程。首先展示FileStream類同步IO操作的流程圖。

上面的執行流程中,在第4步Windows將IRP資料包傳送給恰當的裝置驅動的IRP佇列(每個裝置驅動程式都維護著自己的IRP佇列,其中包含了機器上執行的所有程序發出的I/O請求)。在IRP資料包到達時,裝置驅動程式將IRP資訊傳給物理硬體裝置上的安裝電路板,然後由硬體驅動裝置執行請求的I/O操作,也就是第5個步驟。

當硬體驅動裝置執行I/O操作期間,發出了I/O請求的執行緒將無事可做,所以Windows將執行緒變成睡眠執行緒,防止它浪費CPU的時間(步驟6)。這當然好,雖然執行緒不浪費時間,但其仍然浪費空間(記憶體),這當然就不好了。

當硬體裝置執行完I/0操作。然後Windows會喚醒你的執行緒,把它排程給一個CPU,使其從核心模式返回至使用者模式,然後返回至託管程式碼(步驟7、8、9)。

上面的步驟看起來很不錯,但是依舊存在兩個問題:1.請求的數量越來越多,建立的執行緒就越來越多,那麼被阻塞的執行緒就會越來越多,這樣會更浪費記憶體。2.用執行結果來響應請求,如果請求的數量非常多,那麼解鎖的阻塞執行緒也就很多,而且機器上的執行緒數都會遠遠大於CPU數,所以線上程阻塞期間很有可能會頻繁地發生上下文切換,損害效能。

下面展示Windows如何非同步執行I/O操作,仍然使用FileStream來構建物件,但是需要傳遞FileOptions.Asynchronous標誌,告訴Windows希望檔案的讀/寫以非同步的方式進行。


在使用FileOptions.Asynchronous建立FileStream物件後,就應該使用ReadAsync(...)來讀取檔案,而不是Read(...)。在ReadAsync內部分配一個Task<Int32>物件來代表用於完成的讀取操作的程式碼。然後ReadAsync呼叫Win32ReadFile函式(步驟1),ReadFile分配IRP資料包(步驟2),然後將其傳遞給Windows核心(步驟3)。Windows核心把IRP資料包新增到IRP佇列中(步驟4)。此時執行緒不會再阻塞,而是可以直接執行返回至你的程式碼。所以執行緒能夠立即從ReadAsync呼叫中返回(步驟5、6、7)。

在呼叫ReadAsync後返回一個Task<Int32>物件,可以在該物件上呼叫ContinueWith來登記任務完成時執行的回撥方法,然後在回撥方法中處理資料。當硬體裝置處理好IRP後(步驟a)。硬體裝置會把IRP放到CLR的執行緒池中佇列中(步驟b)。將來某個時候,一個執行緒池會提取完成的IRP並執行任務的程式碼,最終要麼設定異常(如果發生異常),要麼返回結果(步驟c)。在知道這些之後,就知道使用非同步I/O可以儘量的減少同步I/O訪問存在的這些問題。

2.C#的非同步函式

之前的一篇文章中,我們討論了《計算限制的非同步操作》,其中絕大部分程式碼都是使用Task來完成的。C#還為我們提供了另一種非同步糖語法—非同步函式,使用非同步函式時可以以順序的寫非同步程式碼,感覺像是在進行同步操作。Task和非同步函式的功能類似,那麼在需要進行非同步操作的時候,如何選擇如果涉及到計算操作,與I/0無關的話,就選擇Task,否則建議使用非同步函式。

2.1 async和await的使用

async和await是C#非同步函式程式設計的核心,被async標記的方法表明該方法應該以非同步的方式執行;await操作符用於標記非同步函式執行完成後狀態機恢復的位置(注意,這裡不是等待),同時指示包含該await操作符的方法以非同步的方式執行。如果async中不包含await,那麼async會以同步的方式執行。
下面展示非同步訪問網路的步驟:

        static void Main(string[] args)
        {
            Task<int> task= AccessTheWebAsync();
            task.ContinueWith((t) => {
                Console.WriteLine(t.Result);
            });
            Console.ReadLine();
        }
        static async Task<int> AccessTheWebAsync() {
            //需要引入System.Net.Http程式集
            HttpClient httpClient = new HttpClient();

            Task<string> getStringTask = httpClient.GetStringAsync("https://www.baidu.com/");

            // await操作符掛起AccessTheWebAsync方法
            //   AccessTheWebAsync不能夠繼續執行,直到getStringTask任務完成。
            //   AccessTheWebAsync可以從這裡直接非同步返回給AccessTheWebAsync的呼叫者。
            //   當getStringTask任務完成後,狀態機可以直接從這裡恢復,並且await操作符會返回任務的Result值。
            String urlContents = await getStringTask;

            //返回長度
            return urlContents.Length;
        }

使用async和await有以下幾點需要注意:
1.方法名應該以Async結尾(比如:AccessTheWebAsync)。
2.方法應該包含有async修飾符。
3.方法的返回型別應該是Task<TResult>或Task或void或其他型別(從C#7.0,.NET Core開始,其他型別的返回值應該提供GetAwaiter方法)。
4.方法中至少應該包含一個await表示式。

3.非同步函式的狀態機

3.1 非同步函式如何轉化為狀態機

通常情況下,觀察編譯器給我們編譯好的程式碼,可以幫助我們更好的理解我們的程式碼。像async和await操作符,編譯器其實是把這些操作符轉化成了一種狀態機的機制。將含有async和await的程式碼編譯為IL程式碼,再將IL程式碼反編譯為C#程式碼,就可以得到狀態機。
比如:

    class Type { }

    class Program
    {

        private static async Task<Type> Method1()
        {
            /*執行一些非同步操作,最後返回一個Type型別的資料*/
            HttpClient httpClient = new HttpClient();
            String result= await httpClient.GetStringAsync("http://www.baidu.com");

            return new Type();
        }

        private static async Task<String> MyMethodAsync() {

            Type result1 = await Method1();

            return result1.ToString();
        }

        static void Main(string[] args)
        {
        }
    }

編譯為IL程式碼後,再利用ILSPY把IL程式碼反編譯為C#程式碼,在返編譯IL程式碼的時候,需要注意,不能勾選“decompile async methods(async/await)”

然後就可以看到async和await轉化成的狀態機


通過檢視反編譯後的C#程式碼,C#中的非同步函式的執行過程,可以用下圖進行簡單的概括:

但任務未完成時,isCompleted返回false,所以會在onCompleted登記任務完成時會呼叫的action動作,action動作執行完成後,會再一次呼叫MoveNext,然後isCompleted就返回true,此時就可以通過GetResult獲得結果。

3.2 如何擴充套件非同步函式

在擴充套件性方面,能用Task物件包裝一個即將完成的操作,就可以使用await操作符來等待該操作。

下面是一個TaskLogger類,可用它顯示未完成的非同步操作。

    static class TaskLogger {
        public enum TaskLogLevel { None,Pending}
        public static TaskLogLevel LogLevel { get; set; }

        public sealed class TaskLogEntry {
            public Task Task { get; internal set; }
            public String Tag { get; internal set; }
            public DateTime LogTime { get; internal set; }
            public String CallerMemberName { get; internal set; }
            public String CallerFilePath { get; internal set; }
            public Int32 CallerLineNumber { get; internal set; }
            public override string ToString()
            {
                return String.Format("LogTime={0},Tag={1},Member={2},File={3}({4})",
                    LogTime,Tag??"(none)",CallerMemberName,CallerFilePath,CallerLineNumber);
            }
        }

        private static readonly ConcurrentDictionary<Task, TaskLogEntry> s_log = new ConcurrentDictionary<Task, TaskLogEntry>();

        public static IEnumerable<TaskLogEntry> GetLogEntries() { return s_log.Values;}

        public static Task<TResult> Log<TResult>(this Task<TResult> task, String tag = null,
            [CallerMemberName] String callerMemberName=null,
            [CallerFilePath] String callerFilePath=null,
            [CallerLineNumber] Int32 callerLineNumber=-1) {

                return (Task<TResult>)Log(task, tag, callerMemberName, callerFilePath, callerLineNumber);
        }

        public static Task Log(this Task task, String tag = null,
           [CallerMemberName] String callerMemberName = null,
           [CallerFilePath] String callerFilePath = null,
           [CallerLineNumber] Int32 callerLineNumber = -1) {
               if (LogLevel == TaskLogLevel.None) {
                   return task;
               }
               var logEntry = new TaskLogEntry {
                   Task=task,
                   LogTime=DateTime.Now,
                   Tag=tag,
                   CallerMemberName=callerMemberName,
                   CallerFilePath=callerFilePath,
                   CallerLineNumber=callerLineNumber
            };

               s_log[task] = logEntry;

               //附加一個非同步任務,當一個任務執行完成後,應該將其從清單中移除
               task.ContinueWith(t => {
                   TaskLogEntry entry;
                   s_log.TryRemove(t,out entry);
               },TaskContinuationOptions.ExecuteSynchronously);

               return task;
        }
    }

Callation類,用於取消正在執行的非同步操作

    static class Cancellation
    {
        public struct Void { }
        
        public static async Task WithCancellation(this Task originalTask, CancellationToken ct)
        {
            //建立在Cancellation被取消時完成的一個Task
            var cancelTask = new TaskCompletionSource<Void>();

            using (ct.Register(t => ((TaskCompletionSource<Void>)t).TrySetResult(new Void()), cancelTask)) {
                
                //建立在原始Task或CancellationToken Task完成時都完成的一個Task
                Task any = await Task.WhenAny(originalTask,cancelTask.Task);

                //任務Task因為CancellationToken而完成,就丟擲OperationCanceledException
                if (any == cancelTask.Task)
                    ct.ThrowIfCancellationRequested();
            };

            //等待原始任務;若任務失敗,它將丟擲一個異常
            await originalTask;
        }
    }

最後,展示如何使用

    class Program
    {
        static void Main(string[] args)
        {
            Go();
            Console.ReadLine();
        }
        public static async Task Go() {
#if DEBUG
            //使用TaskLogger會影響記憶體和效能,所以只在除錯生成中啟用它
            TaskLogger.LogLevel=TaskLogger.TaskLogLevel.Pending;
#endif
            //初始化3個任務;為了測試TaskLogger,我們顯示控制持續時間
            var tasks = new List<Task>{
                Task.Delay(2000).Log("2s op"),
                Task.Delay(5000).Log("5s op"),
                Task<String>.Delay(8000).Log("8s op"),
            };

            try
            {
                //等待全部任務,但在3秒後取消;只有一個任務能夠按時完成
                await Task.WhenAll(tasks).WithCancellation(new CancellationTokenSource(3000).Token);
            }
            catch (OperationCanceledException) {
                //查詢logger哪些任務尚未完成,按照從等待時間從最長到最短的順序排序
                foreach (var op in TaskLogger.GetLogEntries().OrderBy(tle => tle.LogTime)) {
                    Console.WriteLine(op);
                }
            }
        }
    }

我的得到如下的輸出結果:

LogTime=2018/11/7 1:30:41,Tag=8s op,Member=Go,File=e:\MyLearn\ConsoleApplication1\Program.cs(28)
LogTime=2018/11/7 1:30:41,Tag=5s op,Member=Go,File=e:\MyLearn\ConsoleApplication1\Program.cs(27)


除了增強使用Task的靈活性,非同步函式對另一個擴充套件性有力的地方在於編譯器可以在await的任何運算元上呼叫GetAwaiter。所以運算元不一定是Task物件。可以是任何任意型別,只要提供一個呼叫GetAwaiter的方法就可以了。
例如:

    public sealed class EventAwaiter<TEventArgs> : INotifyCompletion {
        private ConcurrentQueue<TEventArgs> m_events = new ConcurrentQueue<TEventArgs>();

        private Action m_continuation;

        //狀態機呼叫GetAwaiter獲得Awaiter,這裡返回自己
        public EventAwaiter<TEventArgs> GetAwaiter() { return this; }

        //告訴狀態機是否發生了任何事件
        public Boolean IsCompleted { get { return m_events.Count > 0; } }

        //狀態機告訴我們以後要呼叫什麼方法,continuation中包含有恢復狀態機的操作
        public void OnCompleted(Action continuation) {
            Volatile.Write(ref m_continuation,continuation);
        }

        //狀態機查詢結果,這是awaiter操作符的結果
        public TEventArgs GetResult() {
            TEventArgs e;
            m_events.TryDequeue(out e);
            return e;
        }

        public void EventRaised(Object sender, TEventArgs eventArgs) {
            m_events.Enqueue(eventArgs);

            //如果有一個等待執行的延續任務,該執行緒會執行它
            Action continuation = Interlocked.Exchange(ref m_continuation, null);
            if (continuation != null) {
                continuation();//恢復狀態機
            }
        }
    }

在EventAwaiter類在事件發生的時候從await操作符返回。在本例中,一旦AppDomain中的任何執行緒丟擲異常,狀態機就會繼續。

        private static async void ShowException() {
            var eventAwaiter = new EventAwaiter<FirstChanceExceptionEventArgs>();
            AppDomain.CurrentDomain.FirstChanceException += eventAwaiter.EventRaised;

            while (true) {
                Console.WriteLine((await eventAwaiter).Exception.GetType());
            }
        }

最後的程式碼演示了這一切是如何工作的
 

       static void Main(string[] args)
        {
            ShowException();

            for (int i = 0; i < 3; i++) {
                try
                {
                    switch (i) {
                        case 0: throw new InvalidCastException();
                        case 1: throw new InvalidOperationException();
                        case 2: throw new ArgumentException();
                    }
                }
                catch (Exception) {
                }
            }
            Console.ReadLine();
        }

4.FCL中的非同步IO操作

FCL中的非同步函式非常容易辨認,因為命名規範要求非同步函式必須加上Async的字尾。在FCL中,支援I/O操作的許多型別都提供了XxxAsync方法
例如:
a.System.IO.Stream的所有派生類都提供了ReadAsync,WriteAsync,FlushAsync和CopyToAsync方法
b.System.IO.TextReader的所有派生類都提供了ReadAsync,ReadLineAsync,ReadToEndAsync和ReadBlockAsync方法。System.IO.TextWriter的派生類提供了WriteAsync,WriteLineAsync和FlushAsync.
c.System.Net.Http.HttpClient 類提供了GetAsync,GetStreamAsync,GetByteArrayAsync,PostAsync,PutAsync,DeleteAsync和其他許多方法。
d.System.Net.WebRequest的所派生類(包括FileWebRequest,FtpWebRequest和HttpWebRequest)都提供了GetRequestStreamAsync和GetResponseAsync方法。
e.System.Data.SqlClient.SqlCommand類提供了ExecuteDbDataReaderAsync,ExecuteNonQueryAsync,ExecuteReaderAsync,ExecuteScalarAsync和ExecuteXmlReaderAsync方法。
f.生成Web服務代理工具(比如SvcUtil.exe)也生成了XxxAsync方法。

這裡筆者以System.Net.Http.HttpClient來舉例:

        static async void Go() {
            HttpClient httpClient = new HttpClient();
            Stream stm = await httpClient.GetStreamAsync("http://www.baidu.com");

            StreamReader sr = new StreamReader(stm);
            String line= "";
            while ((line = await sr.ReadLineAsync()) != null) {
                Console.WriteLine(line);
            }
        }


FCL中有許多程式設計都使用了BeginXxx/EndXxx方法模型和IAsyncResult介面,還有基於事件的程式設計模型,它也提供了XxxAsync方法(不返回Task物件),能在非同步操作完成時呼叫事件處理程式。這兩種程式設計模型都已經過時,使用Task的新模型才是你的首要選擇。

在FCL中,有一些類缺少XxxAsync方法,只提供了BeginXxx和EndXxx方法。可以通過TaskFactory將其轉化為基於Task的模型。
BeginExecuteXXX 和EndExecuteXXX 使用TaskFactory來轉化的步驟,例如:
返回值= Task.Factory.FromAsync(BeginEexcuteXXX,EndExecuteXXX,...);
返回值是EndExecute的返回值。

例如:NamedPipeServerStream類定義了BeginWaitForConnection和EndWaitForConnection,但是沒有定義WaitForConnectionAsync方法,可以按照如下程式碼來完成轉化。

        static async void StartServer() {
            while (true) { //迴圈不停的接受來自客戶端的連結
                var pipe = new NamedPipeServerStream(c_pipeName,PipeDirection.InOut,-1,PipeTransmissionMode.Message,PipeOptions.Asynchronous|PipeOptions.WriteThrough);

                //非同步的接受來自客戶端的連線
                //用TaskFactory的FromAsync將舊的非同步程式設計模型轉化為新的Task模型
                //當沒有客戶端連線時,執行緒將會掛起,並且允許方法已非同步的方式返回呼叫者(本例中未有返回)
                //當有客戶端連線後,立即喚醒狀態機,執行緒繼續執行。
                await Task.Factory.FromAsync(pipe.BeginWaitForConnection,pipe.EndWaitForConnection,null);

                //為客戶端提供服務
                //startServiceConnectionAsync 也是非同步方法,所以能夠立即返回
                startServiceConnectionAsync(pipe);
            }
        }

FCL沒有提供任何的輔助方法將舊的、基於事件的程式設計模型轉化為新的、基於Task的程式設計模型。所有隻能使用硬編碼的方式。例如下面演示了使用TaskCompletionSource包裝使用了“基於事件的程式設計模型”的WebClient,以便在非同步函式中等待它。

        static async Task<String> AwaitWebClient(Uri uri) {
            //System.Net.WebClient
            var wc = new System.Net.WebClient();
            
            //建立TaskCompletionSource及其基礎Task物件
            var tcs = new TaskCompletionSource<String>();

            //字串下載完成後,WebClient物件引發DownloadStringCompleted事件
            wc.DownloadStringCompleted += (s, e) => {
                if (e.Cancelled) tcs.SetCanceled();
                else if (e.Error != null) tcs.SetException(e.Error);
                else tcs.SetResult(e.Result);
            };

            //啟動非同步操作
            wc.DownloadStringAsync(uri);

            //現在可以等待TaskCompletion
            String result = await tcs.Task;

            return result;
        }

4.1 FileStream類

建立FileStream物件時,可通過FileOptions.AsyncChronous標誌指定以同步方式還是非同步方式進行通訊。如果不指定該標誌,Windows將以同步方式執行所有檔案操作。當然,仍然可以呼叫FileStream的ReadAsync方法,對於你的應用程式,表面上是非同步執行,但FileStream類在內部用另一個執行緒模擬非同步行為。這個額外的執行緒純屬是浪費。

如果建立FileStream物件時指定FileOptions.AsyncChronous標誌。然後,可以呼叫FileStream的Read方法執行一個同步操作。在內部,FileStream類會開始一個非同步操作,然後立即呼叫執行緒進入睡眠狀態,直到操作完成才喚醒,從而模擬同步行為,這樣依然效率低下。

總之,使用FileStream時應該想好是以同步方式還是以非同步方式執行I/O操作,並指定FileOptions.Asynchronous標誌來指明自己的選擇。如果指定了該標誌,就總是呼叫ReadAsync。如果沒有使用這個標誌,就總是呼叫Read。這樣能夠獲得最佳效能。如果想先對FileStream執行一些同步操作,再執行一些非同步操作,那麼更高效的做法是使用FileOptions.Asynchronous標誌來構造它。另外也可針對同一個檔案,建立兩個FileStream物件,一個FileStream進行同步操作,另一個FileStream執行非同步操作。

FileStream的輔助方法(Create,Open和OpenWrite)建立並返回FileStream物件,這些方法都沒有指定FileOptions.Asynchronous標誌,所以為了實現響應靈敏的、可伸縮性的應用程式,應避免使用這些方法。

5.非同步實現伺服器

FCL內建了對伸縮性很好的一些非同步伺服器的支援。下面列舉中MSDN文件中值的參考的地方。
1.要構建非同步ASP.NET Web窗體,在.aspx檔案中新增Async="true"的網頁指令,並參考System.Web.UI.Page的RegisterAsyncTask方法。
2.要構建非同步ASP.NET MVC控制器,使你的控制器類從System.Web.Mvc.AsyncController派生,讓操作方法返回一個Task<ActionResult>即可。
3.要構建非同步ASP.NET 處理程式,使你的類從System.Web.HttpTaskAsyncHandler派生,重寫其ProcessRequestAsync方法。
4.要構建非同步WCF服務,將服務作為非同步函式來實現,讓它返回Task或Task<TResult>。

6.如何取消非同步IO操作

Windows一般沒有提供取消未完成I/O操作的途徑,這是許多開發人員都想要的功能,實現起來卻很困難。畢竟,如果向伺服器請求了1000個位元組,然後決定不再需要這些位元組,那麼其實沒有辦法高數伺服器忘掉你的請求。在這種情況下,只能讓位元組照常返回,再將他們丟棄。此外,這裡還發生競態條件-取消請求的請求可能正在伺服器傳送響應的時候到來,要在程式碼中處理這種潛在的競態條件,決定是丟棄還是使用資料。

建議實現一個WithCancellation擴充套件方法Task<TResult>(需要過載版本來擴充套件Task)上面的案例中,我們已經使用過Task的擴充套件版本了,下面是Task<TResult>版本:

static class CancelleationClass {
private struct Void { }//沒有泛型的TaskCompletionSource類

public static async Task<TResult> WithCancellation<TResult>(this Task<TResult> originalTask, CancellationToken ct)
{
    //建立在CancellationToken被取消時完成的一個Task
    var cancelTask = new TaskCompletionSource<Void>();

    //一旦CancellationToken被取消,就完成Task
    CancellationTokenRegistration cancellationTokenRegistration = ct.Register(t =>
    {
        ((TaskCompletionSource<Void>)t).TrySetResult(new Void());
    }, cancelTask);

    //建立在原始task或cancel task完成時都完成的Task
    Task any = await Task.WhenAny(originalTask, cancelTask.Task);

    //只要是cancel task先完成,就丟擲OperationCanceledException
    if (any == cancelTask.Task)
    {
        ct.ThrowIfCancellationRequested();
    }

    //釋放資源
    cancellationTokenRegistration.Dispose();

    //返回原始任務
    return originalTask.Result;
}
}

按照如下的程式碼來使用它:

public static async Task<Int32> go() {
    var cts = new CancellationTokenSource();
    var ct = cts.Token;

    try
    {
        Int32 max = 10;
        Task<Int32> task = new Task<Int32>(() => {
            Int32 result = 0;
            for (int i = 0; i < max; i++) {
                result += i;
                Thread.Sleep(1000);
            }
            return result;
        });
        task.Start();

        //在指定的時間後取消操作
        Task.Delay(500).ContinueWith((obj) =>
        {
            cts.Cancel();
        });

        Int32 res=await task.WithCancellation<Int32>(ct);

        return res;
    }
    catch (OperationCanceledException e) {
        Console.WriteLine(e.Message);
    }
    return -1;
}