1. 程式人生 > >響應式程式設計系列(一):什麼是響應式程式設計?reactor入門

響應式程式設計系列(一):什麼是響應式程式設計?reactor入門

響應式程式設計 系列文章目錄

(一)什麼是響應式程式設計?reactor入門

(二)Flux入門學習:流的概念,特性和基本操作

(三)Flux深入學習:流的高階特性和進階用法

(四)reactor-core響應式api如何測試和除錯?

(五)Spring reactive: Spring WebFlux的使用

(六)Spring reactive: webClient的使用

引言

  Spring framework 5 的一大新特性:響應式程式設計(Reactive Programming)。那麼什麼是響應式?他能給我們帶來什麼?如何優雅地使用?本系列會從最基礎的概念和簡單的api講起,再慢慢深入探討響應式的一些高階特性,最後講解實戰內容,例如WebFlux和WebClient等在Spring boot中的使用,如何測試和除錯。

  想要了解原理的話,美團點評的這篇部落格 Java NIO淺析 非常適合入門。

簡單地說:

  當我們呼叫socket.read()、socket.write()這類阻塞函式的時候,這類函式不能立即返回,也無法中斷,需要等待socket可讀或者可寫,才會返回,因此一個執行緒只能處理一個請求。在這等待的過程中,cpu並不幹活,(即阻塞住了),那麼cpu的資源就沒有很好地利用起來。因此對於這種情況,我們使用多執行緒來提高cpu資源的利用率:在等待的這段時間,就可以切換到別的執行緒去處理事件,直到socket可讀或可寫了,通過中斷訊號通知cpu,再切換回來繼續處理資料。例如執行緒A正在等待socket可讀,而執行緒B已經就緒了,那麼就可以先切換到執行緒B去處理。雖然上下文切換也會花一些時間,但是遠比阻塞線上程A這裡空等要好。當然計算機內部實際的情況比這複雜得多。

  而NIO的讀寫函式可以立刻返回,這就給了我們不開執行緒利用CPU的最好機會:如果一個連線不能讀寫(socket.read()返回0或者socket.write()返回0),我們可以把這件事記下來。因此只需要一個執行緒不斷地輪詢這些事件,一旦有就緒的時間,處理即可。不需要多執行緒。

 

阻塞型IO

  • 需要多執行緒,即需要很大的執行緒池。
  • 每個請求都要有一個單獨的執行緒去處理。

 

非阻塞型IO

  • 只需要數量非常少的執行緒。
  • 固定的幾個工作執行緒去處理事件。

 

使用NIO我們能得到什麼?

  • 事件驅動模型
  • 避免多執行緒
  • 單執行緒處理多工
  • 非阻塞I/O,I/O讀寫不再阻塞,而是返回0
  • 基於block的傳輸,通常比基於流的傳輸更高效
  • 更高階的IO函式,zero-copy
  • IO多路複用大大提高了Java網路應用的可伸縮性和實用性

響應式程式設計入門

  響應式程式設計就是基於reactor的思想,當你做一個帶有一定延遲的才能夠返回的io操作時,不會阻塞,而是立刻返回一個流,並且訂閱這個流,當這個流上產生了返回資料,可以立刻得到通知並呼叫回撥函式處理資料。

 

基本模型

我們首先需要理解響應式程式設計的基本模型:

 

Flux

  Reactor中的釋出者(Publisher)由FluxMono兩個類定義,它們都提供了豐富的操作符(operator)。一個Flux物件代表一個包含0..N個元素的響應式序列,元素可以是普通物件、資料庫查詢的結果、http響應體,甚至是異常。而一個Mono物件代表一個包含零/一個(0..1)元素的結果。上圖就是一個Flux型別的資料流,Flux往流上傳送了3個元素,Subscriber通過訂閱這個流來接收通知。

如何建立一個流?最簡單的方式有以下幾種:

//建立一個流,並直接往流上釋出一個值為value資料
Flux.just(value);

//通過list建立一個流,往流上依次釋出list中的資料
Flux.fromIterable(list);

//建立一個流,並向流上從i開始連續釋出n個數據,資料型別為Integer
Flux.range(i, n);

//建立一個流,並定時向流上釋出一個數據,資料從0開始遞增,資料型別為Long
Flux.interval(Duration.ofSeconds(n));

既然是“資料流”的釋出者,Flux和Mono都可以發出三種“資料訊號”:元素值、錯誤訊號、完成訊號,錯誤訊號和完成訊號都是終止訊號,完成訊號用於告知下游訂閱者該資料流正常結束,錯誤訊號終止資料流的同時將錯誤傳遞給下游訂閱者。

Subscriber 

subscriber是一個訂閱者,他只有非常簡單的4個介面:

public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    //收到下一個元素值訊號時的行為
    void onNext(T var1);

    //收到錯誤訊號時的行為
    void onError(Throwable var1);

    //收到終止訊號時的行為
    void onComplete();
}

Subscriber必須要訂閱一個Flux才能夠接收通知:

flux.subscribe(
    value -> handleData(value),
    error -> handleError(error),
    () -> handleComplete()
);

上面這個例子通過lambda表示式,定義了Subscriber分別在收到訊息,收到錯誤,和訊息流結束時的行為,當Subscriber接收到一個新資料,就會非同步地執行handleData方法處理資料。

 

簡單例子:

接下來我們建立幾個最簡單的流來試一下:

首先我們新建一個maven專案,引入reactor的類庫:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.2.3.RELEASE</version>
        <scope>test</scope>
    </dependency>
</dependencies>

 

編寫程式碼如下:

public class ReactorTests {

    @After
    public void after() {
        sleep(30_000);
    }

    @Test
    public void testJust() {
        Flux.just("hello", "world")
            .subscribe(System.out::println);
    }

    @Test
    public void testList() {
        List<String> words = Arrays.asList(
            "hello",
            "reactive",
            "world"
        );

        Flux.fromIterable(words)
            .subscribe(System.out::println);
    }

    @Test
    public void testRange() {
        Flux.range(1, 10)
            .subscribe(System.out::println);
    }

    @Test
    public void testInterval() {
        Flux.interval(Duration.ofSeconds(1))
            .subscribe(System.out::println);
    }
}

訂閱這些流,收到資料之後只是簡單地把它打印出來,執行這些Test,就能夠看到訂閱者在接收到流上的資料時,非同步地去處理這些資料。