Java 9 揭祕(17. Reactive Streams)
Tips
做一個終身學習的人。
在本章中,主要介紹以下內容:
- 什麼是流(stream)
- 響應式流(Reactive Streams)的倡議是什麼,以及規範和Java API
- 響應式流在JDK 中的API以及如何使用它們
- 如何使用JDK 9中的響應式流的Java API來建立釋出者,訂閱者和處理者
一. 什麼是流
流是由生產者生產並由一個或多個消費者消費的元素(item)的序列。 這種生產者——消費者模型也被稱為source/sink模型或釋出者——訂閱者(publisher-subscriber )模型。 在本章中,將其稱為釋出者訂閱者模型。
有幾種流處理機制,其中pull模型和push模型是最常見的。 在push模型中,釋出者將元素推送給訂閱者。 在pull模式中,訂閱者將元素推送給釋出者。 釋出者和訂閱者都以同樣的速率工作,這是一個理想的情況,這些模式非常有效。 我們會考慮一些情況,如果他們不按同樣的速率工作,這種情況下涉及的問題以及對應的解決辦法。
當釋出者比訂閱者快的時候,後者必須有一個無邊界緩衝區來儲存快速傳入的元素,或者它必須丟棄它無法處理的元素。 另一個解決方案是使用一種稱為背壓(backpressure )的策略,其中訂閱者告訴釋出者減慢速率並保持元素,直到訂閱者準備好處理更多的元素。 使用背壓可確保更快的釋出者不會壓制較慢的訂閱者。 使用背壓可能要求釋出者擁有無限制的緩衝區,如果它要一直生成和儲存元素。 釋出者可以實現有界緩衝區來儲存有限數量的元素,如果緩衝區已滿,可以選擇放棄它們。 可以使用另一策略,其中釋出者將釋出元素重新發送到訂閱者,這些元素髮布時訂閱者不能接受。
訂閱者在請求釋出者的元素並且元素不可用時,該做什麼? 在同步請求中訂閱者戶必須等待,無限期地,直到有元素可用。 如果釋出者同步地向訂閱者傳送元素,並且訂閱者同步處理它們,則釋出者必須阻塞直到資料處理完成。 解決方案是在兩端進行非同步處理,訂閱者可以在從釋出者請求元素之後繼續處理其他任務。 當更多的元素準備就緒時,釋出者將它們非同步傳送給訂閱者。
二. 什麼是響應式流(Reactive Streams)
響應式流從2013年開始,作為提供非阻塞背壓的非同步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從釋出者傳遞到訂閱者,而不需要釋出者阻塞,或訂閱者有無限制的緩衝區或丟棄。
響應式流模型非常簡單——訂閱者向釋出者傳送多個元素的非同步請求。 釋出者向訂閱者非同步傳送多個或稍少的元素。
Tips
響應式流在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。
在2015年,出版了用於處理響應式流的規範和Java API。 有關響應式流的更多資訊,請訪問http://www.reactive-streams.org/
Publisher<T>
Subscriber<T>
Subscription
Processor<T,R>
釋出者(publisher)是潛在無限數量的有序元素的生產者。 它根據收到的要求向當前訂閱者釋出(或傳送)元素。
訂閱者(subscriber)從釋出者那裡訂閱並接收元素。 釋出者向訂閱者傳送訂閱令牌(subscription token)。 使用訂閱令牌,訂閱者從釋出者哪裡請求多個元素。 當元素準備就緒時,釋出者向訂閱者傳送多個或更少的元素。 訂閱者可以請求更多的元素。 釋出者可能有多個來自訂閱者的元素待處理請求。
訂閱(subscription)表示訂閱者訂閱的一個釋出者的令牌。 當訂閱請求成功時,釋出者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與釋出者進行互動,例如請求更多的元素或取消訂閱。
下圖顯示了釋出者和訂閱者之間的典型互動順序。 訂閱令牌未顯示在圖表中。 該圖沒有顯示錯誤和取消事件。
處理者(processor)充當訂閱者和釋出者的處理階段。 Processor
介面繼承了Publisher
和Subscriber
介面。 它用於轉換髮布者——訂閱者管道中的元素。 Processor<T,R>
訂閱型別T的資料元素,接收並轉換為型別R的資料,併發布變換後的資料。 下圖顯示了處理者在釋出者——訂閱和管道中作為轉換器的作用。 可以擁有多個處理者。
下面顯示了響應式流倡導所提供的Java API。所有方法的返回型別為void
。 這是因為這些方法表示非同步請求或非同步事件通知。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
用於響應式流的Java API似乎很容易理解。 但是,實現起來並不簡單。 釋出者和訂閱者之間的所有互動的非同步性質以及處理背壓使得實現變得複雜。 作為應用程式開發人員,會發現實現這些介面很複雜。 類庫應該提供實現來支援廣泛的用例。 JDK 9提供了Publisher
介面的簡單實現,可以將其用於簡單的用例或擴充套件以滿足自己的需求。 RxJava是響應式流的Java實現之一。
三. JDK 9 中響應式流的API
JDK 9在java.util.concurrent包中提供了一個與響應式流相容的API,它在java.base模組中。 API由兩個類組成:
Flow
SubmissionPublisher<T>
Flow
類是final的。 它封裝了響應式流Java API和靜態方法。 由響應式流Java API指定的四個介面作為巢狀靜態介面包含在Flow
類中:
Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription
這四個介面包含與上面程式碼所示的相同的方法。 Flow
類包含defaultBufferSize()
靜態方法,它返回釋出者和訂閱者使用的緩衝區的預設大小。 目前,它返回256。
SubmissionPublisher<T>
類是Flow.Publisher<T>
介面的實現類。 該類實現了AutoCloseable
介面,因此可以使用try-with-resources塊來管理其例項。 JDK 9不提供Flow.Subscriber<T>
介面的實現類; 需要自己實現。 但是,SubmissionPublisher<T>
類包含可用於處理此釋出者釋出的所有元素的consume(Consumer<? super T> consumer)
方法。
1. 釋出者——訂閱者互動
在開始使用JDK API之前,瞭解使用響應式流的典型釋出者——訂閱者會話中發生的事件順序很重要。 包括在每個事件中使用的方法。 釋出者可以擁有零個或多個訂閱者。 這裡只使用一個訂閱者。
- 建立釋出者和訂閱者,它們分別是
Flow.Publisher
和Flow.Subscriber
介面的例項。 - 訂閱者通過呼叫釋出者的
subscribe()
方法來嘗試訂閱釋出者。 如果訂閱成功,釋出者用Flow.Subscription
非同步呼叫訂閱者的onSubscribe()
方法。 如果嘗試訂閱失敗,則使用呼叫訂閱者的onError()
方法,並丟擲IllegalStateException
異常,並且釋出者——訂閱者互動結束。 - 訂閱者通過呼叫
Subscription
的request(N)
方法向釋出者傳送多個元素的請求。 訂閱者可以向釋出者傳送更多元素的多個請求,而不必等待其先前請求是否完成。 - 訂閱者在所有先前的請求中呼叫訂閱者的
onNext(T item)
方法,直到訂閱者戶請求的元素數量上限——在每次呼叫中向訂閱者傳送一個元素。 如果釋出者沒有更多的元素要傳送給訂閱者,則釋出者呼叫訂閱者的onComplete()
方法來發訊號通知流,從而結束髮布者——訂閱者互動。 如果訂閱者請求Long.MAX_VALUE
元素,則它實際上是無限制的請求,並且流實際上是推送流。 - 如果釋出者隨時遇到錯誤,它會呼叫訂閱者的
onError()
方法。 - 訂閱者可以通過呼叫其
Flow.Subscription
的cancel()
方法來取消訂閱。 一旦訂閱被取消,釋出者——訂閱者互動結束。 然而,如果在請求取消之前存在未決請求,訂閱者可以在取消訂閱之後接收元素。
總結上述結束條件的步驟,一旦在訂閱者上呼叫了onComplete()
或onError()
方法,訂閱者就不再收到釋出者的通知。
在釋出者的subscribe()
方法被呼叫之後,如果訂閱者不取消其訂閱,則保證以下訂閱方法呼叫序列:
onSubscribe onNext* (onError | onComplete)?
這裡,符號*
和?
在正則表示式中被用作關鍵字,一個*
表示零個或多個出現, ?
意為零或一次。
在訂閱者上的第一個方法呼叫是onSubscribe()
方法,它是成功訂閱釋出者的通知。訂閱者的onNext()
方法可以被呼叫零次或多次,每次呼叫指示元素髮布。onComplete()
和onError()
方法可以被呼叫為零或一次來指示終止狀態; 只要訂閱者不取消其訂閱,就會呼叫這些方法。
2. 建立釋出者
建立釋出者取決於Flow.Publisher<T>
介面的實現類。該類包含以下建構函式:
SubmissionPublisher()
SubmissionPublisher(Executor executor, int maxBufferCapacity)
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
SubmissionPublisher
使用提供的Executor
向其訂閱者提供元素。 如果使用多個執行緒來生成要釋出的元素並且可以估計訂閱者數量,則可以使用具有固定執行緒池的newFixedThreadPool(int nThread)
,這可以使用Executors
類的newFixedThreadPool(int nThread)
靜態方法獲取。 否則,使用預設的Executor
,它使用ForkJoinPool
類的commonPool()
方法獲取。
SubmissionPublisher
類為每個訂閱者使用一個獨立的緩衝區。 緩衝區大小由建構函式中的maxBufferCapacity
引數指定。 預設緩衝區大小是Flow
類的defaultBufferSize()
靜態方法返回的值,該值為256。如果釋出的元素數超過了訂戶的緩衝區大小,則額外的元素將被刪除。 可以使用SubmissionPublisher
類的getMaxBufferCapacity()
方法獲取每個訂閱者的當前緩衝區大小。
當訂閱者的方法丟擲異常時,其訂閱被取消。 當訂閱者的onNext()
方法丟擲異常時,在其訂閱被取消之前呼叫建構函式中指定的處理程式。 預設情況下,處理程式為null。
以下程式碼片段會建立一個SubmissionPublishe
r,它釋出所有屬性設定為預設值的Long
型別的元素:
// Create a publisher that can publish Long values
SubmissionPublisher<Long> pub = new SubmissionPublisher<>();
SubmissionPublisher
類實現了AutoCloseable
介面。 呼叫其close()
方法呼叫其當前訂閱者上的onComplete()
方法。 呼叫close()
方法後嘗試釋出元素會丟擲IllegalStateException
異常。
3. 釋出元素
SubmissionPublisher<T>
類包含以下發布元素的方法:
int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)
submit()
方法阻塞,直到當前訂閱者的資源可用於釋出元素。 考慮每個訂閱者的緩衝區容量為10的情況。 訂閱者訂閱了釋出者並且不請求任何元素。 釋出者釋出了10個元素並全部緩衝所有元素。 嘗試使用submit()
方法釋出另一個元素將阻塞,因為訂閱者的緩衝區已滿。
offer()
方法是非阻塞的。 該方法的第一個版本允許指定超時,之後刪除該項。 可以指定一個刪除處理器,它是一個BiPredicate
。 在刪除訂閱者的元素之前呼叫刪除處理器的test()
方法。 如果test()
方法返回true,則再次重試該項。 如果test()
方法返回false,則在不重試的情況下刪除該項。 從offer()
方法返回的負整數表示向訂閱者傳送元素失敗的嘗試次數;正整數表示在所有當前訂閱者中提交但尚未消費的最大元素數量的估計。
應該使用哪種方法釋出一個元素:submit()
或offer()
? 這取決於你的要求。 如果每個已釋出的元素必須發給所有訂閱者,則submit()
方法是最好選擇。 如果要等待發布一段特定時間的元素進行重試,則可以使用offer()
方法。
4. 舉個例子
來看一個使用SubmissionPublisher
作為釋出者的例子。 SubmissionPublisher
可以使用其submit(T item)
方法釋出元素。 以下程式碼片段生成併發布五個整數(1,2,3,4和5),假設pub
是對SubmissionPublisher
物件的引用:
// Generate and publish 10 integers
LongStream.range(1L, 6L)
.forEach(pub::submit);
需要訂閱者才能使用釋出者釋出的元素。 SubmissionPublisher
類包含一個consume(Consumer<? super T> consumer)
方法,它允許新增一個希望處理所有已釋出元素的訂閱者,並且對任何其他通知(如錯誤和完成通知)不感興趣。 該方法返回一個CompletedFuture<Void>
,當釋出者呼叫訂閱者的onComplete()
方法時,表示完成。 以下程式碼片段將一個Consumer
作為訂閱者新增到釋出者中:
// Add a subscriber that prints the published items
CompletableFuture<Void> subTask = pub.consume(System.out::println);
本章中的程式碼是com.jdojo.stream的模組的一部分,其宣告如下所示。
// module-info.java
module com.jdojo.stream {
exports com.jdojo.stream;
}
下面包含了NumberPrinter
類的程式碼,它顯示瞭如何使用SubmissionPublisher
類來發布整數。 示例程式碼的詳細說明遵循NumberPrinter
類的輸出。
// NumberPrinter.java
package com.jdojo.stream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.LongStream;
public class NumberPrinter {
public static void main(String[] args) {
CompletableFuture<Void> subTask = null;
// The publisher is closed when the try block exits
try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
// Print the buffer size used for each subscriber
System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());
// Add a subscriber to the publisher. The subscriber prints the published elements
subTask = pub.consume(System.out::println);
// Generate and publish five integers
LongStream.range(1L, 6L)
.forEach(pub::submit);
}
if (subTask != null) {
try {
// Wait until the subscriber is complete
subTask.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
輸出結果為:
Subscriber Buffer Size: 256
1
2
3
4
5
main()
方法宣告一個subTask
的變數來儲存訂閱者任務的引用。 subTask.get()
方法將阻塞,直到訂閱者完成。
CompletableFuture<Void> subTask = null;
釋出型別為Long
的元素髮布者是在資源塊中建立的。 釋出者是SubmissionPublisher<Long>
類的例項。 當try塊退出時,釋出者將自動關閉。
try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
//...
}
該程式列印將訂閱釋出者的每個訂閱者的緩衝區大小。
// Print the buffer size used for each subscriber
System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());
訂閱者將使用consume()
方法新增到釋出者。 請注意,該方法允許指定一個Consumer
,它在內部轉換為Subscribe
r。 每個釋出的元素會通知訂閱者。 訂閱者只需列印它接收的元素。
// Add a subscriber to the publisher. The subscriber prints the published elements
subTask = pub.consume(System.out::println);
現在是釋出整數的時候了。 該程式生成五個整數,1到5,並使用釋出者的submit()
方法釋出它們。
// Generate and publish five integers
LongStream.range(1L, 6L)
.forEach(pub::submit);
已釋出的整數以非同步方式傳送給訂閱者。 當try塊退出時,釋出者關閉。 要保持程式執行,直到訂閱者完成處理所有已釋出的元素,必須呼叫subTask.get()
。 如果不呼叫此方法,則可能不會在輸出中看到五個整數。
4. 建立訂閱者
要有訂閱者,需要建立一個實現Flow.Subscriber<T>
介面的類。 實現介面方法的方式取決於具體的需求。 在本節中,將建立一個SimpleSubscriber
類,該類實現Flow.Subscriber<Long>
介面。 下面包含此類的程式碼。
// SimpleSubscriber.java
package com.jdojo.stream;
import java.util.concurrent.Flow;
public class SimpleSubscriber implements Flow.Subscriber<Long> {
private Flow.Subscription subscription;
// Subscriber name
private String name = "Unknown";
// Maximum number of items to be processed by this subscriber
private final long maxCount;
// keep track of number of items processed
private long counter;
public SimpleSubscriber(String name, long maxCount) {
this.name = name;
this.maxCount = maxCount <= 0 ? 1 : maxCount;
}
public String getName() {
return name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.printf("%s subscribed with max count %d.%n", name, maxCount);
// Request all items in one go
subscription.request(maxCount);
}
@Override
public void onNext(Long item) {
counter++;
System.out.printf("%s received %d.%n", name, item);
if (counter >= maxCount) {
System.out.printf("Cancelling %s. Processed item count: %d.%n", name, counter);
// Cancel the subscription
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
System.out.printf("An error occurred in %s: %s.%n", name, t.getMessage());
}
@Override
public void onComplete() {
System.out.printf("%s is complete.%n", name);
}
}
SimpleSubscriber
類的例項表示一個訂閱者,它有一個名稱和要處理的最大數量的items (maxCount)
方法。 需要將其名稱和maxCount
傳遞給其建構函式。 如果maxCount
小於1,則在建構函式中設定為1。
在onSubscribe()
方法中,它儲存釋出者在名為subscription
的例項變數中傳遞的Flow.Subscription
。 它列印有關Flow.Subscription
的訊息,並請求一次可以處理的所有元素。 該訂閱者有效地使用push模型,因為在該請求之後,不再向釋出者傳送更多的元素的請求。 釋出著將推送maxCount
或更少的元素數量給該訂閱者。
在onNext()
方法中,它將counter
例項變數遞增1。counter
例項變數跟蹤訂閱者接收到的元素數量。 該方法列印詳細說明接收到的元素訊息。 如果它已經收到可以處理的最後一個元素,它將取消訂閱。 取消訂閱後,釋出者不再收到任何元素。
在onError()
和onComplete()
方法中,它列印一個有關其狀態的訊息。
以下程式碼段建立一個SimpleSubscriber
,其名稱為S1
,可以處理最多10個元素。
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 10);
現在看一下具體使用SimpleSubscriber
的例子。 下包含一個完整的程式。 它定期釋出元素。 釋出一個元素後,它等待1到3秒鐘。 等待的持續時間是隨機的。 以下詳細說明本程式的輸出。 該程式使用非同步處理可能導致不同輸出結果。
// PeriodicPublisher.java
package com.jdojo.stream;
import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class PeriodicPublisher {
final static int MAX_SLEEP_DURATION = 3;
// Used to generate sleep time
final static Random sleepTimeGenerator = new Random();
public static void main(String[] args) {
SubmissionPublisher<Long> pub = new SubmissionPublisher<>();
// Create three subscribers
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
SimpleSubscriber sub2 = new SimpleSubscriber("S2", 5);
SimpleSubscriber sub3 = new SimpleSubscriber("S3", 6);
SimpleSubscriber sub4 = new SimpleSubscriber("S4", 10);
// Subscriber to the publisher
pub.subscribe(sub1);
pub.subscribe(sub2);
pub.subscribe(sub3);
// Subscribe the 4th subscriber after 2 seconds
subscribe(pub, sub4, 2);
// Start publishing items
Thread pubThread = publish(pub, 5);
try {
// Wait until the publisher is finished
pubThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Thread publish(SubmissionPublisher<Long> pub, long count) {
Thread t = new Thread(() -> {
for (long i = 1; i <= count; i++) {
pub.submit(i);
sleep(i);
}
// Close the publisher
pub.close();
});
// Start the thread
t.start();
return t;
}
private static void sleep(Long item) {
// Wait for 1 to 3 seconds
int sleepTime = sleepTimeGenerator.nextInt(MAX_SLEEP_DURATION) + 1;
try {
System.out.printf("Published %d. Sleeping for %d sec.%n", item, sleepTime);
TimeUnit.SECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void subscribe(SubmissionPublisher<Long> pub, Subscriber<Long> sub,
long delaySeconds) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(delaySeconds);
pub.subscribe(sub);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
輸出結果為:
S2 subscribed with max count 5.
Published 1. Sleeping for 1 sec.
S3 subscribed with max count 6.
S1 subscribed with max count 2.
S1 received 1.
S3 received 1.
S2 received 1.
Published 2. Sleeping for 1 sec.
S1 received 2.
S2 received 2.
S3 received 2.
Cancelling S1. Processed item count: 2.
S4 subscribed with max count 10.
Published 3. Sleeping for 1 sec.
S4 received 3.
S3 received 3.
S2 received 3.
Published 4. Sleeping for 2 sec.
S4 received 4.
S3 received 4.
S2 received 4.
Published 5. Sleeping for 2 sec.
S2 received 5.
Cancelling S2. Processed item count: 5.
S4 received 5.
S3 received 5.
S3 is complete.
S4 is complete.
PeriodicPublisher
類使用兩個靜態變數。 MAX_SLEEP_DURATION
靜態變數儲存釋出這等待發佈下一個元素最大秒數。 它設定為3。sleepTimeGenerator
靜態變數Random
物件的引用,該物件在sleep()
方法中用於生成下一個等待的隨機持續時間。
PeriodicPublisher
類的main()
方法執行以下操作:
- 它建立作為
SubmissionPublisher<Long>
類的例項的釋出者。 - 它建立了四個為
S1
,S2
,S3
和S4
的訂閱者。每個訂閱者能夠處理不同數量的元素。 - 三個訂閱者立即訂閱。
S4
的訂閱者在兩秒鐘的最短延遲之後以單獨的執行緒訂閱。PeriodicPublisher
類的subscribe()
方法負責處理此延遲訂閱。注意到在兩個元素(1和2)已經發布之後S4
訂閱的輸出中,它將不會收到這兩個元素。- 它呼叫
publish()
方法,它啟動一個新的執行緒來發布五個元素,它啟動執行緒並返回執行緒引用。 main()
方法呼叫釋出元素執行緒的join()
方法,所以在所有元素髮布之前程式不會終止。publish()
方法負責釋出五個元素。最後關閉釋出者。它呼叫sleep()
方法,使當前執行緒休眠一個和MAX_SLEEP_DURATION
秒之間的隨機選擇的持續時間。- 在輸出中注意到,一些訂閱者取消了訂閱,因為他們從釋出商那裡收到指定數量的元素。
請注意,該程式保證所有元素將在終止之前釋出,但不保證所有訂閱者都將收到這些元素。 在輸出中,會看到訂閱者收到所有已釋出的元素。 這是因為釋出者在釋出最後一個元素後等待至少一秒鐘,這給了訂閱者足夠的時間,在這個小程式中接收和處理最後一個元素。
該程式沒有表現出背壓(backpressure) ,因為所有訂閱者都通過一次性請求元素來使用push模型。 可以將SimpleSubscriber
類修改為分配任務,以檢視背壓的效果:
- 在
onSubscribe()
方法中使用subscription.request(1)
方法請求一個元素。 - 在
onNext()
方法中,延遲後請求更多的元素。 延遲應使訂閱者的工作速度較慢,釋出者釋出元素的速度較慢。 - 需要釋出超過256個元素,這是每個釋出者向訂閱者使用的預設緩衝區,或者使用
SubmissionPublisher
類的另一個建構函式使用較小的緩衝區大小。 這將迫使釋出者釋出比訂閱者可以處理的更多的元素。 - 訂閱者使用刪除處理程式( drop handler)訂閱,以便可以看到釋出者何時發現背壓。
- 使用
SubmissionPublisher
類的offer()
方法釋出元素,因此當訂閱者無法處理更多元素時,釋出者不會無限期地等待。
5. 使用處理者
處理者(Processor)同時是訂閱者也是釋出者。 要使用處理者,需要一個實現Flow.Processor<T,R>
介面的類,其中T是訂閱元素型別,R是已釋出的元素型別。 在本節中,建立了一個基於Predicate<T>
過濾元素的簡單處理者。 處理者訂閱釋出六個整數——1,2,3,4,5和6的釋出者。訂閱者訂閱處理者。 處理者從其釋出者接收元素,如果它們通過了Predicate<T>
指定的標準,則重新發布相同的元素。 下面包含其例項作為處理者的FilterProcessor<T>
類的程式碼。
// FilterProcessor.java
package com.jdojo.stream;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;
public class FilterProcessor<T> extends SubmissionPublisher<T> implements Processor<T,T>{
private Predicate<? super T> filter;
public FilterProcessor(Predicate<? super T> filter) {
this.filter = filter;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
// Request an unbounded number of items
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(T item) {
// If the item passes the filter publish it. Otherwise, no action is needed.
System.out.println("Filter received: " + item);
if (filter.test(item)) {
this.submit(item);
}
}
@Override
public void onError(Throwable t) {
// Pass the onError message to all subscribers asynchronously
this.getExecutor().execute(() -> this.getSubscribers()
.forEach(s -> s.onError(t)));
}
@Override
public void onComplete() {
System.out.println("Filter is complete.");
// Close this publisher, so all its subscribers will receive a onComplete message
this.close();
}
}
FilterProcessor<T>
類繼承自SubmissionPublisher<T>
類,並實現了Flow.Processor<T,T>
介面。 處理者必須是釋出者以及訂閱者。 從SubmissionPublisher<T>
類繼承了這個類,所以不必編寫程式碼來使其成為釋出者。 該類實現了Processor<T,T>
介面的所有方法,因此它將接收和釋出相同型別的元素。
建構函式接受Predicate<? super T>
引數並將其儲存在例項變數filter
中,將在onNext()
方法中使用filter
元素。
onNext()
方法應用filter
。 如果filter
返回true,則會將該元素重新發布到其訂閱者。 該類從其超類SubmissionPublisher
繼承了用於重新發布元素的submit()
方法。
onError()
方法非同步地將錯誤重新發布給其訂閱者。 它使用SubmissionPublisher
類的getExecutor()
和getSubscribers()
方法,該方法返回Executor
和當前訂閱者的列表。 Executor
用於非同步地向當前訂閱者釋出訊息。
onComplete()
方法關閉處理者的釋出者部分,它將向所有訂閱者傳送一個onComplete
訊息。
讓我們看看這個處理者具體的例子。 下面包含ProcessorTest
類的程式碼。 可能會得到一個不同的輸出,因為這個程式涉及到幾個非同步步驟。 該程式的詳細說明遵循程式的輸出。
// ProcessorTest.java
package com.jdojo.stream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
public class ProcessorTest {
public static void main(String[] args) {
CompletableFuture<Void> subTask = null;
// The publisher is closed when the try block exits
try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
// Create a Subscriber
SimpleSubscriber sub = new SimpleSubscriber("S1", 10);
// Create a processor
FilterProcessor<Long> filter = new FilterProcessor<>(n -> n % 2 == 0);
// Subscribe the filter to the publisher and a subscriber to the filter
pub.subscribe(filter);
filter.subscribe(sub);
// Generate and publish 6 integers
LongStream.range(1L, 7L)
.forEach(pub::submit);
}
try {
// Sleep for two seconds to let subscribers finish handling all items
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出的結果為:
S1 subscribed with max count 10.
Filter received: 1
Filter received: 2
Filter received: 3
S1 received 2.
Filter received: 4
S1 received 4.
Filter received: 5
Filter received: 6
Filter is complete.
S1 received 6.
S1 is complete.
ProcessorTest
類的main()
方法建立一個釋出者,它將釋出六個整數——1,2,3,4,5和6。該方法做了很多事情:
- 它建立一個使用try-with-resources塊的釋出者,所以當try塊退出時它將自動關閉。
- 它建立一個
SimpleSubscriber
類的例項的訂閱者。訂閱者名為S1
,最多可處理10個元素。 - 它建立一個處理者,它是
FilterProcessor<Long>
類的例項。傳遞一個Predicate<Long>
,讓處理者重新發布整數並丟棄奇數。 - 處理者被訂閱釋出者,並且簡單訂閱者被訂閱到處理者。這完成了釋出者到訂閱者的管道——釋出者到處理者到訂閱者。
- 在第一個try塊的末尾,程式碼生成從1到6的整數,並使用釋出者釋出它們。
- 在
main()
方法結束時,程式等待兩秒鐘,以確保處理者和訂閱者有機會處理其事件。如果刪除此邏輯,程式可能無法列印任何內容。必須包含這個邏輯,因為所有事件都是非同步處理的。當第一個try塊退出時,釋出者將完成向處理者傳送所有通知。然而,處理者和訂閱者需要一些時間來接收和處理這些通知。
四. 總結
流是生產者生產並由一個或多個消費者消費的元素序列。 這種生產者——消費者模型也被稱為source/sink模型或發行者——訂閱者模型。
有幾種流處理機制,pull模型和push模型是最常見的。 在push模型中,釋出者將資料流推送到訂閱者。 在pull模型中,定於這從釋出者拉出資料。 當兩端不以相同的速率工作的時,這些模型有問題。 解決方案是提供適應釋出者和訂閱者速率的流。 使用稱為背壓的策略,其中訂閱者通知釋出者它可以處理多少個元素,並且釋出者僅向訂閱者傳送那些需要處理的元素。
響應式流從2013年開始,作為提供非阻塞背壓的非同步流處理標準的舉措。 它旨在解決處理元素流的問題 ——如何將元素流從釋出者傳遞到訂閱者,而不需要釋出者阻塞,或者訂閱者有無限制的緩衝區或丟棄。 響應式流模型在pull模型和push模型流處理機制之間動態切換。 當訂閱者處理較慢時,它使用pull模型,當訂閱者處理更快時使用push模型。
在2015年,出版了一個用於處理響應式流的規範和Java API。 Java API 中的響應式流由四個介面組成:Publisher<T>
,Subscriber<T>
,Subscription
和Processor<T,R>
。
釋出者根據收到的要求向訂閱者釋出元素。 使用者訂閱釋出者接收元素。 釋出者向訂閱者傳送訂閱令牌。 使用訂閱令牌,訂閱者從釋出者請求多個數據元素。 當資料元素準備就緒時,釋出者向訂閱者傳送多個個或稍少的資料元素。 訂閱者可以請求更多的資料元素。
JDK 9在java.util.concurrent包中提供了與響應式流相容的API,它在java.base模組中。 API由兩個類組成:Flow
和SubmissionPublisher<T>
。
Flow
類封裝了響應式流Java API。 由響應式流Java API指定的四個介面作為巢狀靜態介面包含在Flow
類中:Flow.Processor<T,R>
,Flow.Publisher<T>
,Flow.Subscriber<T>
和Flow.Subscription
。