1. 程式人生 > >講課:Webflux響應式程式設計(SpringBoot 2.0新特性)

講課:Webflux響應式程式設計(SpringBoot 2.0新特性)

  • 學習webflux前需要學習三個基礎:
  1. 函數語言程式設計和lambda表示式
  2. Stream流程式設計
  3. Reactive stream 響應式流
  • 接下來進入學習

一、函數語言程式設計和lambda表示式

1. 什麼是函數語言程式設計
函數語言程式設計是一種相對於指令式程式設計的一種程式設計正規化,它不是一種具體的技術,而是一種如何搭建應用程式的方法論

2. 為什麼要使用函數語言程式設計
* 能讓我們以一種更加優雅的方式進行程式設計
* 函數語言程式設計與指令式程式設計相比
1)不同點:
關注點不同,指令式程式設計我們關注的是怎麼樣做,而函數語言程式設計關注的是做什麼。
2)優點:
可以使程式碼更加的簡短,更加的好讀。

3. lambda表示式初接觸
具體看一個例子,求陣列中的最大值,如果資料量太大,想要處理更高效,jdk8以前,只能自己建立執行緒池,自己拆分,而jdk8以後只需要加上parallel(),意思就是告訴它我要多執行緒的處理該資料,以此可以看到他的魅力

public class MinDemo {
    public static void main(String[] args) {
        int[] arr = {15,24,12,451,156};
        int min = Integer.MAX_VALUE;
        for (int a :
                arr) {
            if (a < min) {
                min = a;
            }
        }
        System.out.println(min);

        //jdk8 lambda,parallel()多執行緒處理
        int min2 = IntStream.of(arr).parallel().min().getAsInt();
        System.out.println(min2);

    }

4. 當然還有很多其他的特性,這裡只簡單介紹一下

  • jdk8介面新特性

    1. 接口裡只有一個要實現的方法,單一責任制
    2. 新增預設方法
  • 函式介面

    1. 只需要知道輸入輸出的型別
    2. 支援鏈式操作
  • 方法引用

    1. 靜態方法引用
    2. 非靜態方法引用
    3. 構造方法引用
  • 級聯表示式和柯里化

    1. 級聯表示式是返回函式的函式
    2. 柯里化把多個引數的函式轉換為只有一個引數的函式
  • 變數引用

    1. 引用外邊的變數必須是final型別

5. 以下是函數語言程式設計常用的介面
在這裡插入圖片描述

二、 Stream流程式設計

1. 是什麼,不是什麼
是一個高階的迭代器,不是一個數據結構、不是一個集合、不會存放資料、關注的是怎麼把資料高效處理
2. 建立/中間操作/終止操作


1) 建立
在這裡插入圖片描述

程式碼演示

		List<String> list = new ArrayList<>();

		// 從集合建立
		list.stream();
		list.parallelStream();

		// 從陣列建立
		Arrays.stream(new int[] { 2, 3, 5 });

		// 建立數字流
		IntStream.of(1, 2, 3);
		IntStream.rangeClosed(1, 10);

		// 使用random建立一個無限流
		new Random().ints().limit(10);
		Random random = new Random();

		// 自己產生流
		Stream.generate(() -> random.nextInt()).limit(20);

2) 中間操作
在這裡插入圖片描述

		String str = "my name is AlgerFan";

		System.out.println("--------------filter------------");
		// 把每個單詞的長度調用出來
		Stream.of(str.split(" ")).filter(s -> s.length() > 2)
				.map(String::length).forEach(System.out::println);

		System.out.println("--------------flatMap------------");
		// flatMap A->B屬性(是個集合), 最終得到所有的A元素裡面的所有B屬性集合
		// intStream/longStream 並不是Stream的子類, 所以要進行裝箱 boxed
		Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
				.forEach(i -> System.out.println((char) i.intValue()));

		System.out.println("--------------peek------------");
		// peek 用於debug. 是個中間操作,和 forEach 是終止操作
		Stream.of(str.split(" ")).peek(System.out::println)
				.forEach(System.out::println);

		System.out.println("--------------limit------------");
		// limit 使用, 主要用於無限流
		new Random().ints().filter(i -> i > 100 && i < 1000).limit(5)
				.forEach(System.out::println);

3) 終止操作
在這裡插入圖片描述

		String str = "my name is AlgerFan";

		System.out.println("-------並行流parallel--------");
		// 使用並行流
		str.chars().parallel().forEach(i -> System.out.print((char) i));
		System.out.println();
		// 使用 forEachOrdered 保證順序
		str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
		System.out.println();

		System.out.println("-------collect收集到list--------");
		// 收集到list
		List<String> list = Stream.of(str.split(" "))
				.collect(Collectors.toList());
		System.out.println(list);

		System.out.println("-------使用 reduce 拼接字串--------");
		// 使用 reduce 拼接字串
		Optional<String> letters = Stream.of(str.split(" "))
				.reduce((s1, s2) -> s1 + "|" + s2);
		System.out.println(letters.orElse(""));

		System.out.println("-------帶初始化值的reduce--------");
		// 帶初始化值的reduce
		String reduce = Stream.of(str.split(" ")).reduce("",
				(s1, s2) -> s1 + "|" + s2);
		System.out.println(reduce);

		System.out.println("-------計算所有單詞總長度--------");
		// 計算所有單詞總長度
		Integer length = Stream.of(str.split(" ")).map(s -> s.length())
				.reduce(0, (s1, s2) -> s1 + s2);
		System.out.println(length);

		System.out.println("-------max 的使用--------");
		// max 的使用
		Optional<String> max = Stream.of(str.split(" "))
				.max((s1, s2) -> s1.length() - s2.length());
		System.out.println(max.get());

		System.out.println("-------使用 findFirst 短路操作--------");
		// 使用 findFirst 短路操作
		OptionalInt findFirst = new Random().ints().findFirst();
		System.out.println(findFirst.getAsInt());
  1. 並行流
    以上已經接觸了parallel()並行流,能夠多執行緒的處理資料
  2. 收集器
    示例程式碼:
		// 測試資料
		List<Student> students = Arrays.asList(
				new Student("小明", 10, Gender.MALE, Grade.ONE),
				new Student("大明", 9, Gender.MALE, Grade.THREE),
				new Student("小白", 8, Gender.FEMALE, Grade.TWO),
				new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
				new Student("小紅", 7, Gender.FEMALE, Grade.THREE),
				new Student("小黃", 13, Gender.MALE, Grade.ONE),
				new Student("小青", 13, Gender.FEMALE, Grade.THREE),
				new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
				new Student("小王", 6, Gender.MALE, Grade.ONE),
				new Student("小李", 6, Gender.MALE, Grade.ONE),
				new Student("小馬", 14, Gender.FEMALE, Grade.FOUR),
				new Student("小劉", 13, Gender.MALE, Grade.FOUR));

		// 得到所有學生的年齡列表
		// s -> s.getAge() --> Student::getAge , 不會多生成一個類似 lambda$0這樣的函式
		Set<Integer> ages = students.stream().map(Student::getAge)
				.collect(Collectors.toCollection(TreeSet::new));
		System.out.println("所有學生的年齡:" + ages);

		// 統計彙總資訊
		IntSummaryStatistics agesSummaryStatistics = students.stream()
				.collect(Collectors.summarizingInt(Student::getAge));
		System.out.println("年齡彙總資訊:" + agesSummaryStatistics);

		// 分塊
		Map<Boolean, List<Student>> genders = students.stream().collect(
				Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
		System.out.println("男女學生列表:" + genders);

		// 分組
		Map<Grade, List<Student>> grades = students.stream()
				.collect(Collectors.groupingBy(Student::getGrade));
		System.out.println("學生班級列表:" + grades);

		// 得到所有班級學生的個數
		Map<Grade, Long> gradesCount = students.stream().collect(Collectors
				.groupingBy(Student::getGrade, Collectors.counting()));
		System.out.println("班級學生個數列表:" + gradesCount);

測試結果

所有學生的年齡:[6, 7, 8, 9, 10, 13, 14]
年齡彙總資訊:IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女學生列表:{false=[[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小紅, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE], [name=小紫, age=9, gender=FEMALE, grade=TWO], [name=小馬, age=14, gender=FEMALE, grade=FOUR]], true=[[name=小明, age=10, gender=MALE, grade=ONE], [name=大明, age=9, gender=MALE, grade=THREE], [name=小黃, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE], [name=小劉, age=13, gender=MALE, grade=FOUR]]}
學生班級列表:{FOUR=[[name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小馬, age=14, gender=FEMALE, grade=FOUR], [name=小劉, age=13, gender=MALE, grade=FOUR]], ONE=[[name=小明, age=10, gender=MALE, grade=ONE], [name=小黃, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE]], THREE=[[name=大明, age=9, gender=MALE, grade=THREE], [name=小紅, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE]], TWO=[[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小紫, age=9, gender=FEMALE, grade=TWO]]}
班級學生個數列表:{FOUR=3, ONE=4, THREE=3, TWO=2}
  1. 執行機制
    演示一個測試程式碼
		Random random = new Random();
		// 隨機產生資料
		Stream<Integer> stream = Stream.generate(random::nextInt)
				// 產生300個 ( 無限流需要短路操作. )
				.limit(300)
				// 第1個無狀態操作,print(s)執行耗時操作5s
				.peek(s -> print("peek: " + s))
				// 第2個無狀態操作
				.filter(s -> {
					print("filter: " + s);
					return s > 1000000;
				})
				// 有狀態操作
				/*.sorted((i1, i2) -> {
					print("排序: " + i1 + ", " + i2);
					return i1.compareTo(i2);
				})*/
				// 又一個無狀態操作
				.peek(s -> {
					print("peek2: " + s);
				});

		// 終止操作
		stream.count();

分析以上程式碼,發現Stream建立了一個256長度的陣列

  1. 所有操作是鏈式呼叫, 一個元素只迭代一次
  2. 每一箇中間操作返回一個新的流. 流裡面有一個屬性sourceStage
    指向同一個 地方,就是Head
  3. Head->nextStage->nextStage->… -> null
  4. 有狀態操作會把無狀態操作階段,單獨處理
  5. 並行環境下, 有狀態的中間操作不一定能並行操作.
  6. parallel/ sequetial 這2個操作也是中間操作(也是返回stream)
    但是他們不建立流, 他們只修改 Head的並行標誌

三、Reactive stream 響應式流

  • Reactive stream是jdk9新特性,提供了一套API,就是一種訂閱釋出者模式
  • 被壓,背壓是指在非同步場景中,釋出者傳送事件速度遠快於訂閱者的處理速度的情況下,一種告訴上游的釋出者降低傳送速度的策略,簡而言之,背壓就是一種流速控制的策略。
    舉個例子:假設以前是沒有水龍頭的,只能自來水廠主動的往使用者輸送水,但是不知道使用者需要多少水,有了Reactive stream,就相當於有了水龍頭,使用者可以主動的請求用水,而自來水廠也知道了使用者的需求
    示例程式碼(需要jdk9以上版本的支援)
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {
    public static void main(String[] args) throws Exception {
        // 1. 定義釋出者, 釋出的資料型別是 Integer
        // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 介面
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
        
        // 2. 定義訂閱者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 儲存訂閱關係, 需要用它來給釋出者響應
                this.subscription = subscription;
                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據, 處理
                System.out.println("接受到資料: " + item);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 處理完呼叫request再請求一個數據
                this.subscription.request(1);
                // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理資料的時候產生了異常)
                throwable.printStackTrace();
                // 我們可以告訴釋出者, 後面不接受資料了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部資料處理完了(釋出者關閉了)
                System.out.println("處理完了!");
            }
        };

        // 3. 釋出者和訂閱者 建立訂閱關係
        publiser.subscribe(subscriber);

        // 4. 生產資料, 併發布
        // 這裡忽略資料生產過程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成資料:" + i);
            // submit是個block方法
            publiser.submit(i);
        }

        publiser.submit(111);
        publiser.submit(222);
        publiser.submit(333);

        // 5. 結束後 關閉釋出者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主執行緒延遲停止, 否則資料沒有消費就退出
        Thread.currentThread().join(1000);
    }
}

四、Webflux響應式程式設計

先來一張圖,這是spring文件的一張截圖,介紹了spring如今的兩種開發模式,MVC和webflux兩種開發模式,可見webflux的重要性

在這裡插入圖片描述

1. 初識SpringWebFlux
webflux 是spring5推出的一種響應式Web框架,它是一種非阻塞的開發模式,可以在一個執行緒裡處理多個請求(非阻塞),執行在netty環境,也可以可以執行在servlet3.1之後的容器,支援非同步servlet, 可以支援更高的併發量

2. 非同步servlet

  • 我們知道同步servlet阻塞了Tomcat容器的執行緒,當一個網路請求到我們的Tomcat容器之後,容器會給每個請求啟動一個執行緒去處理,執行緒裡面會呼叫一個servlet去處理,當使用同步servlet時,業務程式碼花多長時間,你的執行緒就要等待多長時間,這就是堵塞(同步和非同步是伺服器後臺才有非同步這個概念,對於瀏覽器來說所有的請求都是非同步,前臺都要花費業務邏輯時間)
  • 非同步servlet的主要作用是它不會堵塞Tomcat容器的servlet執行緒,它可以把一些耗時的操作放在一個獨立的執行緒池,那麼我們的servlet就可以立馬返回,處理下一個請求,以此就可以達到高併發。
    通過程式碼比較一下同步servlet與非同步servlet

同步servlet

@WebServlet(urlPatterns = "/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public SyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();
        // 執行業務程式碼
        doSomeThing(request, response);
        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(HttpServletRequest request,
                             HttpServletResponse response) throws IOException {
        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response.getWriter().append("done");
    }
}

非同步servlet

@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public AsyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 開啟非同步
        AsyncContext asyncContext = request.startAsync();

        // 執行業務程式碼,放入一個執行緒池裡
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(AsyncContext asyncContext,
                             ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 業務程式碼處理完畢, 通知結束
        asyncContext.complete();
    }
}

3. CRUD完整示例

  • 通過下圖可以看出MVC和wenflux的區別
    在這裡插入圖片描述

  • 以下通過一個例子瞭解一下webflux開發

  1. 實體類
@Document(collection = "user")
@Data
public class User {

	@Id
	private String id;

	@NotBlank
	private String name;

	@Range(min=10, max=100)
	private int age;

}
  1. Controller層
@RestController
@RequestMapping("/user")
public class UserController {

	private final UserRepository repository;

	public UserController(UserRepository repository) {
		this.repository = repository;
	}

	/**
	 * 以陣列形式一次性返回資料
	 */
	@GetMapping("/")
	public Flux<User> getAll() {
		return repository.findAll();
	}

	/**
	 * 以SSE形式多次返回資料
	 */
	@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamGetAll() {
		return repository.findAll();
	}

	/**
	 * 新增資料
	 */
	@PostMapping("/")
	public Mono<User> createUser(@Valid @RequestBody User user) {
		// spring data jpa 裡面, 新增和修改都是save. 有id是修改, id為空是新增
		// 根據實際情況是否置空id
		user.setId(null);
		CheckUtil.checkName(user.getName());
		return this.repository.save(user);
	}

	/**
	 * 根據id刪除使用者 存在的時候返回200, 不存在返回404
	 */
	@DeleteMapping("/{id}")
	public Mono<ResponseEntity<Void>> deleteUser(
			@PathVariable("id") String id) {
		// deletebyID 沒有返回值, 不能判斷資料是否存在
		// this.repository.deleteById(id)
		return this.repository.findById(id)
				// 當你要操作資料, 並返回一個Mono 這個時候使用flatMap
				// 如果不操作資料, 只是轉換資料, 使用map
				.flatMap(user -> this.repository.delete(user).then(
						Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 修改資料 存在的時候返回200 和修改後的資料, 不存在的時候返回404
	 */
	@PutMapping("/{id}")
	public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
			@Valid @RequestBody User user) {
		CheckUtil.checkName(user.getName());
		return this.repository.findById(id)
				// flatMap 操作資料
				.flatMap(u -> {
					u.setAge(user.getAge());
					u.setName(user.getName());
					return this.repository.save(u);
				})
				// map: 轉換資料
				.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 根據ID查詢使用者 存在返回使用者資訊, 不存在返回404
	 */
	@GetMapping("/{id}")
	public Mono<ResponseEntity<User>> findUserById(
			@PathVariable("id") String id) {
		return this.repository.findById(id)
				.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 根據年齡查詢使用者
	 */
	@GetMapping("/age/{start}/{end}")
	public Flux<User> findByAge(@PathVariable("start") int start,
			@PathVariable("end") int end) {
		return this.repository.findByAgeBetween(start, end);
	}

	/**
	 * 根據年齡查詢使用者
	 */
	@GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamFindByAge(@PathVariable("start") int start,
			@PathVariable("end") int end) {
		return this.repository.findByAgeBetween(start, end);
	}
	
	/**
	 *  得到20-30使用者
	 */
	@GetMapping("/old")

	public Flux<User> oldUser() {
		return this.repository.oldUser();
	}

	/**
	 * 得到20-30使用者
	 */
	@GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamOldUser() {
		return this.repository.oldUser();
	}

}
  1. Repository層
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {

	/**
	 * 根據年齡查詢使用者
	 */
	Flux<User> findByAgeBetween(int start, int end);
	
	@Query("{'age':{ '$gte': 20, '$lte' : 30}}")
	Flux<User> oldUser();
}
  • 以上程式碼沒有進行校驗,當然沒有校驗的程式碼是不能用的,校驗程式碼我就不放了,想了解的GitHub上有完整程式碼。