Reactive Extensions 相見恨晚的Rx.Net
何為Reactive Extensions(Rx)
Rx是一個遵循函數語言程式設計的類庫,它引用觀察者以及迭代器設計模式對可觀察物件產生的資料進行非同步消費。使用Rx,
開發人員將使用LINQ運算子操作非同步資料流,並使用排程程式引數化非同步資料流中的併發性,簡單地說,Rx = Observables + LINQ + Schedulers。
使用Rx需要Nuget安裝System.Reactive Nuget包
Rx的使用場景
響應式UI
UI介面上,使用者對一個繫結資料集合的控制元件進行關鍵字查詢。常規的流程是我們必須在等待使用者鍵盤按下指定的完成鍵(如回車)或滑鼠點選查詢按鈕後程序才開始執行相應的查詢處理。但假設需求變更:“使用者希望在每輸入一個關鍵字後就能及時將關鍵字相應的查詢結果集繫結到控制元件” 如果面臨這個需求,那你會如何實現呢? 你會少不了定義相應的全域性狀態欄位,少不了相應的時間間隔重新整理。我相信寫出來的程式碼也會讓你很煩惱。 其實你有更好的選擇,那就是我們的主角Rx。
Rx 核心
Rx有兩個核心介面 IObservable<T>、IObserver<T>
IObservable<T>
先來看此介面的結構:
IObservable<T>介面就提供一個Subscribe(訂閱)方法,入參是一個 觀察者物件介面 。
我們可以 將IObservable<T>稱之為被觀察者(可觀察者),IObserver<T>稱之為觀察者 。
通過可介面簽名可以看出被觀察者需要輸出T型別的物件。需要理解被觀察者IObservable<T>我們需要與現有的一些常規知識點做出比較,這裡我們用IEnumerable<T>比較。
我想我們都使用過Linq,操作過IEnumerable<T>集合,IEnumerable<T>集合有個明顯的狀態就是它所儲存的元素是靜態的。集合內的元素狀態除非程式碼顯示的新增或刪除、修改,否則這個集合基本是靜態(資料未變動)的。但是IObservable<T>則不同,它的元素是根據被觀察者提供的資料而變動的(不可預測的),就好比在UI上你無法預測使用者的操作行為一樣。
下面這個表格可以看出兩者區別
IEnumerable |
可方便的列舉集合元素值 |
---|---|
IObservable |
可觀察物件變動的值 |
IObserver<T>
IObserver<T>介面可以理解為消費被貫徹著提供資料的一個介面,它的三個方法決定了本次資料流的觀察行為的走向。
通俗理解就是 被觀察者生成資料,觀察者消費資料。
來看下IObserver<T>的結構
- OnNext 表示消費新資料
- OnError 表示觀察資料流出現異常
- OnCompleted 表示明確關閉觀察資料流
程式碼示例
下面程式碼定義了一個可觀察的佇列,該佇列會提供給觀察者三個int型別的入參 1、2、3 供觀察者物件的OnNext方法消費。 MyConsoleObserver(觀察者)在得到資料後打印出來。
1class Program 2{ 3 4static void Main(string[] args) 5{ 6Test(); 7} 8 9private static void Test() 10{ 11var numbers = new MySequenceOfNumbers(); 12var observer = new MyConsoleObserver<int>(); 13numbers.Subscribe(observer); 14Console.ReadLine(); 15} 16 17 } 18 19/// <summary> 20/// 自定義被觀察佇列 21/// </summary> 22public class MySequenceOfNumbers : IObservable<int> 23{ 24public IDisposable Subscribe(IObserver<int> observer) 25{ 26observer.OnNext(1); 27observer.OnNext(2); 28observer.OnNext(3); 29observer.OnCompleted(); 30return Disposable.Empty; 31} 32} 33 34/// <summary> 35/// 自定義觀察者物件 36/// </summary> 37/// <typeparam name="T"></typeparam> 38public class MyConsoleObserver<T> : IObserver<T> 39{ 40public void OnNext(T value) 41{ 42Console.WriteLine("接收到 value {0}", value); 43} 44public void OnError(Exception error) 45{ 46Console.WriteLine("出現異常! {0}", error); 47} 48public void OnCompleted() 49{ 50Console.WriteLine("關閉觀察行為"); 51} 52}
通過示例程式碼我們得知了Rx.Net的資料流訂閱、消費流程。
Subject<T>
我們再來認識下Subject<T>,Subject是一個IObservable,它以命令形式生成一個值,並將該值推送給觀察者物件。我們看下Subject<T>的結構。
看這繼承關係,我們繼續看SubjectBase<T>裡面有些啥~
哎呀,這個類真不得了啊,把IObserver<T>和IObservable<T>都給繼承了。這不是可以自己提供資料自己進行訂閱和消費了麼....
我們來看看Subject<T>是怎麼玩的:
1private static void SubjectTest() 2{ 3//定義一個 型別string的Subject物件 4var inputs = new Subject<string>(); 5//訂閱資料流 6inputs.Subscribe((p => Console.WriteLine($"得到的值:{p}"))); 7//迴圈造資料 8for (int i = 0; i < 4; i++) 9{ 10inputs.OnNext($"時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}, 下標:{i}"); 11} 12Console.ReadLine(); 13}
執行結果如下圖:
需要注意的是訂閱方法需要在資料生成前宣告。
總結
好啦,要睡覺啦,基本上Rx核心的幾個點就先講到這,我們也來總結下Rx.Net的幾個核心知識點:
- 可觀察(被觀察)物件生產資料;
- 觀察者總是被動接收資料;
- 需要明確訂閱後觀察者才得以消費資料;
這裡也只是自己的一個學習總結,Rx也不是眼前的幾個小示例就可以一目瞭然,不過核心的知識基本上就是這些。至於還有一些操作符的話相信用過Linq的話上手不難,難點在於需要在實際業務中找到合適的場景使用,只有不斷使用才會融會貫通。但是任何技術都不能濫用,每一項新技術都有它最佳使用場景,優秀的開發者需要做好權衡。