1. 程式人生 > >自己動手寫個非同步IO函式 --(基於 c# Task)

自己動手寫個非同步IO函式 --(基於 c# Task)

前言    對於服務端,達到高效能、高擴充套件離不開非同步。對於客戶端,函式執行時間是1毫秒還是100毫秒差別不大,沒必要為這一點點時間煞費苦心。對於非同步,好多人還有誤解,如: 非同步就是多執行緒;非同步就是如何利用好執行緒池。非同步不是這麼簡單,否則微軟沒必要在非同步上花費這麼多心思。本文就介紹非同步最新的實現方式:Task,並自己動手寫一個非同步IO函式。只有瞭解了非同步函式內部實現方式,才能更好的利用它。

  對於c#,非同步處理經過了多個階段,但是對於現階段非同步就是Task,微軟用Task來抽象非同步操作。以後的非同步函式,處理的都是Task。你會看到處處都是task的身影。為了處理Task,c#引入了兩個關鍵詞async,await。這兩個關鍵詞也可以說是一個關鍵詞,因為async的存在是為了表明await是關鍵詞。總而言之:兩個關鍵詞幹了一件事,async關鍵詞並不改變函式的宣告。

  有人說await就是語法糖,不值得大書特書,我只能說你錯了。軟體開發堅持的原則為:程式碼要省,程式碼要清晰易懂!如果沒有語法糖,程式碼的維護性大大降低。await這個語法糖做的事很多;如果不用await,處理同樣的邏輯,需要多寫很多程式碼,並導致邏輯不清晰。

Task的分類

   非同步分為兩類 compute-base 和 IO-base。compute-base就是計算密集型,函式所有的操作都是在記憶體中,不涉及IO;如果執行這個函式,則單個執行緒利用率達100%;IO-base就是涉及到IO,IO包括檔案讀寫,socket讀寫;這類非同步操作底層涉及到IOCP(完成埠)。相應的,Task也分為兩類。

  對於這兩個區別可以舉個例子來區分:一臺電腦為4個執行緒。如果同時有4個compute-base執行緒執行,cpu的利用率為100%。如果同時有4個 IO-base的非同步操作,cpu利用率可能遠遠低於100%。

  對於.net 庫,有些函式會有兩個版本:一個是同步操作,一個是非同步操作(函式名以Async結尾,返回值為Task)。舉個例子:

     

    這是WebClient類獲取網址內容函式。你會問DownloadStringTaskAsync是compute-base  Task,還是 IO-base Task?我可以肯定的告訴你:只要是.net基本類庫提供的非同步函式基本都是IO-base Task(微軟官方文件是這樣要求)。其實這樣要求是有道理的:對於compute-base非同步,比較容易封裝;再者,這樣的非同步是不能大規模的併發的。如果16個執行緒cpu,同時併發16個這樣的非同步操作就是上限了;如果再多,反而會有害!

  有人說,如果基本類庫不提供 IO-base Task函式,我也可以封裝一下,這個也不難啊!程式碼如下:

//把一個同步操作,改造成非同步
public static async Task<byte[]> DownloadDataAsync(string url)
{
            WebRequest request = WebRequest.Create(url);

            return await Task.Run(() =>
            {
                using (var response = request.GetResponse())
                using (var responseStream = response.GetResponseStream())
                using (var result = new MemoryStream())
                {
                    responseStream.CopyTo(result);
                    return result.ToArray();
                }
            });
 }

  上面函式如果說是非同步操作,也不錯。但是,這不是“好”的非同步操作!這是非同步操作中夾雜著同步IO。會導致執行緒等待。如果有100個這樣的非同步操作,就需要100個執行緒,這些執行緒大部分並沒在幹活,而是在等待! 對於“好”的非同步IO,如果同時有100個操作,甚至幾萬個操作,使用的執行緒都是有限的,一般不超過cpu執行緒數。這是怎麼實現的?這涉及到IOCP,說起來有些複雜,可以參考IOCP相關資料。類庫提供非同步IO操作,都是涉及到IOCP的。所以得到如下結論: 如果類庫不提供IO非同步函式,無論怎麼改造,不可能改造成“好”的非同步函式!

Task實現的基本原理

  Task變數狀態如下

  狀態簡要分為生成、執行、執行完畢這三個階段。如果執行完畢前獲取執行後的值Task.Result,函式就會阻塞。那我怎麼知道什麼時候完成,而又不阻塞?有兩種辦法,輪詢和回撥通知。Task.IsCompleted屬性會指示函式是否執行完畢。輪詢不是一個好的辦法,採用回撥通知是上策!

  回撥通知有個缺點:處理邏輯不直觀,回撥函式與非同步呼叫函式不在一塊,還有可能隔著很多行程式碼或不在同一個檔案。如果這樣的回撥函式太多,對理解程式碼邏輯造成困難,程式碼不易維護。微軟也考慮到了這個問題,那就用await關鍵詞來解決。await幫你處理了回撥函式的弊端,其實await後面的程式碼與await前面的程式碼不屬於同一個函式!await後面的程式碼就是回撥函式!微軟確實給我們解決了這個問題,但是又帶來另一個問題。好多人不明白,明明是同一個函式,怎麼實現了等待而又不阻塞當前執行緒!歸根到底,還是要理解await背後幫你幹了啥,否則就會一直困惑。

  要生成Task變數,只要理解幾個關鍵的處理步驟就行了。TaskCompletionSource類會幫助我們生成Task。如果IO完成,設定Task的狀態為完成就行了。後面,就會執行回撥函式(await關鍵詞幫我幹了,你看不到回撥)!

如何寫一個IO-base Task函式?

  大部分情況下不需要自己寫這樣的函式。但是,人是有好奇心的,如果不明白函式實現的原理,總是感覺不能釋懷!再者,明白函式實現原理,就能更好的利用這類函式。下面講解一下如何利用IOCP來實現非同步函式。我沒有參考.net的原始碼,只是根據邏輯推理應該這實現。肯定和.net原始碼實現有出入,我寫這些程式碼主要為了闡明Task實現原理。

IOCP處理邏輯

  對於IOCP,這裡不展開來講了,否則就跑題了。以socket讀取為例子,簡單總結一下:如果你要接收100個位元組的資料,你告訴IOCP你要接收100個位元組資料,並提供100個位元組的buffer,函式立即返回;資料到達後,IOCP通知你,資料到了,資料就存在你提供的buffer裡。

   實現非同步IO虛擬碼如下:

 class AyncInside
    {
        //完成埠控制代碼
        IntPtr iocpHandle = IntPtr.Zero;

        Task<byte[]> ReadFromSocket(int count)
        {
            //生成此次操作需要相關資料 
            TaskCompletionSourceRead readInfo = new TaskCompletionSourceRead();
            readInfo.Buffer = new byte[count];

            //如果沒生成iocp則生成。
            if (iocpHandle == IntPtr.Zero)
            {
                iocpHandle = CreateIocp();
            }

            // 告訴iocp,要讀取count位元組資料。函式不會阻塞,會立即返回
            //從完成埠收到資料後,會呼叫ReadScoketCallback
            //我們把readInfo也傳給函式。當回撥時,該變數會傳給回撥函式。
            ReadFromIocp(iocpHandle, readInfo.Buffer, readInfo, ReadScoketCallback);
            
            return readInfo.Tcs.Task;
        }


        void ReadScoketCallback(byte[] buffer, int readCount,object tag)
        {
            //tag就是呼叫ReadFromIocp時,傳的readInfo
            //便於我們知道非同步呼叫時的上下文資料。
            TaskCompletionSourceRead readInfo = tag as TaskCompletionSourceRead;
           
            if(buffer.Length == readCount )
            {
                //呼叫完SetResult後,await後面的程式碼就會被執行!
                readInfo.Tcs.SetResult(buffer);
            }
            else if (buffer.Length > 0)
            {
                Array.Resize(ref buffer, readCount);
                readInfo.Tcs.SetResult(buffer);
            }
            else
            {
                readInfo.Tcs.TrySetException(new Exception("讀取資料異常!socket可能已斷開!"));
            }
        }

        private void ReadFromIocp(IntPtr iocpHandle, byte[] buffer, object tag,
            Action<byte[] , int,object> readScoketCallback)
        {
            throw new NotImplementedException();
        }

        private IntPtr CreateIocp()
        {
            throw new NotImplementedException();
        }

    }

    //封裝非同步讀取需要的資料
    class TaskCompletionSourceRead
    {
        public TaskCompletionSource<byte[]> Tcs { get; set; }
        public byte[] Buffer { get; set; }
    }

  上述程式碼與實際可使用程式碼差距還很大,我在這裡主要為了闡明原理。通過上面的程式碼,我們可以看到,這個非同步函式並沒生成新的執行緒;網絡卡驅動和IOCP配合,幫我們接收了資料。所以這種方式才是真正可擴充套件的非同步IO。

後記 非同步IO和可擴充套件服務緊密關聯。對於.net core平臺,你會看到很多函式都是非同步的。理解和用好非同步IO函式非常重要。本文通過自己對非同步IO的理解,試圖通過程式碼闡明非同步IO實現原理。希望你看過此文後,能對此有更深的理解!如果此文對你有所裨益,希望您給點個