1. 程式人生 > >實戰基於Spring Boot 2的WebFlux和mLab搭建反應式Web

實戰基於Spring Boot 2的WebFlux和mLab搭建反應式Web

Spring Framework 5帶來了新的Reactive Stack非阻塞式Web框架:Spring WebFlux。作為與Spring MVC並行使用的Web框架,Spring WebFlux依賴了反應式流介面卡(Reactive Streams Adapter),在Netty和Servlet3.1的容器下,可以提供非阻塞式的Web服務,充分發揮下一代多核處理器的優勢,支撐海量的併發訪問。

以上是官網的介紹,事實上在基於Spring Boot 2強大的微服務架構幫助下,WebFlux和Spring MVC一起,成為Java應用開發的兩大選擇,可以讓我們迅速地搭建起反應式的Web應用。本文擬通過模擬一個簡單的微博應用,實戰通過Spring Boot 2+ Spring WebFlux + MongoDB 開發一個Web應用。

Spring WebFlux及其程式設計正規化

Spring WebFlux通過核心庫Reactor提供反應式支援,Reactor實現了Reactive Streams,後者是一個帶非阻塞式背壓的非同步流處理器。

Reactor包含兩個重要的成員變數FluxMono,它們都實現了Reactive Streams提供的Publisher介面Flux 是一個代表了0..N元素的流,Mono是代表了一個0..1元素的流。雖然WebFlux使用Reactor作為它的核心依賴,它在應用層面,它也同時支援RxJava。

Spring WebFlux支援兩種型別的程式設計正規化:

  1. 傳統的基於註解的方式,如@Controller、@RequestMapping等沿用了Spring MVC的模式.

  2. 基於Java8的Lambda函數語言程式設計模式

本文主要是使用基於註解的方式,今後另文補充基於函數語言程式設計的正規化。

基於Spring Boot 2+ Spring WebFlux + MongoDB的輕量級微博應用

以下展示如何搭建一個輕量級的微博應用,這個應用只包括一個domain類Tweet,使用基於MongoDB的線上MongoDB資料庫mLab作為儲存,並且使用非同步的RESTful API提供基本的增刪查改功能。

此外還會用到Spring Test元件,通過使用Maven的外掛功能,實現對微服務應用的測試。

1. 新建專案

  1. 選擇2.x以上的Spring Boot版本
  2. 輸入artifact的值,比如webflux-demo
  3. 選擇Reactive Web和Reactive MongoDB依賴
  4. 點選Generate Project,生成並下載一個微服務框架到本地,並解壓
  5. 使用IDE,比如eclipse,匯入解壓出來的專案檔案

2. 註冊mLab賬戶,並新建一個MongoDB資料庫

MongoDB資料庫是常用的文件型別資料庫,廣泛用於社交網站、電商等引用中。而mLab是一個線上MongoDB資料庫平臺,提供MongoDB的線上服務。這個應用使用到它。

  1. 根據要求註冊賬戶
  2. 網站會有免費和收費的服務選擇,選擇AWS的免費MongoDB服務
  3. 服務選擇完畢,平臺會提供一個數據庫映象,可以點選資料庫前往管理頁面。
  4. 在User標籤下,新建資料庫的登入名和密碼。

完成以上步驟,資料庫就可以開始使用了。你會看到如下圖所示的頁面:

3. 在專案中配置MongoDB資料庫

前往IDE中的專案資原始檔夾,找到application.properties。新增你在mLad的MongoDB URI

spring.data.mongodb.uri=mongodb://username:[email protected]:63439/springdb   

在應用啟動的時候,Springboot會自動讀取該配置檔案。

4. 編寫應用各模組

WebFlux可以認為是基於Spring的Web開發的一個新的模式或選擇,因此它既有Spring MVC有的模組如Domain、Controller、Service,也有新增的如Handler、Router等。下面分別編寫各模組。

4.1 Domain包

Domain包只包括一個domain類Tweet.java,因為使用了文件資料庫,因此使用@Document註解修飾類,並且使用@Id修飾成員變數id。程式碼如下:

 1 package com.example.webfluxdemo.model;
 2 
 3 import java.util.Date;
 4 
 5 import javax.validation.constraints.NotBlank;
 6 import javax.validation.constraints.NotNull;
 7 import javax.validation.constraints.Size;
 8 
 9 import org.springframework.data.annotation.Id;
10 import org.springframework.data.mongodb.core.mapping.Document;
11 
12 @Document(collection = "tweets")
13 public class Tweet {
14     @Id
15     private String id;
16     
17     @NotBlank
18     @Size(max = 140)
19     private String text;
20     
21     @NotNull
22     private Date createAt = new Date();
23     
24     public Tweet() {
25         
26     }
27 
28     public Tweet(String text) {
29         this.text = text;
30     }
31 
32     public String getId() {
33         return id;
34     }
35 
36     public void setId(String id) {
37         this.id = id;
38     }
39 
40     public String getText() {
41         return text;
42     }
43 
44     public void setText(String text) {
45         this.text = text;
46     }
47 
48     public Date getCreateAt() {
49         return createAt;
50     }
51 
52     public void setCreateAt(Date createAt) {
53         this.createAt = createAt;
54     }
55     
56 }

4.2 Repository

Repository介面是DAO,繼承了ReactiveMongoRepository介面用於連線MongoDB資料庫做資料持久化,

 1 package com.example.webfluxdemo.repository;
 2 
 3 import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
 4 import org.springframework.stereotype.Repository;
 5 
 6 import com.example.webfluxdemo.model.Tweet;
 7 
 8 @Repository
 9 public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {
10 
11 }

通過檢視原始碼可知,父介面ReactiveMongoRepository包含對MongoDB資料庫基本的增刪改查方法。在執行時,Spring Boot會自動實現一個SimpleReactiveMongoRepository類,用於執行增刪改查方法。這樣極大地節省了程式設計師持久化的精力,可以專注於業務開發。

4.3 Controller

Controller是WebFlux的核心類,程式碼如下:

 1 package com.example.webfluxdemo.controller;
 2 
 3 import javax.validation.Valid;
 4 
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.http.HttpStatus;
 7 import org.springframework.http.MediaType;
 8 import org.springframework.http.ResponseEntity;
 9 import org.springframework.web.bind.annotation.DeleteMapping;
10 import org.springframework.web.bind.annotation.GetMapping;
11 import org.springframework.web.bind.annotation.PathVariable;
12 import org.springframework.web.bind.annotation.PostMapping;
13 import org.springframework.web.bind.annotation.PutMapping;
14 import org.springframework.web.bind.annotation.RequestBody;
15 import org.springframework.web.bind.annotation.RestController;
16 
17 import com.example.webfluxdemo.model.Tweet;
18 import com.example.webfluxdemo.repository.TweetRepository;
19 
20 import reactor.core.publisher.Flux;
21 import reactor.core.publisher.Mono;
22 
23 @RestController
24 public class TweetController {
25     
26     @Autowired
27     private TweetRepository tweetRepository;
28     
29     @GetMapping("/tweets")
30     public Flux<Tweet> getAllTweets(){
31         return tweetRepository.findAll();
32     }
33     
34     @PostMapping("/tweets")
35     public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet){
36         return tweetRepository.save(tweet);
37     }
38     
39     @GetMapping("/tweets/{id}")
40     public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
41         return tweetRepository.findById(tweetId)
42                 .map(savedTweet -> ResponseEntity.ok(savedTweet))
43                 .defaultIfEmpty(ResponseEntity.notFound().build());
44     }
45 
46     @PutMapping("/tweets/{id}")
47     public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
48                                                    @Valid @RequestBody Tweet tweet) {
49         return tweetRepository.findById(tweetId)
50                 .flatMap(existingTweet -> {
51                     existingTweet.setText(tweet.getText());
52                     return tweetRepository.save(existingTweet);
53                 })
54                 .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
55                 .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
56     }
57 
58     @DeleteMapping("/tweets/{id}")
59     public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {
60 
61         return tweetRepository.findById(tweetId)
62                 .flatMap(existingTweet ->
63                         tweetRepository.delete(existingTweet)
64                             .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
65                 )
66                 .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
67     }
68 
69     // 基於反應式流傳送微博至客戶端
70     @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
71     public Flux<Tweet> streamAllTweets() {
72         return tweetRepository.findAll();
73     }
74 
75 }

Controller使用Flux或Mono作為物件,返回給不同的請求。反應式編碼主要在最後一個方法:

// 基於反應式流傳送微博至客戶端
    @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Tweet> streamAllTweets() {
        return tweetRepository.findAll();
    }  

這個方法和getAllTweet方法一樣,會返回一個JSON流到客戶端,區別在於streamAllTweets以Server-send-event的方式返回一個Json流到瀏覽器,這種流可以被瀏覽器識別和使用。

使用WebTestClient測試應用

WebTestClient是Spring 5提供的一個非同步反應式Http客戶端,可以用於測試反應式的RestFul微服務應用。在IDE的測試資料夾中,可以找到測試類,編寫程式碼如下:

 1 package com.example.webfluxdemo;
 2 
 3 import java.util.Collections;
 4 
 5 import org.assertj.core.api.Assertions;
 6 import org.junit.Test;
 7 import org.junit.runner.RunWith;
 8 import org.springframework.beans.factory.annotation.Autowired;
 9 import org.springframework.boot.test.context.SpringBootTest;
10 import org.springframework.http.MediaType;
11 import org.springframework.test.context.junit4.SpringRunner;
12 import org.springframework.test.web.reactive.server.WebTestClient;
13 
14 import com.example.webfluxdemo.model.Tweet;
15 import com.example.webfluxdemo.repository.TweetRepository;
16 
17 import reactor.core.publisher.Mono;
18 
19 @RunWith(SpringRunner.class)
20 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
21 public class WebfluxDemoApplicationTests {
22 
23     @Autowired
24     private WebTestClient webTestClient;
25     
26     @Autowired
27     TweetRepository tweetRepository;
28     
29     @Test
30     public void testCreateTweet() {
31         Tweet tweet = new Tweet("這是一條測試微博");
32 
33         webTestClient.post().uri("/tweets")
34                 .contentType(MediaType.APPLICATION_JSON_UTF8)
35                 .accept(MediaType.APPLICATION_JSON_UTF8)
36                 .body(Mono.just(tweet), Tweet.class)
37                 .exchange()
38                 .expectStatus().isOk()
39                 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
40                 .expectBody()
41                 .jsonPath("$.id").isNotEmpty()
42                 .jsonPath("$.text").isEqualTo("這是一條測試微博");
43     }
44     
45     @Test
46     public void testGetAllTweets() {
47         webTestClient.get().uri("/tweets")
48                 .accept(MediaType.APPLICATION_JSON_UTF8)
49                 .exchange()
50                 .expectStatus().isOk()
51                 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
52                 .expectBodyList(Tweet.class);
53     }
54 
55     @Test
56     public void testGetSingleTweet() {
57         Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block();
58 
59         webTestClient.get()
60                 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
61                 .exchange()
62                 .expectStatus().isOk()
63                 .expectBody()
64                 .consumeWith(response ->
65                         Assertions.assertThat(response.getResponseBody()).isNotNull());
66     }
67 
68     @Test
69     public void testUpdateTweet() {
70         Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block();
71 
72         Tweet newTweetData = new Tweet("更新微博");
73 
74         webTestClient.put()
75                 .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
76                 .contentType(MediaType.APPLICATION_JSON_UTF8)
77                 .accept(MediaType.APPLICATION_JSON_UTF8)
78                 .body(Mono.just(newTweetData), Tweet.class)
79                 .exchange()
80                 .expectStatus().isOk()
81                 .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
82                 .expectBody()
83                 .jsonPath("$.text").isEqualTo("更新微博");
84     }
85 
86     @Test
87     public void testDeleteTweet() {
88         Tweet tweet = tweetRepository.save(new Tweet("將要被刪除的微博")).block();
89 
90         webTestClient.delete()
91                 .uri("/tweets/{id}", Collections.singletonMap("id",  tweet.getId()))
92                 .exchange()
93                 .expectStatus().isOk();
94     }
95 
96 }

使用mvn test命令,測試所有的測試類。結果如下:

檢視mLab的資料庫,資料被成功新增: