1. 程式人生 > >外行人都能看懂的WebFlux,錯過了血虧!

外行人都能看懂的WebFlux,錯過了血虧!

前言

只有光頭才能變強。

文字已收錄至我的GitHub倉庫,歡迎Star:https://github.com/ZhongFuCheng3y/3y

本文知識點架構:

如果有關注我公眾號文章的同學就會發現,最近我不定時轉發了一些比較好的WebFlux的文章,因為我最近在學。

我之前也說過,學習一項技術之前,先要了解為什麼要學這項技術。其實這次學習WebFlux也沒有多大的原生動力,主要是在我們組內會輪流做一次技術分享,而我又不知道分享什麼比較好...

之前在初學大資料相關的知識,但是這一塊的時間線會拉得比較長,感覺趕不及小組內分享(而組內的同學又大部分都懂大資料,就只有我一個菜雞,淚目)。所以,想的是:“要不我學點新東西搞搞?”。於是就花了點時間學WebFlux

啦~

這篇文章主要講解什麼是WebFlux,帶領大家入個門,希望對大家有所幫助(至少看完這篇文章,知道WebFlux是幹嘛用的)

一、什麼是WebFlux?

我們從Spring的官網拉下一點點就可以看到介紹WebFlux的地方了

從官網的簡介中我們能得出什麼樣的資訊?

  • 我們程式設計師往往根據不同的應用場景選擇不同的技術,有的場景適合用於同步阻塞的,有的場景適合用於非同步非阻塞的。而Spring5提供了一整套響應式(非阻塞)的技術棧供我們使用(包括Web控制器、許可權控制、資料訪問層等等)。
  • 而左側的圖則是技術棧的對比啦;
    • 響應式一般用Netty或者Servlet 3.1的容器(因為支援非同步非阻塞),而Servlet技術棧用的是Servlet容器
    • 在Web端,響應式用的是WebFlux,Servlet用的是SpringMVC
    • .....

總結起來,WebFlux只是響應式程式設計中的一部分(在Web控制端),所以一般我們用它與SpringMVC來對比。

二、如何理解響應式程式設計?

在上面提到了響應式程式設計(Reactive Programming),而WebFlux只是響應式程式設計的其中一個技術棧而已,所以我們先來探討一下什麼是響應式程式設計

從維基百科裡邊我們得到的定義:

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change

響應式程式設計(reactive programming)是一種基於資料流(data stream)和變化傳遞(propagation of change)的宣告式(declarative)的程式設計正規化

在維基百科上也舉了個小例子:

意思大概如下:

  • 在指令式程式設計(我們的日常程式設計模式)下,式子a=b+c,這就意味著a的值是由bc計算出來的。如果b或者c後續有變化,不會影響到a的值
  • 在響應式程式設計下,式子a:=b+c,這就意味著a的值是由bc計算出來的。但如果b或者c的值後續有變化,會影響到a的值

我認為上面的例子已經可以幫助我們理解變化傳遞(propagation of change)

那資料流(data stream)和宣告式(declarative)怎麼理解呢?那可以提一提我們的Stream流了。之前寫過Lambda表示式和Stream流的文章,大家可以先去看看:

  • 最近學到的Lambda表示式基礎知識
  • 手把手帶你體驗Stream流

Lambda的語法是這樣的(Stream流的使用會涉及到很多Lambda表示式的東西,所以一般先學Lambda再學Stream流):

Stream流的使用分為三個步驟(建立Stream流、執行中間操作、執行最終操作):

執行中間操作實際上就是給我們提供了很多的API去操作Stream流中的資料(求和/去重/過濾)等等

說了這麼多,怎麼理解資料流和宣告式呢?其實是這樣的:

  • 本來資料是我們自行處理的,後來我們把要處理的資料抽象出來(變成了資料流),然後通過API去處理資料流中的資料(是宣告式的)

比如下面的程式碼;將陣列中的資料變成資料流,通過顯式宣告呼叫.sum()來處理資料流中的資料,得到最終的結果:

public static void main(String[] args) {
    int[] nums = { 1, 2, 3 };
    int sum2 = IntStream.of(nums).parallel().sum();
    System.out.println("結果為:" + sum2);
}

如圖下所示:

2.1 響應式程式設計->非同步非阻塞

上面講了響應式程式設計是什麼:

響應式程式設計(reactive programming)是一種基於資料流(data stream)和變化傳遞(propagation of change)的宣告式(declarative)的程式設計正規化

也講解了資料流/變化傳遞/宣告式是什麼意思,但說到響應式程式設計就離不開非同步非阻塞。

從Spring官網介紹WebFlux的資訊我們就可以發現asynchronous, nonblocking 這樣的字樣,因為響應式程式設計它是非同步的,也可以理解成變化傳遞它是非同步執行的。

如下圖,合計的金額會受其他的金額影響(更新的過程是非同步的):

我們的JDK8 Stream流是同步的,它就不適合用於響應式程式設計(但基礎的用法是需要懂的,因為響應式流程式設計都是操作流嘛)

而在JDK9 已經支援響應式流了,下面我們來看一下

三、JDK9 Reactive

響應式流的規範早已經被提出了:裡面提到了:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ----->http://www.reactive-streams.org/

翻譯再加點資訊:

響應式流(Reactive Streams)通過定義一組實體,介面和互操作方法,給出了實現非同步非阻塞背壓的標準。第三方遵循這個標準來實現具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。

規範裡頭實際上就是定義了四個介面:

Java 平臺直到 JDK 9才提供了對於Reactive的完整支援,JDK9也定義了上述提到的四個介面,在java.util.concurrent包上

一個通用的流處理架構一般會是這樣的(生產者產生資料,對資料進行中間處理,消費者拿到資料消費):

  • 資料來源,一般稱為生產者(Producer)
  • 資料的目的地,一般稱為消費者(Consumer)
  • 在處理時,對資料執行某些操作一個或多個處理階段。(Processor)

到這裡我們再看回響應式流的介面,我們應該就能懂了:

  • Publisher(釋出者)相當於生產者(Producer)
  • Subscriber(訂閱者)相當於消費者(Consumer)
  • Processor就是在釋出者與訂閱者之間處理資料用的

在響應式流上提到了back pressure(背壓)這麼一個概念,其實非常好理解。在響應式流實現非同步非阻塞是基於生產者和消費者模式的,而生產者消費者很容易出現的一個問題就是:生產者生產資料多了,就把消費者給壓垮了。

而背壓說白了就是:消費者能告訴生產者自己需要多少量的資料。這裡就是Subscription介面所做的事。

下面我們來看看JDK9介面的方法,或許就更加能理解上面所說的話了:

// 釋出者(生產者)
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
// 訂閱者(消費者)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
// 用於釋出者與訂閱者之間的通訊(實現背壓:訂閱者能夠告訴生產者需要多少資料)
public interface Subscription {
    public void request(long n);
    public void cancel();
}
// 用於處理髮布者 釋出訊息後,對訊息進行處理,再交由消費者消費
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

3.1 看個例子

程式碼中有大量的註釋,我就不多BB了,建議直接複製跑一下看看:

class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 儲存訂閱關係, 需要用它來給釋出者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("處理器接受到資料: " + item);

        // 過濾掉小於0的, 然後釋出出去
        if (item > 0) {
            this.submit("轉換後的資料:" + item);
        }

        // 處理完呼叫request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現了異常(例如處理資料的時候產生了異常)
        throwable.printStackTrace();

        // 我們可以告訴釋出者, 後面不接受資料了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部資料處理完了(釋出者關閉了)
        System.out.println("處理器處理完了!");
        // 關閉釋出者
        this.close();
    }

}

public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定義釋出者, 釋出的資料型別是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定義處理器, 對資料進行過濾, 並轉換為String型別
        MyProcessor processor = new MyProcessor();

        // 3. 釋出者 和 處理器 建立訂閱關係
        publiser.subscribe(processor);

        // 4. 定義最終訂閱者, 消費 String 型別資料
        Subscriber<String> subscriber = new Subscriber<String>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 儲存訂閱關係, 需要用它來給釋出者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個數據, 處理
                System.out.println("接受到資料: " + item);

                // 處理完呼叫request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理資料的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴釋出者, 後面不接受資料了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部資料處理完了(釋出者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 5. 處理器 和 最終訂閱者 建立訂閱關係
        processor.subscribe(subscriber);

        // 6. 生產資料, 併發布
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 結束後 關閉釋出者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主執行緒延遲停止, 否則資料沒有消費就退出
        Thread.currentThread().join(1000);
    }

}

輸出的結果如下:

流程實際上非常簡單的:

參考資料:

  • https://yanbin.blog/java-9-talk-reactive-stream/#more-8877
  • https://blog.csdn.net/wudaoshihun/article/details/83070086
  • http://www.spring4all.com/article/6826
  • https://www.cnblogs.com/IcanFixIt/p/7245377.html

Java 8 的 Stream 主要關注在流的過濾,對映,合併,而 Reactive Stream 更進一層,側重的是流的產生與消費,即流在生產與消費者之間的協調

說白了就是:響應式流是非同步非阻塞+流量控制的(可以告訴生產者自己需要多少的量/取消訂閱關係)

展望響應式程式設計的場景應用:

比如一個日誌監控系統,我們的前端頁面將不再需要通過“命令式”的輪詢的方式不斷向伺服器請求資料然後進行更新,而是在建立好通道之後,資料流從系統源源不斷流向頁面,從而展現實時的指標變化曲線;

再比如一個社交平臺,朋友的動態、點贊和留言不是手動刷出來的,而是當後臺資料變化的時候自動體現到介面上的。

四、入門WebFlux

扯了一大堆,終於回到WebFlux了。經過上面的基礎,我們現在已經能夠得出一些結論的了:

  • WebFlux是Spring推出響應式程式設計的一部分(web端)
  • 響應式程式設計是非同步非阻塞的(是一種基於資料流(data stream)和變化傳遞(propagation of change)的宣告式(declarative)的程式設計正規化)

我們再回來看官網的圖:

4.1 簡單體驗WebFlux

Spring官方為了讓我們更加快速/平滑到WebFlux上,之前SpringMVC那套都是支援的。也就是說:我們可以像使用SpringMVC一樣使用著WebFlux。

WebFlux使用的響應式流並不是用JDK9平臺的,而是一個叫做Reactor響應式流庫。所以,入門WebFlux其實更多是瞭解怎麼使用Reactor的API,下面我們來看看~

Reactor是一個響應式流,它也有對應的釋出者(Publisher ),Reactor的釋出者用兩個類來表示:

  • Mono(返回0或1個元素)
  • Flux(返回0-n個元素)

而訂閱者則是Spring框架去完成

下面我們來看一個簡單的例子(基於WebFlux環境構建):

// 阻塞5秒鐘
private String createStr() {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    }
    return "some string";
}

// 普通的SpringMVC方法
@GetMapping("/1")
private String get1() {
    log.info("get1 start");
    String result = createStr();
    log.info("get1 end.");
    return result;
}

// WebFlux(返回的是Mono)
@GetMapping("/2")
private Mono<String> get2() {
    log.info("get2 start");
    Mono<String> result = Mono.fromSupplier(() -> createStr());
    log.info("get2 end.");
    return result;
}

首先,值得說明的是,我們構建WebFlux環境啟動時,應用伺服器預設是Netty的:

我們分別來訪問一下SpringMVC的介面和WebFlux的介面,看一下有什麼區別:

SpringMVC:

WebFlux:

從呼叫者(瀏覽器)的角度而言,是感知不到有什麼變化的,因為都是得等待5s才返回資料。但是,從服務端的日誌我們可以看出,WebFlux是直接返回Mono物件的(而不是像SpringMVC一直同步阻塞5s,執行緒才返回)。

這正是WebFlux的好處:能夠以固定的執行緒來處理高併發(充分發揮機器的效能)。

WebFlux還支援伺服器推送(SSE - >Server Send Event),我們來看個例子:

/**
     * Flux : 返回0-n個元素
     * 注:需要指定MediaType
     * @return
     */
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
    Flux<String> result = Flux
        .fromStream(IntStream.range(1, 5).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
            return "flux data--" + i;
        }));
    return result;
}

效果就是每秒會給瀏覽器推送資料:

非常感謝人才們能看到這裡,如果這個文章寫得還不錯,覺得「三歪」我有點東西的話 求點贊 求關注️ 求分享