(譯)在Dart中建立Stream
由Lasse Nielsen撰寫 2013年4月(2018年10月更新)
dart:async庫包含兩種對許多Dart API很重要的型別:Stream 和Future 。如果Future表示單次計算的結果,則流是一系列結果。您在流上監聽以獲得結果(資料和錯誤)以及流關閉的通知。你也可以在收聽時暫停或在流完成之前停止收聽流。
但是這篇文章不是關於使用流。這是關於建立自己的流。你可以通過以下幾種方式建立流:
async * StreamController
本文介紹了每種方法的程式碼,並提供了幫助你正確實現流的提示。
有關使用流的幫助,請參閱非同步程式設計:流。
轉換現有流
建立流的常見情況是您已經擁有流,並且您希望基於原始流的事件建立新流。例如,您可能有一個位元組流,您希望通過UTF-8解碼輸入來轉換為字串流。最通用的方法是建立一個等待原始流上的事件然後輸出新事件的新流。例:
/// Splits a stream of consecutive strings into lines. /// /// The input string is provided in smaller chunks through /// the `source` stream. Stream<String> lines(Stream<String> source) async* { // Stores any partial line from the previous chunk. var partial = ''; // Wait until a new chunk is available, then process it. await for (var chunk in source) { var lines = chunk.split('\n'); lines[0] = partial + lines[0]; // Prepend partial line. partial = lines.removeLast(); // Remove new partial line. for (var line in lines) { yield line; // Add lines to output stream. } } // Add final partial line to output stream, if any. if (partial.isNotEmpty) yield partial; }
對於許多常見的轉換,您可以使用Stream提供的轉換方法,例如map(),where(),expand()和take()。
例如,假設您有一個streamStream流,它每秒發出一個遞增計數器。以下是它的實現方式:
var counterStream = Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
要快速檢視事件,您可以使用以下程式碼:
counterStream.forEach(print); // Print an integer every second, 15 times.
要轉換流事件,可以在偵聽流之前呼叫流上的轉換方法(如map())。該方法返回一個新流。
// Double the integer in each event. var doubleCounterStream = counterStream.map((int x) => x * 2); doubleCounterStream.forEach(print);
您可以使用任何其他轉換方法,而不是map(),例如:
.where((int x) => x.isEven) // Retain only even integer events. .expand((var x) => [x, x]) // Duplicate each event. .take(5) // Stop after the first five events.
通常,您只需要一種轉換方法。但是,如果您需要更多地控制轉換,可以使用Stream的transform()方法指定StreamTransformer。平臺庫為許多常見任務提供流變換器。例如,以下程式碼使用dart:convert庫提供的utf8.decoder和LineSplitter轉換器。
Stream<List<int>> content = File('someFile.txt').openRead(); List<String> lines = await content.transform(utf8.decoder).transform(LineSplitter()).toList();
從頭建立流
建立新流的一種方法是使用非同步生成器(async *)函式。呼叫函式時會建立流,並且在偵聽流時函式的主體開始執行。函式返回時,流關閉。在函式返回之前,它可以使用yield或yield *語句在流上發出事件。
這是一個以固定間隔發出數字的原始示例:
Stream<int> timedCounter(Duration interval, [int maxCount]) async* { int i = 0; while (true) { await Future.delayed(interval); yield i++; if (i == maxCount) break; } }
[PENDING:顯示使用它的程式碼,所以我們有一些提及StreamSubscription的上下文?]
此函式返回一個Stream。當收聽該流時,正文開始執行。它反覆延遲請求的間隔,然後產生下一個數字。如果省略count引數,則迴圈上沒有停止條件,因此流將永遠輸出越來越大的數字 - 或者直到偵聽器取消其訂閱。
當偵聽器取消(通過對listen()方法返回的StreamSubscription物件呼叫cancel())時,下一次正文到達yield語句時,yield將作為return語句。執行任何封閉的finally塊,並退出該函式。如果函式在退出之前嘗試生成一個值,那麼它將失敗並充當返回值。
當函式最終退出時,cancel()方法返回的future將完成。如果函式以錯誤退出,則將來會以該錯誤完成;否則,它以null結束。
另一個更有用的示例是將期貨序列轉換為流的函式:
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* { for (var future in futures) { var result = await future; yield result; } }
此函式要求期貨可迭代新的未來,等待該未來,發出結果值,然後迴圈。如果將來因錯誤而完成,則流將以該錯誤完成。
很少有非同步*函式從無到有構建流。它需要從某個地方獲取資料,而且通常在某個地方是另一個流。在某些情況下,與上面的未來序列一樣,資料來自其他非同步事件源。但是,在許多情況下,async *函式過於簡單,無法輕鬆處理多個數據源。這就是StreamController類的用武之地。
使用StreamController
如果您的流的事件來自程式的不同部分,而不僅僅是來自非同步函式可以遍歷的流或期貨,那麼使用StreamController來建立和填充流。
StreamController為您提供了一個新的流,以及一種在任何地方和任何地方向流新增事件的方法。該流具有處理偵聽器和暫停所需的所有邏輯。您返回流並將控制器保留給自己。
以下示例(來自stream_controller_bad.dart)顯示了StreamController的一個基本但有缺陷的用法,用於實現前面示例中的timedCounter()函式。此程式碼建立一個要返回的流,然後根據計時器事件將資料提供給它,這些事件既不是期貨也不是流事件。
// NOTE: This implementation is FLAWED! // It starts before it has subscribers, and it doesn't implement pause. Stream<int> timedCounter(Duration interval, [int maxCount]) { var controller = StreamController<int>(); int counter = 0; void tick(Timer timer) { counter++; controller.add(counter); // Ask stream to send counter values as event. if (maxCount != null && counter >= maxCount) { timer.cancel(); controller.close(); // Ask stream to shut down and tell listeners. } } Timer.periodic(interval, tick); // BAD: Starts before it has subscribers. return controller.stream; }
和以前一樣,你可以使用timedCounter()返回的流,如下所示:[PENDING:我們之前是否顯示過這個?]
var counterStream = timedCounter(const Duration(seconds: 1), 15); counterStream.listen(print); // Print an integer every second, 15 times.
timedCounter()的這個實現有幾個問題:
- 它在訂閱者之前就開始製作活動。
- 即使訂閱者請求暫停,它也會不斷產生事件。
如下一節所示,您可以通過在建立StreamController時指定回撥(如onListen和onPause)來解決這兩個問題。
等待訂閱
通常,流應該在開始工作之前等待訂閱者。 async *函式會自動執行此操作,但在使用StreamController時,您可以完全控制並且即使不應該也可以新增事件。當流沒有訂閱者時,其StreamController會緩衝事件,如果流永遠不會獲得訂閱者,則會導致記憶體洩漏。
嘗試將使用該流的程式碼更改為以下內容:
void listenAfterDelay() async { var counterStream = timedCounter(const Duration(seconds: 1), 15); await Future.delayed(const Duration(seconds: 5)); // After 5 seconds, add a listener. await for (int n in counterStream) { print(n); // Print an integer every second, 15 times. } }
當此程式碼執行時,前5秒沒有列印任何內容,儘管流正在執行。然後新增偵聽器,並且一次列印前5個左右的事件,因為它們由StreamController緩衝。
要獲得訂閱通知,請在建立StreamController時指定onListen引數。當流獲得其第一個訂閱者時,將呼叫onListen回撥。如果指定onCancel回撥,則在控制器丟失其最後一個訂閱者時呼叫它。在前面的示例中,Timer.periodic()應該移動到onListen處理程式,如下一節所示。
尊重暫停狀態
當偵聽器請求暫停時,避免產生事件。當流訂閱暫停時,async *函式會在yield語句中自動暫停。另一方面,StreamController在暫停期間緩衝事件。如果提供事件的程式碼不遵循暫停,則緩衝區的大小可以無限增長。此外,如果監聽器在暫停後立即停止監聽,則浪費了建立緩衝區所花費的工作。
要檢視沒有暫停支援會發生什麼,請嘗試將使用該流的程式碼更改為以下內容:
void listenWithPause() { var counterStream = timedCounter(const Duration(seconds: 1), 15); StreamSubscription<int> subscription; subscription = counterStream.listen((int counter) { print(counter); // Print an integer every second. if (counter == 5) { // After 5 ticks, pause for five seconds, then resume. subscription.pause(Future.delayed(const Duration(seconds: 5))); } }); }
當五秒鐘暫停時,在此期間觸發的事件都會立即收到。發生這種情況是因為流的源不支援暫停並且不斷向流新增事件。因此,流緩衝事件,然後在流取消暫停時清空其緩衝區。
以下版本的timedCounter()(來自stream_controller.dart)通過使用StreamController上的onListen,onPause,onResume和onCancel回撥來實現暫停。
Stream<int> timedCounter(Duration interval, [int maxCount]) { StreamController<int> controller; Timer timer; int counter = 0; void tick(_) { counter++; controller.add(counter); // Ask stream to send counter values as event. if (counter == maxCount) { timer.cancel(); controller.close(); // Ask stream to shut down and tell listeners. } } void startTimer() { timer = Timer.periodic(interval, tick); } void stopTimer() { if (timer != null) { timer.cancel(); timer = null; } } controller = StreamController<int>( onListen: startTimer, onPause: stopTimer, onResume: startTimer, onCancel: stopTimer); return controller.stream; }
使用上面的listenWithPause()函式執行此程式碼。你會看到它在暫停時停止計數,然後它恢復得很好。
必須使用所有偵聽器-onListen,onCancel,onPause和onResume-來通知暫停狀態的更改。原因是如果訂閱和暫停狀態同時發生變化,則僅呼叫onListen或onCancel回撥。
最後的提示
在不使用async *函式的情況下建立流時,請記住以下提示:
-
使用同步控制器時要小心 - 例如,使用StreamController建立的同步控制器(sync:true)。在未呼叫的同步控制器上傳送事件時(例如,使用由EventSink定義的add(),addError()或close()方法),事件將立即傳送到流上的所有偵聽器。在新增偵聽器的程式碼完全返回之前,永遠不能呼叫流偵聽器,並且在錯誤的時間使用同步控制器可能會破壞此承諾並導致良好的程式碼失敗。避免使用同步控制器。
-
如果使用StreamController,則在listen呼叫返回StreamSubscription之前呼叫onListen回撥。不要讓onListen回撥依賴於已存在的訂閱。例如,在以下程式碼中,在訂閱變數具有有效值之前觸發onListen事件(並呼叫處理程式)。
subscription = stream.listen(handler);
-
StreamController定義的onListen,onPause,onResume和onCancel回撥在流的偵聽器狀態更改時由流呼叫,但從不在事件觸發期間或在呼叫另一個狀態更改處理程式期間呼叫。在這些情況下,狀態更改回調會延遲,直到上一次回撥完成。
-
不要嘗試自己實現Stream介面。很容易獲得事件,回撥之間的互動,以及新增和刪除偵聽器的巧妙錯誤。始終使用可能來自StreamController的現有流來實現新流的偵聽呼叫。
-
雖然可以通過擴充套件Stream類並在頂部實現listen方法和額外功能來建立擴充套件Stream功能和更多功能的類,但通常不建議這樣做,因為它引入了使用者必須考慮的新型別。相反,您通常可以建立一個具有Stream(和更多)的類 - 而不是一個Stream(以及更多)。