1. 程式人生 > >WebFlux響應式程式設計基礎之 4 reactive stream 響應式流

WebFlux響應式程式設計基礎之 4 reactive stream 響應式流

reactive stream 響應式流 — 簡而言之,就是多了一個溝通的渠道

這裡寫圖片描述

釋出訂閱者

背壓 交流

Reactive Stream主要介面

java.util.concurrent.Flow 原始碼很重要 很有意思 多讀幾遍


import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public
class FlowDemo { public static void main(String[] args) throws Exception { // 1. 定義釋出者, 釋出的資料型別是 Integer // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 介面 SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定義訂閱者 Subscriber<Integer> subscriber = new
Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 儲存訂閱關係, 需要用它來給釋出者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1
); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到資料: " + item); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 處理完呼叫request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理資料的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴釋出者, 後面不接受資料了 this.subscription.cancel(); } @Override public void onComplete() { // 全部資料處理完了(釋出者關閉了) System.out.println("處理完了!"); } }; // 3. 釋出者和訂閱者 建立訂閱關係 publiser.subscribe(subscriber); // 4. 生產資料, 併發布 // 這裡忽略資料生產過程 for (int i = 0; i < 1000; i++) { System.out.println("生成資料:" + i); // submit是個block方法 publiser.submit(i); } // 5. 結束後 關閉釋出者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主執行緒延遲停止, 否則資料沒有消費就退出 Thread.currentThread().join(1000); // debug的時候, 下面這行需要有斷點 // 否則主執行緒結束無法debug System.out.println(); } }

響應式流(ReactiveStreams)為這種非阻塞背壓的非同步流處理提供了一個標準。在處理系統出現過載的時候,採用非同步傳送訊號的方式通知資料來源做相應的處理。這個通知的訊號就像是水管的閥門一樣,關閉這個閥門會增加背壓(資料來源對處理系統的壓力),同時也會增加處理系統的壓力。

這個標準的目的是治理跨非同步邊界的流資料交換(比如向其他執行緒傳輸資料) ,同時確保處理系統不被緩衝資料而壓垮。換一種說法,背壓是這個標準模型的一個組成部分,以便允許線上程之間調停的佇列被界定。特別注意,背壓通訊是非同步的。

完整例項

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
 * 帶 process 的 flow demo
 */

/**
 * Processor, 需要繼承SubmissionPublisher並實現Processor介面
 * 
 * 輸入源資料 integer, 過濾掉小於0的, 然後轉換成字串釋出出去
 */
class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 儲存訂閱關係, 需要用它來給釋出者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("處理器接受到資料: " + item);

        // 過濾掉小於0的, 然後釋出出去
        if (item > 0) {
            this.submit("轉換後的資料:" + item);
        }

        // 處理完呼叫request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現了異常(例如處理資料的時候產生了異常)
        throwable.printStackTrace();

        // 我們可以告訴釋出者, 後面不接受資料了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部資料處理完了(釋出者關閉了)
        System.out.println("處理器處理完了!");
        // 關閉釋出者
        this.close();
    }

}

public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定義釋出者, 釋出的資料型別是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定義處理器, 對資料進行過濾, 並轉換為String型別
        MyProcessor processor = new MyProcessor();

        // 3. 釋出者 和 處理器 建立訂閱關係
        publiser.subscribe(processor);

        // 4. 定義最終訂閱者, 消費 String 型別資料
        Subscriber<String> subscriber = new Subscriber<String>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 儲存訂閱關係, 需要用它來給釋出者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個數據, 處理
                System.out.println("接受到資料: " + item);

                // 處理完呼叫request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理資料的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴釋出者, 後面不接受資料了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部資料處理完了(釋出者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 5. 處理器 和 最終訂閱者 建立訂閱關係
        processor.subscribe(subscriber);

        // 6. 生產資料, 併發布
        // 這裡忽略資料生產過程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 結束後 關閉釋出者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主執行緒延遲停止, 否則資料沒有消費就退出
        Thread.currentThread().join(1000);
    }

}

執行機制

反饋

submit是一個阻塞方法