1. 程式人生 > >[轉]springboot2 webflux 響應式程式設計學習路徑

[轉]springboot2 webflux 響應式程式設計學習路徑

原文連結
spring官方文件
springboot2 已經發布,其中最亮眼的非webflux響應式程式設計莫屬了!響應式的weblfux可以支援高吞吐量,意味著使用相同的資源可以處理更加多的請求,毫無疑問將會成為未來技術的趨勢,是必學的技術!很多人都看過相關的入門教程,但看完之後總覺得很迷糊,知其然不知道其所以然,包括我本人也有相同的疑惑。後面在研究和學習中發現,是我的學習路徑不對,很多基本概念不熟悉,之前公司主打的jdk版本還是1.6/1.7,直接跳到執行在jdk8上的webflux,跨度太大,迷惑是在所難免的!

在這裡我個人推薦的學習途徑如下:先學習jdk8的lambda表示式和stream流程式設計,瞭解函數語言程式設計的知識點和思想,接著學習jdk9的響應式流flux,理解響應式流概念,理解背壓和實現機制。這2者學好之後,很容易理解webflux的基石reactor,再學習webflux就水到渠成了!

這裡我記錄了自己的學習之路,列出了每一塊的學習重點,除了API的知識點學習之外,更加重要的瞭解底層執行機制和實現原理。對於我個人來說,學習技術如果不瞭解原理,知識點需要死記硬背,而瞭解了底層機制之後,不但不需要死記硬背,還可以把自己的技術點連成面融會貫通,很容易舉一反三,知識點也不會忘記,也能和別人扯扯技術的底層實現了。

下面只講解重點/高階知識和底層原理,入門教程請自行搜尋學習

lambda表示式

lambda表示式中的this

lambda表示式最終會返回一個實現了指定介面的例項,看上去和內部匿名類很像,但有一個最大的區別就是程式碼裡面的this,內部匿名類this指向的就是匿名類,而lambda表示式裡面的this指向的當前類。

package jdk8.lambda;

/**
 * lambda表示式的this
 * 
 * @author 曉風輕
 *
 */
public class ThisDemo {

  private String name = "ThisDemo";

  public void test() {
    // 匿名類實現
    new Thread(new Runnable() {

      private String name = "Runnable";

      @Override
      public void run() {
        System.out.println("這裡的this指向匿名類:" + this.name);
      }
    }).start();

    // lambda實現
    new Thread(() -> {
      System.out.println("這裡的this指向當前的ThisDemo類:" + this.name);
    }).start();
  }

  public static void main(String[] args) {
    ThisDemo demo = new ThisDemo();
    demo.test();
  }
}

輸出

這裡的this指向匿名類:Runnable
這裡的this指向當前的ThisDemo類:ThisDemo

實現原理

lambda表示式裡面,會把lambda表示式在本類中生成一個以lambda$+數字的方法。關鍵點:該方法不一定是static的方法,是static還是非static,取決於lambda表示式裡面是否引用了this。這就是為什麼lambda表示式裡面的this指向的是本地,因為他在本類裡面建立了一個方法,然後把lambda表示式裡面的程式碼放進去。

    // lambda實現
    // 下面會自動生成lambda$0方法,由於使用了this,所以是非static方法
    new Thread(() -> {
      System.out.println("這裡的this指向當前的ThisDemo類:" + this.name);
    }).start();

    // lambda實現
    // 下面會自動生成lambda$1方法,由於使用了this,所以是static方法
    new Thread(() -> {
      System.out.println("這裡沒有引用this,生成的lambda1方法是static的");
    }).start();

上面程式碼會自動生成2個lambda$方法

使用javap -s -p 類名, 可以看出一個是static,一個是非staic的

this

這就是為什麼lambda表示式裡面的this指向當前類的底層機制!因為程式碼就是在本類的一個方法裡面執行的。

額外說一句,自動生成的方法是否帶引數取決於lambda是否有引數,例子中表達式沒有引數(箭頭左邊是空的),所以自動生成的也沒有。

例項方法的方法引用

方法引用有多種,靜態方法的方法引用很好理解,但例項物件的方法引用一開始確實讓我有點費解,這和靜態方法引用由啥區別?看上去很像啊。

class DemoClass {

    /**
     * 這裡是一個靜態方法
     */
    public static int staticMethod(int i) {
        return i * 2;
    }

    /**
     * 這裡是一個例項方法
     */
    public int normalMethod(int i) {
        System.out.println("例項方法可以訪問this:" + this);
        return i * 3;
    }

}

public class MethodRefrenceDemo {

    public static void main(String[] args) {
        // 靜態方法的方法引用
        IntUnaryOperator methodRefrence1 = DemoClass::staticMethod;
        System.out.println(methodRefrence1.applyAsInt(111));

        DemoClass demo = new DemoClass();

        // 例項方法的方法引用
        IntUnaryOperator methodRefrence2 = demo::normalMethod;
        System.out.println(methodRefrence2.applyAsInt(111));
    }

}

這裡牽涉到不同的語言裡面對this的實現方法。我們知道靜態方法和例項方法的區別是例項方法有this,靜態方法沒有。java裡面是怎麼樣實現this的呢?

java裡面在預設把this作為引數,放到例項方法的第一個引數。

就是說:

    /**
     * 這裡是一個例項方法
     */
    public int normalMethod(int i) {
        System.out.println("例項方法可以訪問this:" + this);
        return i * 2;
    }

編譯之後和下面這樣的程式碼編譯之後是一樣的!

    /**
     * 這裡是一個例項方法
     */
    public int normalMethod(DemoClass this,int i) {
        System.out.println("例項方法可以訪問this:" + this);
        return i * 2;
    }

如何證明?

第1個證據,看反編譯裡面的本地變量表。

靜態方法:
靜態方法

而例項方法

例項方法

第2個證據,下面這樣的程式碼能正確執行。

class DemoCl2{

    /**
     * 這裡是一個例項方法, 程式碼上2個引數
     * 而我們呼叫的時候只有一個引數
     */
    public int normalMethod(DemoClass2 this,int i) {
        return i * 2;
    }
}

public class MethodRefrenceDemo {

    public static void main(String[] args) {
        DemoClass2 demo2 = new DemoClass2();

        // 程式碼定義上有2個引數, 第一個引數為this
        // 但實際上呼叫的時候只需要一個引數
        demo2.normalMethod(1);
       }
}

所以,我的理解,java裡面的所有方法都是靜態方法,只是有些方法有this變數,有些沒有。

所以,成員方法我們也可以寫成靜態方法的方法引用。如下:

public class MethodRefrenceDemo {

    public static void main(String[] args) {
        // 靜態方法的方法引用
        IntUnaryOperator methodRefrence1 = DemoClass::staticMethod;
        System.out.println(methodRefrence1.applyAsInt(111));

        DemoClass demo = new DemoClass();

        // 例項方法normalMethod的方法引用
        IntUnaryOperator methodRefrence2 = demo::normalMethod;
        System.out.println(methodRefrence2.applyAsInt(111));

        // 對同一個例項方法normalMethod也可以使用靜態引用
        // 程式碼上normalMethod雖然只有一個引數,但實際上有一個隱含的this函式
        // 所以使用的是2個引數bifunction函式介面
        BiFunction<DemoClass, Integer, Integer> methodRefrence3 = DemoClass::normalMethod;
        System.out.println(methodRefrence3.apply(demo, 111));
    }
}

上面程式碼裡面。對同一個例項方法normalMethod,我們既可以使用例項方法引用(例項::方法名),也可以使用靜態方法引用(類名::方法名)。

lambda實現惰性求值

惰性求值在lambda裡面非常重要,也非常有用。

舉例,程式設計規範裡面有一條規範,是列印日誌前需要判斷日誌級別(效能要求高的時候)。如下

    // 列印日誌前需要先判斷日誌級別
    if (logger.isLoggable(Level.FINE)) {
      logger.fine("列印一些日誌:" + this);
    }

為什麼要加判斷呢?不加判斷會有問題呢? 看如下程式碼:

package jdk8.lambda;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * lambda的惰性求值
 * 
 * @author 曉風輕
 */
public class LogDemo {

  private static final Logger logger = Logger
      .getLogger(LogDemo.class.getName());

  @Override
  public String toString() {
    System.out.println("這個方法執行了, 耗時1秒鐘");
    try {
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }

    return "LogDemo";
  }

  public void test() {
    // 如果不加判斷直接列印, 會有額外多餘的開銷, 就算最終日誌並沒有列印
    logger.fine("列印一些日誌:" + this);
  }

  public static void main(String[] args) {
    LogDemo demo = new LogDemo();
    demo.test();
  }
}

執行程式碼,發現雖然日誌沒有列印,但toString方法還是執行了,屬於多餘浪費的開銷。

每一個日誌列印都加判斷,看著很彆扭,現在有了lambda表示式之後,可以使用lambda的惰性求值,就可以去掉if判斷,如下

    // 使用lambda表示式的惰性求值,不需要判斷日誌級別
    logger.fine(() -> "列印一些日誌:" + this);

底層機制

這個現象很好理解,簡單講解一下。就是沒有使用表示式的時候,相當於

String msg = "列印一些日誌:" + this
logger.fine(msg);

雖然最後沒有列印,但字串拼接的工作還是執行了。而使用了lambda表示式之後,字串的拼接放到一個函式裡面,fine日誌需要列印的時候才去呼叫這個方法才真正執行!從而實現了惰性求值。

後面我們學習的jdk8的stream流程式設計裡面,沒有呼叫最終操作的時候,中間操作的方法都不會執行,這也是惰性求值。

stream流程式設計

stream程式設計主要是學習API的使用,但前提是學好lambda,基礎好了,看這些方法定義非常簡單,要是沒有打好基礎,你會有很多東西需要記憶。

內部迭代和外部迭代

一般來說,我們之前的編碼方法,叫外部迭代,stream的寫法叫內部迭代。內部迭代程式碼更加可讀更加優雅,關注點是做什麼(外部迭代關注是怎麼樣做),也很容易讓我們養成程式設計小函式的好習慣!這點在程式設計習慣裡面非常重要!看例子:

import java.util.stream.IntStream;

public class StreamDemo1 {

  public static void main(String[] args) {
    int[] nums = { 1, 2, 3 };
    // 外部迭代
    int sum = 0;
    for (int i : nums) {
      sum += i;
    }
    System.out.println("結果為:" + sum);

    // 使用stream的內部迭代
    // map就是中間操作(返回stream的操作)
    // sum就是終止操作
    int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
    System.out.println("結果為:" + sum2);

    System.out.println("惰性求值就是終止沒有呼叫的情況下,中間操作不會執行");
    IntStream.of(nums).map(StreamDemo1::doubleNum);
  }

  public static int doubleNum(int i) {
    System.out.println("執行了乘以2");
    return i * 2;
  }
}

操作型別

操作型別概念要理清楚。有幾個維度。

首先分為 中間操作 和 最終操作,在最終操作沒有呼叫的情況下,所有的中級操作都不會執行。那麼那些是中間操作那些是最終操作呢? 簡單來說,返回stream流的就是中間操作,可以繼續鏈式呼叫下去,不是返回stream的就是最終操作。這點很好理解。

最終操作裡面分為短路操作和非短路操作,短路操作就是limit/findxxx/xxxMatch這種,就是找了符合條件的就終止,其他的就是非短路操作。在無限流裡面需要呼叫短路操作,否則像炫邁口香糖一樣根本停不下來!

中間操作又分為 有狀態操作 和 無狀態操作,怎麼樣區分呢? 一開始很多同學需要死記硬背,其實你主要掌握了狀態這個關鍵字就不需要死記硬背。狀態就是和其他資料有關係。我們可以看方法的引數,如果是一個引數的,就是無狀態操作,因為只和自己有關,其他的就是有狀態操作。如map/filter方法,只有一個引數就是自己,就是無狀態操作;而distinct/sorted就是有狀態操作,因為去重和排序都需要和其他資料比較,理解了這點,就不需要死記硬背了!

為什麼要知道有狀態和無狀態操作呢?在多個操作的時候,我們需要把無狀態操作寫在一起,有狀態操作放到最後,這樣效率會更加高。

執行機制

我們可以通過下面的程式碼來理解stream的執行機制

package stream;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 驗證stream執行機制
 * 
 * 1. 所有操作是鏈式呼叫, 一個元素只迭代一次 
 * 2. 每一箇中間操作返回一個新的流. 流裡面有一個屬性sourceStage 
 *     指向同一個 地方,就是Head 
 * 3. Head->nextStage->nextStage->... -> null
 * 4. 有狀態操作會把無狀態操作階段,單獨處理
 * 5. 並行環境下, 有狀態的中間操作不一定能並行操作.
 * 
 * 6. parallel/ sequetial 這2個操作也是中間操作(也是返回stream)
 *     但是他們不建立流, 他們只修改 Head的並行標誌
 * 
 * @author 曉風輕
 *
 */
public class RunStream {

  public static void main(String[] args) {
    Random random = new Random();
    // 隨機產生資料
    Stream<Integer> stream = Stream.generate(() -> random.nextInt())
        // 產生500個 ( 無限流需要短路操作. )
        .limit(500)
        // 第1個無狀態操作
        .peek(s -> print("peek: " + s))
        // 第2個無狀態操作
        .filter(s -> {
          print("filter: " + s);
          return s > 1000000;
        })
        // 有狀態操作
        .sorted((i1, i2) -> {
          print("排序: " + i1 + ", " + i2);
          return i1.compareTo(i2);
        })
        // 又一個無狀態操作
        .peek(s -> {
          print("peek2: " + s);
        }).parallel();

    // 終止操作
    stream.count();
  }

  /**
   * 列印日誌並sleep 5 毫秒
   * 
   * @param s
   */
  public static void print(String s) {
    // System.out.println(s);
    // 帶執行緒名(測試並行情況)
    System.out.println(Thread.currentThread().getName() + " > " + s);
    try {
      TimeUnit.MILLISECONDS.sleep(5);
    } catch (InterruptedException e) {
    }
  }
}

大家自己測試一下程式碼,能發現stream的呼叫方法,就像現實中的流水線一樣,一個元素只會迭代一次,但如果中間有無狀態操作,前後的操作會單獨處理(元素就會被多次迭代)。

jdk9的響應式流

就是reactive stream,也就是flow。其實和jdk8的stream沒有一點關係。說白了就一個釋出-訂閱模式,一共只有4個介面,3個物件,非常簡單清晰。寫一個入門例子就可以掌握。

package jdk9;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
 * 帶 process 的 flow demo
 */

/**
 * Processor, 需要繼承SubmissionPublisher並實現Processor介面
 * 
 * 輸入源資料 integer, 過濾掉小於0的, 然後轉換成字串釋出出去
 */
class MyProcessor extends SubmissionPublisher<String>
    implements Processor<Integer, String> {

  private Subscription subscription;

  @Override
  public void onSubscribe(Subscription subscription) {
    // 儲存訂閱關係, 需要用它來給釋出者響應
    this.subscription = subscription;

    // 請求一個數據
    this.subscription.request(1);
  }

  @Override
  public void onNext(Integer item) {
    // 接受到一個數據, 處理
    System.out.println("處理器接受到資料: " + item);

    // 過濾掉小於0的, 然後釋出出去
    if (item > 0) {
      this.submit("轉換後的資料:" + item);
    }

    // 處理完呼叫request再請求一個數據
    this.subscription.request(1);

    // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
    // this.subscription.cancel();
  }

  @Override
  public void onError(Throwable throwable) {
    // 出現了異常(例如處理資料的時候產生了異常)
    throwable.printStackTrace();

    // 我們可以告訴釋出者, 後面不接受資料了
    this.subscription.cancel();
  }

  @Override
  public void onComplete() {
    // 全部資料處理完了(釋出者關閉了)
    System.out.println("處理器處理完了!");
    // 關閉釋出者
    this.close();
  }
}

public class FlowDemo2 {

  public static void main(String[] args) throws Exception {
    // 1. 定義釋出者, 釋出的資料型別是 Integer
    // 直接使用jdk自帶的SubmissionPublisher
    SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

    // 2. 定義處理器, 對資料進行過濾, 並轉換為String型別
    MyProcessor processor = new MyProcessor();

    // 3. 釋出者 和 處理器 建立訂閱關係
    publiser.subscribe(processor);

    // 4. 定義最終訂閱者, 消費 String 型別資料
    Subscriber<String> subscriber = new Subscriber<String>() {

      private Subscription subscription;

      @Override
      public void onSubscribe(Subscription subscription) {
        // 儲存訂閱關係, 需要用它來給釋出者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
      }

      @Override
      public void onNext(String item) {
        // 接受到一個數據, 處理
        System.out.println("接受到資料: " + item);

        // 處理完呼叫request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
        // this.subscription.cancel();
      }

      @Override
      public void onError(Throwable throwable) {
        // 出現了異常(例如處理資料的時候產生了異常)
        throwable.printStackTrace();

        // 我們可以告訴釋出者, 後面不接受資料了
        this.subscription.cancel();
      }

      @Override
      public void onComplete() {
        // 全部資料處理完了(釋出者關閉了)
        System.out.println("處理完了!");
      }

    };

    // 5. 處理器 和 最終訂閱者 建立訂閱關係
    processor.subscribe(subscriber);

    // 6. 生產資料, 併發布
    // 這裡忽略資料生產過程
    publiser.submit(-111);
    publiser.submit(111);

    // 7. 結束後 關閉釋出者
    // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
    publiser.close();

    // 主執行緒延遲停止, 否則資料沒有消費就退出
    Thread.currentThread().join(1000);
  }

}

背壓

背壓依我的理解來說,是指訂閱者能和釋出者互動(通過程式碼裡面的呼叫request和cancel方法互動),可以調節釋出者釋出資料的速率,解決把訂閱者壓垮的問題。關鍵在於上面例子裡面的訂閱關係Subscription這個介面,他有request和cancel 2個方法,用於通知釋出者需要資料和通知釋出者不再接受資料。

我們重點理解背壓在jdk9裡面是如何實現的。關鍵在於釋出者Publisher的實現類SubmissionPublisher的submit方法是block方法。訂閱者會有一個緩衝池,預設為Flow.defaultBufferSize() = 256。當訂閱者的緩衝池滿了之後,釋出者呼叫submit方法釋出資料就會被阻塞,釋出者就會停(慢)下來;訂閱者消費了資料之後(呼叫Subscription.request方法),緩衝池有位置了,submit方法就會繼續執行下去,就是通過這樣的機制,實現了調節釋出者釋出資料的速率,消費得快,生成就快,消費得慢,釋出者就會被阻塞,當然就會慢下來了。

怎麼樣實現釋出者和多個訂閱者之間的阻塞和同步呢?使用的jdk7的Fork/Join的ManagedBlocker,有興趣的請自己查詢相關資料。

reactor

spring webflux是基於reactor來實現響應式的。那麼reactor是什麼呢?我是這樣理解的
reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。
reactor裡面Flux和Mono就是stream,他的最終操作就是 subscribe/block 2種。reactor裡面說的不訂閱將什麼也不會方法就是我們最開始學習的惰性求值。

我們來看一段程式碼,理解一下:

package com.imooc;

import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.publisher.Flux;

public class ReactorDemo {

  public static void main(String[] args) {
    // reactor = jdk8 stream + jdk9 reactive stream
    // Mono 0-1個元素
    // Flux 0-N個元素
    String[] strs = { "1", "2", "3" };

    // 2. 定義訂閱者
    Subscriber<Integer> subscriber = new Subscriber<Integer>() {

      private Subscription subscription;

      @Override
      public void onSubscribe(Subscription subscription) {
        // 儲存訂閱關係, 需要用它來給釋出者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
      }

      @Override
      public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("接受到資料: " + item);

        try {
          TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }

        // 處理完呼叫request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
        // this.subscription.cancel();
      }

      @Override
      public void onError(Throwable throwable) {
        // 出現了異常(例如處理資料的時候產生了異常)
        throwable.printStackTrace();

        // 我們可以告訴釋出者, 後面不接受資料了
        this.subscription.cancel();
      }

      @Override
      public void onComplete() {
        // 全部資料處理完了(釋出者關閉了)
        System.out.println("處理完了!");
      }

    };

    // 這裡就是jdk8的stream
    Flux.fromArray(strs).map(s -> Integer.parseInt(s))
    // 最終操作
    // 這裡就是jdk9的reactive stream
    .subscribe(subscriber);
  }
}

上面的例子裡面,我們可以把jdk9裡面flowdemo的訂閱者程式碼原封不動的copy過來,直接就可以用在reactor的subscribe方法上。訂閱就是相當於呼叫了stream的最終操作。有了 reactor = jdk8 stream + jdk9 reactive stream 概念後,在掌握了jdk8的stream和jkd9的flow之後,reactor也不難掌握。

spring5的webflux

上面的基礎和原理掌握之後,學習webflux就水到渠成了!webflux的關鍵是自己編寫的程式碼裡面返回流(Flux/Mono),spring框架來負責處理訂閱。 spring框架提供2種開發模式來編寫響應式程式碼,使用mvc之前的註解模式和使用router function模式,都需要我們的程式碼返回流,spring的響應式資料庫spring data jpa,如使用mongodb,也是返回流,訂閱都需要交給框架,自己不能訂閱。而編寫響應式程式碼之前,我們還需要了解2個重要的概念,就是非同步servlet和SSE。

非同步servlet

學習非同步servlet我們最重要的瞭解同步servlet阻塞了什麼?為什麼需要非同步servlet?非同步servlet能支援高吞吐量的原理是什麼?

servlet容器(如tomcat)裡面,每處理一個請求會佔用一個執行緒,同步servlet裡面,業務程式碼處理多久,servlet容器的執行緒就會等(阻塞)多久,而servlet容器的執行緒是由上限的,當請求多了的時候servlet容器執行緒就會全部用完,就無法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!

而非同步serlvet裡面,servlet容器的執行緒不會傻等業務程式碼處理完畢,而是直接返回(繼續處理其他請求),給業務程式碼一個回撥函式(asyncContext.complete()),業務程式碼處理完了再通知我!這樣就可以使用少量的執行緒處理更加高的請求,從而實現高吞吐量!

我們看示例程式碼:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class AsyncServlet
 */
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
  private static final long serialVersionUID = 1L;

  /**
   * @see HttpServlet#HttpServlet()
   */
  public AsyncServlet() {
    super();
  }

  /**
   * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
   *      response)
   */
  protected void doGet(HttpServletRequest request,
      HttpServletResponse response) throws ServletException, IOException {
    long t1 = System.currentTimeMillis();

    // 開啟非同步
    AsyncContext asyncContext = request.startAsync();

    // 執行業務程式碼
    CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
        asyncContext.getRequest(), asyncContext.getResponse()));

    System.out.println("async use:" + (System.currentTimeMillis() - t1));
  }

  private void doSomeThing(AsyncContext asyncContext,
      ServletRequest servletRequest, ServletResponse servletResponse) {

    // 模擬耗時操作
    try {
      TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    }

    //
    try {
      servletResponse.getWriter().append("done");
    } catch (IOException e) {
      e.printStackTrace();
    }

    // 業務程式碼處理完畢, 通知結束
    asyncContext.complete();
  }

  /**
   * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
   *      response)
   */
  protected void doPost(HttpServletRequest request,
      HttpServletResponse response) throws ServletException, IOException {
    doGet(request, response);
  }
}

大家可以執行上面程式碼,業務程式碼花了5秒,但servlet容器的執行緒幾乎沒有任何耗時。而如果是同步servlet的,執行緒就會傻等5秒,這5秒內這個執行緒只處理了這一個請求。

SSE(server-sent event)

響應式流裡面,可以多次返回資料(其實和響應式沒有關係),使用的技術就是H5的SSE。我們學習技術,API的使用只是最初級也是最簡單的,更加重要的是需要知其然並知其所以然,否則你只能死記硬背不用就忘!我們不滿足在spring裡面能實現sse效果,更加需要知道spring是如何做到的。其實SSE很簡單,我們花一點點時間就可以掌握,我們在純servlet環境裡面實現。我們看程式碼,這裡一個最簡單的示例。


import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class SSE
 */
@WebServlet("/SSE")
public class SSE extends HttpServlet {
  private static final long serialVersionUID = 1L;

  /**
   * @see HttpServlet#HttpServlet()
   */
  public SSE() {
    super();
  }

  /**
   * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
   *      response)
   */
  protected void doGet(HttpServletRequest request,
      HttpServletResponse response) throws ServletException, IOException {
    response.setContentType("text/event-stream");
    response.setCharacterEncoding("utf-8");

    for (int i = 0; i < 5; i++) {
      // 指定事件標識
      response.getWriter().write("event:me\n");
      // 格式: data: + 資料 + 2個回車
      response.getWriter().write("data:" + i + "\n\n");
      response.getWriter().flush();

      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
      }
    }

  }

  /**
   * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
   *      response)
   */
  protected void doPost(HttpServletRequest request,
      HttpServletResponse response) throws ServletException, IOException {
    doGet(request, response);
  }
}

關鍵是ContentType 是 "text/event-stream",然後返回的資料有固定的要求格式即可。

結束語

經過上面的一步一個腳印的學習,我們的基礎已經打牢,障礙已經掃清,現在可以進入輕鬆愉快的spring flux學習之旅了!Enjoy!

作者:曉風輕
連結:https://www.imooc.com/article/27181
來源:慕課網