1. 程式人生 > >朱曄和你聊Spring系列S1E5:Spring WebFlux小探

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

本文會來做一些應用對比Spring MVC和Spring WebFlux,觀察執行緒模型的區別,然後做一下簡單的壓力測試。

建立一個傳統的Spring MVC應用

先來建立一個新的webflux-mvc的模組:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webflux-mvc</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webflux-mvc</name>
    <description></description>

    <parent>
        <groupId>me.josephzhu</groupId>
        <artifactId>spring101</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

然後在專案裡定義一個我們會使用到的POJO:

package me.josephzhu.spring101webfluxmvc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "mydata")
public class MyData {
    @Id
    private String id;
    private String payload;
    private long time;
}

這裡的@Document和@Id是為Mongodb服務的,我們定義了MyData將會以mydata作為Collection的名字,然後id欄位是Document的Id列。 然後我們來建立Controller,在這個Controller裡面我們嘗試三種不同的操作:

  1. Sleep 100ms的純獲取資料的方法。從請求中獲得length引數作為payload字串的長度,從請求中獲得size引數作為MyData的個數。我們在之後的測試過程中可以隨意調節這兩個引數來調整我們的資料量。
  2. 從Mongodb獲取資料的方法,獲取到資料後直接返回。
  3. 複合邏輯。先走HTTP請求從data方法獲取資料,然後把資料儲存進入Mongodb,最後返回這些資料。
package me.josephzhu.spring101webfluxmvc;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@RestController
public class MyController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private MyRepository myRepository;

    @GetMapping("/data")
    public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {

        }
        String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
        return IntStream.rangeClosed(1, size)
                .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))
                .collect(Collectors.toList());
    }

    @GetMapping("/dbData")
    public List<MyData> getDbData() {
        return myRepository.findAll();
    }

    @GetMapping("/saveData")
    public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data")
                .queryParam("size", size)
                .queryParam("length", length);
        ResponseEntity<List<MyData>> responseEntity =
                restTemplate.exchange(builder.toUriString(),
                        HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {});
        return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList());
    }
}

注意,在這裡我們使用了Java 8的Steam來做一些操作避免使用for迴圈:

  1. 通過length引數構建payload(payload由length個字元a構成)。
  2. 通過size引數構建MyData的List。
  3. 在RestTemplate獲取到MyData的List後,把每一個物件交由myRepository的save方法來處理,然後統一收集返回結果。 這些Stream的程式碼都是同步處理,也不涉及外部IO,和非阻塞沒有任何關係,只是方便程式碼編寫。為了讓程式碼可以執行,我們還需要繼續來配置下Mongodb的Repository:
package me.josephzhu.spring101webfluxmvc;

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MyRepository extends MongoRepository<MyData, String> { }

因為我們沒有用到複雜的查詢,在程式碼裡只是用到了findAll方法,所以這裡我們無需定義額外的方法,只是宣告介面即可。 最後,我們建立主應用程式,順便配置一下Mongodb和RestTemplate:

package me.josephzhu.spring101webfluxmvc;

import com.mongodb.MongoClientOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@Configuration
public class Spring101WebfluxMvcApplication {

   @Bean
   MongoClientOptions mongoClientOptions(){
       return MongoClientOptions.builder().connectionsPerHost(1000).build();
   }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Spring101WebfluxMvcApplication.class, args);
    }
}

這裡我們配置了Mongodb客戶端使得之後在進行壓力測試的時候能有超過100個連線連線到Mongodb,否則會出現無法獲取連線的問題。

建立WebFlux版本的應用

現在我們再來新建一個webflux模組:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webflux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webflux</name>
    <description></description>

    <parent>
        <groupId>me.josephzhu</groupId>
        <artifactId>spring101</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

這裡可以注意到,我們引入了webflux這個starter以及data-mongodb-reactive這個starter。在之前的Spring MVC專案中,我們引入的是mvc和data-mongodb兩個starter。 然後,我們同樣需要建立一下MyData類(程式碼和之前一模一樣,這裡省略)。 最關鍵的一步,我們來建立三個Controller方法的定義:

package me.josephzhu.spring101webflux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class MyHandler {
    @Autowired
    private MyReactiveRepository myReactiveRepository;

    public Mono<ServerResponse> getData(ServerRequest serverRequest) {
        int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
        int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));

        String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
        Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size)
                .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100));

        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

    public Mono<ServerResponse> getDbData(ServerRequest serverRequest) {
        Flux<MyData> data = myReactiveRepository.findAll();
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

    public Mono<ServerResponse> saveData(ServerRequest serverRequest) {
        int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
        int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));

        Flux<MyData> data = WebClient.create().get()
                .uri(builder -> builder
                        .scheme("http")
                        .host("localhost")
                        .port(8080)
                        .path("data")
                        .queryParam("size", size)
                        .queryParam("length", length)
                        .build())
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToFlux(MyData.class)
                .flatMap(myReactiveRepository::save);

        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

}

這裡要說明幾點:

  1. 在WebFlux中,我們可以採用傳統的@Controller方式來定義Controller,也可以採用函式式方式來宣告對外的Endpoint,也就是宣告Handler+Router。我們這裡採用的是更有特色的後者來演示。
  2. 請你比較一下三個方法的實現對於兩個版本的區別。最主要的區別,我們返回的實際資料是Mono<>和Flux<>,分別代表0~1個物件和0~N物件的響應式流。
  3. 在saveData方法中,對於Spring MVC我們使用的是阻塞的RestTemplate來從遠端獲取資料,對於Spring WebFlux我們使用的是非阻塞的WebClient來獲取資料。獲取資料後,我們直接使用flatMap獲取到了所有的MyData轉給我們的響應式的Mongodb Repository來處理資料。
  4. 對於saveData方法中插入Mongodb的操作,這裡和MVC的例子有很大的不同需要注意。在MVC中,我們把遠端服務返回的結果轉為Stream資料流,同步依次呼叫save方法,整個過程只會有佔用一個Mongodb的連線。而在這裡,直接對Flux流進行了Map,整個過程相當於併發進行了Mongodb的呼叫。在之後做壓測的時候,我們會再次提到這點。 剛才有提到,採用函式式宣告對外的Endpoint的話除了定義Handler,還需要配置Router來和Handler關聯,配置如下:
package me.josephzhu.spring101webflux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {
    @Autowired
    private MyHandler myHandler;

    @Bean
    public RouterFunction<ServerResponse> config() {
        return route(GET("/data"), myHandler::getData)
                .andRoute(GET("/dbData"), myHandler::getDbData)
                .andRoute(GET("/saveData"), myHandler::saveData);
    }
}

這段程式碼沒有太多需要說明,這裡我們定義了三個GET請求(相當於MVC的@GetMapping),然後對應到注入的myHandler的三個方法上。 然後我們還需要建立Mongodb的Repository:

package me.josephzhu.spring101webflux;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }

以及配置和啟動類:

package me.josephzhu.spring101webflux;

import com.mongodb.ConnectionString;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class Spring101WebfluxApplication {

    @Bean
    MongoClient mongoClient(){
        return MongoClients.create(mongoClientSettings());
    }

    @Bean
    MongoClientSettings mongoClientSettings(){
        return MongoClientSettings.builder()
                .clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build())
                .connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Spring101WebfluxApplication.class, args);
    }
}

這裡對Mongodb做了一些配置,主要也是希望放大連線池這塊的預設限制,為今後的壓測服務。注意,在這裡配置的Bean是com.mongodb.reactivestream.client下的MongoClient,如下圖所示,還有其它兩個MongoClient,如果修改了不匹配的MongoClient的話是不會有作用的,我在這個坑裡躺了兩小時。 完成後可以開啟瀏覽器測試一下介面:

Spring MVC還是WebFlux?

下圖是官網的一個圖說明了兩者的關係,然後官網也給出了一些建議:

  1. 如果你現在的Spring MVC執行的沒啥問題的話就別改了,有大量的類庫可以使用,實現簡單易於理解。
  2. 如果你希望實現輕量級的,函式式Web框架,那麼可以考慮WebFlux的函式Web端點。
  3. 如果你依賴阻塞的持久化API比如JPA和JDBC那麼也就只能選擇Spring MVC了。目前對於非阻塞的JDBC實現有一些早期的專案在探索,但是沒有到可以上生產的成熟度。
  4. 在Spring MVC應用程式中進行遠端呼叫也是可以使用響應式的WebClient的。Spring MVC也可以使用其它的響應式元件。每次呼叫延遲越厲害受益越大。
  5. 對於大型應用程式要考慮到非阻塞方式實現的學習曲線。最簡單的起步方式就是使用WebClient,完全切換到非阻塞需要花時間熟悉函式式宣告式的程式設計API。 官方的意思也是可以在一些小引用上嘗試WebFlux,對於大型應用不建議冒然轉到WebFlux。

觀察執行緒模型

我們知道對於阻塞的實現方式,我們採用執行緒池來服務請求(執行緒池中的會維護一組普通的執行緒,執行緒池只是節省執行緒建立的時間),對於每一個請求的處理,至始至終都是在一個執行緒中進行,如果處理的過程中我們需要訪問外部的網路或資料庫,那麼執行緒就處於阻塞狀態,這個執行緒無法服務其它請求,如果當時還有更多的併發的話,就需要建立更多的執行緒來服務其它請求。這種實現方式是非常簡單的,應對壓力的增長擴容方式也是粗暴的,那就是增加更多執行緒。 對於非阻塞的方式,採用的是EventLoop的方式,IO操作的時候是不佔用工作執行緒的,因此只會建立一組和CPU核數相當的工作執行緒用於工作處理(NodeJS甚至是單執行緒的,這種就更危險了,就那麼一個工作執行緒,一旦被長時間佔用其它請求都無法處理)。由於整個處理過程中IO請求不佔用執行緒時間,執行緒不會阻塞等待,再增加超過CPU核數的工作執行緒也是沒有意義的(只會白白增加執行緒切換的開銷)。對於這種方式在壓力增長後,因為我們不需要增加額外的執行緒,也就沒有了絕對的瓶頸。 試想一下在阻塞模型下,對於5000的併發,而且每一個併發阻塞的時間非常長,那麼我們其實需要5000個執行緒來服務(這麼多執行緒99%其實都是在等待,屬於空耗系統資源),建立5000的執行緒不談其它的,如果執行緒棧大小是1M的話就需要5GB的記憶體。對於非阻塞的執行緒模型在8核機器上還是8個工作執行緒,記憶體佔用還是這麼小,可以以最小的開銷應對大併發,系統的損耗很少。非阻塞的Reactive模式是內耗非常小的模式,但是這是有代價的,在實現上我們需要確保處理過程中沒有阻塞產生,否則就會浪費寶貴的數目固定的工作執行緒,也就是說我們需要依賴配套的非阻塞IO類庫來使用。 在預設情況下tomcat的工作執行緒池初始化為10,最大200,我們通過啟動本文建立的Spring101WebfluxMvcApplication程式,用jvisualvm工具來看下初始的情況(35個執行緒): 在專案的application.properties檔案中我們配置tomcat的最大執行緒數: server.tomcat.max-threads=250 在壓力的情況下,我們再來觀察一下執行緒的情況(272個執行緒): 的確是建立多達250個工作執行緒。這裡看到大部分執行緒都在休眠,因為我們這裡執行的是剛才的data()方法,在方法內我們休眠了100毫秒。對於同樣的壓力,我們再來看一下Spring101WebfluxApplication程式的執行緒情況(44個執行緒): 可以看到用於處理HTTP的Reactor執行緒只有8個,和本機CPU核數量一致(下面有十個Thread打頭的執行緒是處理和Mongodb互動的,忽略),只需要這8個執行緒處理HTTP請求足以,因為HTTP請求的IO處理不會佔用執行緒。

使用Gatling進行壓力測試

我們可以使用Gatling類庫進行壓力測試,我個人感覺比Jmeter方便。配置很簡單,首先我們要安裝Scala的SDK,然後我們新建一個模組:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webstresstest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webstresstest</name>
    <description></description>

    <dependencies>
        <dependency>
            <groupId>io.gatling.highcharts</groupId>
            <artifactId>gatling-charts-highcharts</artifactId>
            <version>2.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>io.gatling</groupId>
                <artifactId>gatling-maven-plugin</artifactId>
                <version>2.2.4</version>
                <configuration>
                    <simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass>
                    <resultsFolder>/Users/zyhome/gatling</resultsFolder>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

引入了garling的maven外掛,在這裡配置了測試結果輸出路徑以及壓測的類。接下去建立一下這個Scala測試類:

package me.josephzhu.spring101.webstresstest

import io.gatling.core.Predef._
import io.gatling.core.scenario.Simulation
import io.gatling.http.Predef._

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/data?size=10&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}

這段程式碼定義瞭如下的測試行為:

  1. 宣告一個data測試場景,重複進行1000次測試,發起一個遠端呼叫,驗證呼叫結果的響應狀態碼是200並且返回的結果包含字串payload。
  2. 測試啟動的時候直接壓上去200個使用者,每一個使用者執行完這1000次測試後結束了,所以這種方式一開始會是200使用者到測試最後階段使用者數會慢慢減少。當然還有其它一些測試方式(比如慢慢遞增使用者的方式),詳見官網:
    nothingFor(4 seconds), // 1
    atOnceUsers(10), // 2
    rampUsers(10) over (5 seconds), // 3
    constantUsersPerSec(20) during (15 seconds), // 4
    constantUsersPerSec(20) during (15 seconds) randomized, // 5
    rampUsersPerSec(10) to 20 during (10 minutes), // 6
    rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7
    splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8
    splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9
    heavisideUsers(1000) over (20 seconds) // 10

壓力測試一

先來進行第一個測試,1000併發對data介面進行100次迴圈(還記得嗎,介面有100ms休眠or延遲的):

class StressTest extends Simulation {

  val scn = scenario("data").repeat(100) {
    exec(
      http("mvc data")
        .get("http://localhost:8080/data?size=10&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(1000)))
}

下面兩個圖分別是MVC和WebFlux的測試結果(因為都是8080埠,所以測試的時候記得切換重啟兩個應用哦): 可以看到WebFlux的吞吐幾乎是MVC的翻倍,平均響應時間少了兩倍不止,很明顯,在等待的時候,2000個併發使用者大大超過了我們配置的250個執行緒池的執行緒數量,這個時候只能排隊,對於非阻塞的方式,延遲是不會佔用處理執行緒的,在延遲結束後才會去佔用處理執行緒的資源進行處理,不會收到併發使用者數受限於執行緒池執行緒數的情況。 我們把Sleep相關程式碼註釋再進行一次測試看看情況,分別是MVC和WebFlux: 這個時候WebFlux優勢沒有那麼明顯了。

效能測試二

現在我們來訪問一下http://localhost:8080/saveData?size=100&length=1000 介面往Mongodb來初始化100條資料,然後修改一下測試指令碼壓測dbData介面: class StressTest extends Simulation {

val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/dbData") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }

setUp(scn.inject(atOnceUsers(1000))) } 下面看下這次的測試結果 ,分別是MVC和WebFlux: 吞吐量沒有太多提高,平均響應時間快不少。

效能測試三

再來試一下第三個saveData介面的情況。修改測試程式碼: class StressTest extends Simulation {

val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/saveData?size=5&length=100000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }

setUp(scn.inject(atOnceUsers(200))) } 這裡我們修改併發使用者為200,每個使用者進行100次測試,每次測試存入Mongodb 5條100KB的資料,一次測試後總資料量在10萬條。這次測試我們並沒有使用1000併發使用者,原因是這個測試我們會先從遠端獲取資料然後再存入Mongodb,遠端的服務也是來自於當前應用程式,我們的Tomcat最多隻有250個執行緒,在啟動1000個使用者後,一些執行緒服務於saveData介面,一些執行緒服務於data介面(saveData介面用到的),這樣相當於造成了迴圈依賴問題,請求在等待更多的可用執行緒執行服務data介面的響應,而這個時候執行緒又都被佔了導致無法分配更多的請求,測試幾乎全部超時。 下面看下這次的測試結果 ,分別是MVC和WebFlux:

WebFlux也是併發略高,效能略好的優勢。對於響應時間的分佈我們再來細看下下面的圖:

第一個圖是MVC版本的響應時間分佈,可以看到抖動比第二個圖的WebFlux的大不少。 最後來看看測試過程中MVC的JVM情況(263個執行緒): 以及WebFlux的(41執行緒):

效能測試四:

我們來測試一下下面兩種情況下對於WebFlux版本Mongodb側的情況:

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=1&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}

以及

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=5&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}

區別就在遠端服務返回的Flux是1個還是5個。在1個的時候執行測試可以看到我們Mongodb有64個連線(需要把之前連線池的配置最小設定為小一點,比如50):

> db.serverStatus().connections
{ "current" : 64, "available" : 3212, "totalCreated" : 8899 }

在size為5的時候,Flux返回的是5個物件,使用這個請求壓測的時候Mongodb的連線數如下:

> db.serverStatus().connections
{ "current" : 583, "available" : 2693, "totalCreated" : 10226 }

這是因為Flux拿到的資料直接以響應式進入Mongodb,並沒有等到所有資料拿到之後序列呼叫方法。 總結一下這幾次的測試,我們發現WebFlux方式對於MVC方式能有略微的效能提升,對於請求阻塞的時候效能優勢明顯。我本金的測試並沒有看到現象中的幾倍甚至幾十倍的效能提升,我猜原因如下:

  1. 本機有效能瓶頸了,壓測客戶端、Mongodb伺服器、服務端都在本機執行,干擾因素太多,CPU的使用你爭我奪,測試不公平
  2. 測試的時候CPU永遠是100%還宕機好幾次,我根本無法測試更高的併發,無法完全把非阻塞的效能壓出來
  3. 我本機測試的時候走的是localhost而不是內網,不經過物理網絡卡,可能無法體現非阻塞的效能 如果有條件可以使用三臺獨立伺服器在內網進行1萬以上併發使用者的效能測試或許可以得到更科學的結果。

總結

本文我們建立了WebFlux和MVC兩套應用對比演示了簡單返回資料、發出遠端請求、使用Mongodb等一些簡單的應用場景,然後來看了一下ThreadPerRequest和EventLoop方式執行緒模型的區別,最後使用Gatling進行了幾個Case的壓力測試並且觀察結果。我覺得:

  1. 非阻塞模型肯定是好東西,在IO壓力和IO延遲很大的情況下,非阻塞模型因為不需要更多的執行緒,內耗小,效能略好,而且也穩定,所以更利於高併發
  2. WebFlux的函式式和宣告方式實現需要有很高的API熟悉使用門檻,對於複雜的邏輯這種方式的實現比回撥地獄更容易繞暈,而且容易產生Bug(或許以後有可能響應式的程式設計在API上有可能和傳統方式進行統一)
  3. 目前和WebFlux配套的其它一些Reactive的庫還不是很全面成熟,要對複雜的業務邏輯全面啟用響應式程式設計有點難,阻塞呼叫不是不能在WebFlux中混用,但是這種方式還是採用了執行緒池來處理,現在容器也是NIO的了,有又多大區別
  4. 採用阻塞方式實現,由阻塞的執行緒進行天然背壓進行流控,非阻塞方式很直接一竿子到底,從外部請求直接到最底層儲存,需要做好流控,這是非常容易產生問題的一個點,當請求的處理無需通過執行緒來承載的時候,前端壓力會直通最底層資料來源,不收任何擴容方面的限制,直接擊潰底層
  5. 對於阻塞的方式,多執行緒的排程天然就是一個任務的負載均衡,並不會出現太嚴重的卡死工作執行緒的問題,非阻塞應用程式設計我們要有意識程式碼在哪個執行緒上執行,如果是reactor執行緒的話千萬不能長時間阻塞 綜上所述,使用WebFlux進行響應式程式設計我個人認為目前只適合做類IO轉發的高併發的又看中資源使用效率的應用場景(比如Gateway閘道器服務),對於複雜的業務邏輯不太適合,在90%的情況下響應式的程式設計模型和執行緒模型不會享受大幅效能優勢,更不建議盲目把現有的應用使用WebFlux來重寫。當然,這肯定是一個會持續發展的方向,可以先接觸研究起來。