WebFlux基礎之響應式程式設計
上篇文章,我們簡單的瞭解了WebFlux的一些基礎與背景,並通過示例來寫了一個demo。我們知道WebFlux是響應式的web框架,其特點之一就是可以通過函數語言程式設計方式配置route。另外究竟什麼是響應式程式設計呢?這篇文章我們就簡單探討一下
一、Java8中的函數語言程式設計
百科中這樣定義函數語言程式設計:
函數語言程式設計是種程式設計方式,它將電腦運算視為函式的計算。函式程式語言最重要的基礎是λ演算(lambda calculus),而且λ演算的函式可以接受函式當作輸入(引數)和輸出(返回值)。那麼在Java8裡怎麼樣來實現它呢?
示例一
在這裡我先自己寫一個例子
定義介面:
package com.bdqn.lyrk.basic.java; /** * 函式式介面 * * @author chen.nie * @date 2018/7/18 **/ @FunctionalInterface public interface OperateNumberFunctions { void operate(Integer number); default void print() { } }
在定義的介面上新增@FunctionalInterface表明其是函式式介面,這個註解用於檢測函式式介面規範,定義函式式介面時該介面內必須有且只有一個抽象的方法。
定義類:
package com.bdqn.lyrk.basic.java; import java.util.Optional; import java.util.function.Predicate; /** * 定義函數語言程式設計類 */ public class NumberFunctions { private Integer number; private NumberFunctions() { } private static NumberFunctions numberFunctions = newNumberFunctions(); public static NumberFunctions of(Integer number) { numberFunctions.number = number; return numberFunctions; } public NumberFunctions add(Integer number) { numberFunctions.number += number; return numberFunctions; } public NumberFunctions subtraction(Integer number) { numberFunctions.number -= number; return numberFunctions; } public Optional<NumberFunctions> filter(Predicate<Integer> predicate) { if (predicate.test(this.number)) return Optional.of(numberFunctions); return Optional.ofNullable(new NumberFunctions()); } public void operate(OperateNumberFunctions functions) { functions.operate(this.number); } }
在這裡定義類進行簡單的運算與過濾條件。那麼在Main方法裡可以這麼寫:
package com.bdqn.lyrk.basic.java; public class Main { public static void main(String[] args) { NumberFunctions.of(10).add(30).subtraction(2).filter(number -> number>20).get().operate(System.out::println); } }
那麼輸出結果為38
示例二
在Java8裡有一個類叫Stream。Stream是資料流的意思,這個類略微有點像Reactor中Flux,它提供了類似於操作符的功能,我們來看一個例子:
Main方法
package com.bdqn.lyrk.basic.java; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; public class Main { public static void main(String[] args) { /* 在這裡先將Stream裡的內容做乘2的操作 然後在進行倒序排序 緊接著過濾出是4的倍數的數字 然後轉換成集合在列印 */ Stream.of(15, 26, 34, 455, 5, 6).map(number -> number * 2).sorted((num1, num2) -> num2 - num1).filter(integer -> integer % 4 == 0).collect(toList()).forEach(System.out::println); } }
執行得到的結果:
68 52 12
關於::操作符
該操作符是lambda表示式的更特殊寫法,使用此操作符可以簡化函式式介面的實現,這個方法至少滿足以下特定條件:
1)方法返回值與函式式介面相同
2)方法引數與函式式介面相同
舉例說明
package java.util.function; /** * Represents a supplier of results. * * <p>There is no requirement that a new or distinct result be returned each * time the supplier is invoked. * * <p>This is a <a href="package-summary.html">functional interface</a> * whose functional method is {@link #get()}. * * @param <T> the type of results supplied by this supplier * * @since 1.8 */ @FunctionalInterface public interface Supplier<T> { /** * Gets a result. * * @return a result */ T get(); }
java中Runnable介面:
@FunctionalInterface public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }
java中的Predicate介面:
package java.util.function; import java.util.Objects; /** * Represents a predicate (boolean-valued function) of one argument. * * <p>This is a <a href="package-summary.html">functional interface</a> * whose functional method is {@link #test(Object)}. * * @param <T> the type of the input to the predicate * * @since 1.8 */ @FunctionalInterface public interface Predicate<T> { /** * Evaluates this predicate on the given argument. * * @param t the input argument * @return {@code true} if the input argument matches the predicate, * otherwise {@code false} */ boolean test(T t); /** * Returns a composed predicate that represents a short-circuiting logical * AND of this predicate and another. When evaluating the composed * predicate, if this predicate is {@code false}, then the {@code other} * predicate is not evaluated. * * <p>Any exceptions thrown during evaluation of either predicate are relayed * to the caller; if evaluation of this predicate throws an exception, the * {@code other} predicate will not be evaluated. * * @param other a predicate that will be logically-ANDed with this * predicate * @return a composed predicate that represents the short-circuiting logical * AND of this predicate and the {@code other} predicate * @throws NullPointerException if other is null */ default Predicate<T> and(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) && other.test(t); } /** * Returns a predicate that represents the logical negation of this * predicate. * * @return a predicate that represents the logical negation of this * predicate */ default Predicate<T> negate() { return (t) -> !test(t); } /** * Returns a composed predicate that represents a short-circuiting logical * OR of this predicate and another. When evaluating the composed * predicate, if this predicate is {@code true}, then the {@code other} * predicate is not evaluated. * * <p>Any exceptions thrown during evaluation of either predicate are relayed * to the caller; if evaluation of this predicate throws an exception, the * {@code other} predicate will not be evaluated. * * @param other a predicate that will be logically-ORed with this * predicate * @return a composed predicate that represents the short-circuiting logical * OR of this predicate and the {@code other} predicate * @throws NullPointerException if other is null */ default Predicate<T> or(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) || other.test(t); } /** * Returns a predicate that tests if two arguments are equal according * to {@link Objects#equals(Object, Object)}. * * @param <T> the type of arguments to the predicate * @param targetRef the object reference with which to compare for equality, * which may be {@code null} * @return a predicate that tests if two arguments are equal according * to {@link Objects#equals(Object, Object)} */ static <T> Predicate<T> isEqual(Object targetRef) { return (null == targetRef) ? Objects::isNull : object -> targetRef.equals(object); } }
那麼上述的介面分別可以使用如下寫法,注意實現該介面的方法特點
package com.bdqn.lyrk.basic.java; import java.util.function.Predicate; import java.util.function.Supplier; public class Main { private static int i; public static void main(String[] args) { /* 建立物件的方式 */ Supplier<Object> supplier = Object::new; /* 呼叫方法的方式(無引數) */ Runnable runnable = Main::add; /* 呼叫方法的方式(有引數) */ Predicate<String> predicate = Main::filter; } public static void add() { i++; System.out.println("test" + i); } public static boolean filter(String test) { return test != null; } }
我們可以看到使用函數語言程式設計藉助於lambda表示式,使得程式碼更簡潔清爽
二、Java中的響應式程式設計
關於響應式程式設計,百度百科是這麼定義的:
簡稱RP(Reactive Programming)
響應式程式設計是一種面向資料流和變化傳播的程式設計正規化。這意味著可以在程式語言中很方便地表達靜態或動態的資料流,而相關的計算模型會自動將變化的值通過資料流進行傳播。 在這裡有兩個關鍵詞:資料流與變化傳播。下面我們來通過程式碼來演示下響應式程式設計是怎麼回事Java8及以前版本
最典型的示例就是,JDK提供的觀察者模式類Observer與Observalbe:
package com.hzgj.lyrk.demo; import java.util.Observable; public class ObserverDemo extends Observable { public static void main(String[] args) { ObserverDemo observable = new ObserverDemo(); observable.addObserver((o, arg) -> { System.out.println("發生變化"); }); observable.addObserver((o, arg) -> { System.out.println("收到被觀察者通知,準備改變"); }); observable.setChanged(); observable.notifyObservers(); } }
在上述程式碼示例中觀察者並沒有及時執行,而是在接受到被觀察者傳送訊號的時候才有了“響應”。其中setChanged()與notifyObservers方法就對應響應式程式設計中定義的關鍵詞--變化與傳播。還有一個典型的示例就是Swing中的事件機制,有興趣的朋友可以下去查閱相關資料,在這裡就不再進行闡述。
Java9及其後版本
從java9開始,Observer與Observable已經被標記為過時的類了,取而代之的是Flow類。Flow才是真正意義上的響應式程式設計類,因為觀察者Observer與Observable雖然能夠響應,但是在資料流的體現並不是特別突出。Flow這個類,我們可以先看一下:
public final class Flow { private Flow() {} // uninstantiable /** * A producer of items (and related control messages) received by * Subscribers. Each current {@link Subscriber} receives the same * items (via method {@code onNext}) in the same order, unless * drops or errors are encountered. If a Publisher encounters an * error that does not allow items to be issued to a Subscriber, * that Subscriber receives {@code onError}, and then receives no * further messages. Otherwise, when it is known that no further * messages will be issued to it, a subscriber receives {@code * onComplete}. Publishers ensure that Subscriber method * invocations for each subscription are strictly ordered in <a * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a> * order. * * <p>Publishers may vary in policy about whether drops (failures * to issue an item because of resource limitations) are treated * as unrecoverable errors. Publishers may also vary about * whether Subscribers receive items that were produced or * available before they subscribed. * * @param <T> the published item type */ @FunctionalInterface public static interface Publisher<T> { /** * Adds the given Subscriber if possible. If already * subscribed, or the attempt to subscribe fails due to policy * violations or errors, the Subscriber's {@code onError} * method is invoked with an {@link IllegalStateException}. * Otherwise, the Subscriber's {@code onSubscribe} method is * invoked with a new {@link Subscription}. Subscribers may * enable receiving items by invoking the {@code request} * method of this Subscription, and may unsubscribe by * invoking its {@code cancel} method. * * @param subscriber the subscriber * @throws NullPointerException if subscriber is null */ public void subscribe(Subscriber<? super T> subscriber); } /** * A receiver of messages. The methods in this interface are * invoked in strict sequential order for each {@link * Subscription}. * * @param <T> the subscribed item type */ public static interface Subscriber<T> { /** * Method invoked prior to invoking any other Subscriber * methods for the given Subscription. If this method throws * an exception, resulting behavior is not guaranteed, but may * cause the Subscription not to be established or to be cancelled. * * <p>Typically, implementations of this method invoke {@code * subscription.request} to enable receiving items. * * @param subscription a new subscription */ public void onSubscribe(Subscription subscription); /** * Method invoked with a Subscription's next item. If this * method throws an exception, resulting behavior is not * guaranteed, but may cause the Subscription to be cancelled. * * @param item the item */ public void onNext(T item); /** * Method invoked upon an unrecoverable error encountered by a * Publisher or Subscription, after which no other Subscriber * methods are invoked by the Subscription. If this method * itself throws an exception, resulting behavior is * undefined. * * @param throwable the exception */ public void onError(Throwable throwable); /** * Method invoked when it is known that no additional * Subscriber method invocations will occur for a Subscription * that is not already terminated by error, after which no * other Subscriber methods are invoked by the Subscription. * If this method throws an exception, resulting behavior is * undefined. */ public void onComplete(); } /** * Message control linking a {@link Publisher} and {@link * Subscriber}. Subscribers receive items only when requested, * and may cancel at any time. The methods in this interface are * intended to be invoked only by their Subscribers; usages in * other contexts have undefined effects. */ public static interface Subscription { /** * Adds the given number {@code n} of items to the current * unfulfilled demand for this subscription. If {@code n} is * less than or equal to zero, the Subscriber will receive an * {@code onError} signal with an {@link * IllegalArgumentException} argument. Otherwise, the * Subscriber will receive up to {@code n} additional {@code * onNext} invocations (or fewer if terminated). * * @param n the increment of demand; a value of {@code * Long.MAX_VALUE} may be considered as effectively unbounded */ public void request(long n); /** * Causes the Subscriber to (eventually) stop receiving * messages. Implementation is best-effort -- additional * messages may be received after invoking this method. * A cancelled subscription need not ever receive an * {@code onComplete} or {@code onError} signal. */ public void cancel(); } /** * A component that acts as both a Subscriber and Publisher. * * @param <T> the subscribed item type * @param <R> the published item type */ public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { } static final int DEFAULT_BUFFER_SIZE = 256; /** * Returns a default value for Publisher or Subscriber buffering, * that may be used in the absence of other constraints. * * @implNote * The current value returned is 256. * * @return the buffer size value */ public static int defaultBufferSize() { return DEFAULT_BUFFER_SIZE; } }
Flow這個類裡定義最基本的Publisher與Subscribe,該模式就是釋出訂閱模式。我們來看一下程式碼示例:
package com.hzgj.lyrk.demo; import java.util.concurrent.Flow; public class Main { public static void main(String[] args) { Flow.Publisher<String> publisher = subscriber -> { subscriber.onNext("1"); // 1 subscriber.onNext("2"); subscriber.onError(new RuntimeException("出錯")); // 2 // subscriber.onComplete(); }; publisher.subscribe(new Flow.Subscriber<>() { @Override public void onSubscribe(Flow.Subscription subscription) { subscription.cancel(); } @Override public void onNext(String item) { System.out.println(item); } @Override public void onError(Throwable throwable) { System.out.println("出錯了"); } @Override public void onComplete() { System.out.println("publish complete"); } }); } }
程式碼1 是一種資料流的體現,在Publisher中每次呼叫onNext的時候,在中都會在Subscribe的onNext方法進行消費
程式碼2 同樣是傳送錯誤訊號,等待訂閱者進行消費
執行結果:
1 2 出錯了
在上述程式碼中我們可以發現:Publisher在沒有被訂閱的時候,是不會觸發任何行為的。每次呼叫Publisher的onNext方法的時候都像是在發訊號,訂閱者收到訊號時執行相關內容,這就是典型的響應式程式設計的案例。不過java9提供的這個功能對非同步的支援不太好,也不夠強大。因此才會出現Reactor與RxJava等響應式框架
相關推薦
WebFlux基礎之響應式程式設計
上篇文章,我們簡單的瞭解了WebFlux的一些基礎與背景,並通過示例來寫了一個demo。我們知道WebFlux是響應式的web框架,其特點之一就是可以通過函數語言程式設計方式配置route。另外究竟什麼是響應式程式設計呢?這篇文章我們就簡單探討一下 一、Java8中的函數語言程式設計 百科中這樣定
springboot2.x簡單詳細教程--高階篇幅之響應式程式設計(第十五章)
一、SprinBoot2.x響應式程式設計簡介 簡介:講解什麼是reactive響應式程式設計和使用的好處 1、基礎理解:  
WebFlux響應式程式設計基礎之 4 reactive stream 響應式流
reactive stream 響應式流 — 簡而言之,就是多了一個溝通的渠道 釋出訂閱者 背壓 交流 Reactive Stream主要介面 java.util.concurrent.Flow 原始碼很重要 很有意思 多讀幾遍 im
WebFlux響應式程式設計基礎之 5 webflux服務端開發講解
Spring5 非組塞的開發模式 SpringMvc 與 SpringWebFlux 對比 學習工作機制 工作思想 更加重要 Netty 很重要 讀一下 Netty原始碼 先垂直擴充套件 –》 後水平擴充套件 5-2 非同步serv
Android響應式程式設計之RxJava2.0
前言 優點: 1、鏈式操作 2、非同步優化 實戰 先來個簡單的使用示例 Observable .create(new ObservableOnSubscribe<String>() {
Java響應式程式設計之CompletableFuture學習(一)
本文主要是介紹CompletableFuture的基本使用方法,在後面一篇文章中會寫一個簡單的demo。 1.建立一個完成的CompletableFuture 這種情況通常我們會在計算的開始階段使用它。 CompletableFuture<String>
[轉]springboot2 webflux 響應式程式設計學習路徑
原文連結 spring官方文件 springboot2 已經發布,其中最亮眼的非webflux響應式程式設計莫屬了!響應式的weblfux可以支援高吞吐量,意味著使用相同的資源可以處理更加多的請求,毫無疑問將會成為未來技術的趨勢,是必學的技術!很多人都看過相關的入門教程,但看完之後總覺得很迷糊,知其然不知道其
RxJava響應式程式設計之初級瞭解
據說現在流行的開發模式是 Retrofit+RxJava+MVP+ButterKnife 今天我就簡單來學習下RxJava的相關知識 以前我也只是聽說過RxJava,RxJava這個到底是什麼東西呢? 呵呵,它其實是一個庫,所以我們使用裡面的方法,得需
Android響應式程式設計(一)RxJava前篇[入門基礎]
1.RxJava概述 ReactiveX與RxJava 在講到RxJava之前我們首先要了解什麼是ReactiveX,因為RxJava是ReactiveX的一種java實現。 ReactiveX是Reactive Extensions的縮寫,一般簡寫為
SpringBoot2.x響應式程式設計webflux介紹
1、Spring WebFlux是Spring Framework 5.0中引入的新的反應式Web框架 與Spring MVC不同,它不需要Servlet API,完全非同步和非阻塞,並 通過Reactor專案實現Reactive Streams規範。 &nbs
SpringBoot2.x webflux響應式程式設計實戰
1、WebFlux中,請求和響應不再是WebMVC中的ServletRequest和ServletResponse,而是ServerRequest和ServerResponse 2、加入依賴,如果同時存在spring-boot-starter-web,則會優先用spring-boot-start
講課:Webflux響應式程式設計(SpringBoot 2.0新特性)
學習webflux前需要學習三個基礎: 函數語言程式設計和lambda表示式 Stream流程式設計 Reactive stream 響應式流 接下來進入學習 一、函數語言程式設計和lambda表示式 1. 什麼是函數語言程式設計 函數語言程式設計是
函數語言程式設計及響應式程式設計之己見
1. what is 函數語言程式設計? 函式,在程式設計中,通常體現為: 輸入 => 執行 => 結果。他不是命令式的,而是對一段操作進行邏輯封裝,拿到輸入,就能產出結果。通常來說,滿足函數語言程式設計的特性的“函式”應該有如下特點: 函式必須有入參,並且函
RxJS入門之函式響應式程式設計
一.函數語言程式設計 1.宣告式(Declarativ) 和宣告式相對應的程式設計⽅式叫做指令式程式設計(ImperativeProgramming),指令式程式設計也是最常見的⼀種程式設計⽅式。 //指令式程式設計: function double(arr) { const results = []
Spring Boot (十四): 響應式程式設計以及 Spring Boot Webflux 快速入門
1. 什麼是響應式程式設計 在計算機中,響應式程式設計或反應式程式設計(英語:Reactive programming)是一種面向資料流和變化傳播的程式設計正規化。這意味著可以在程式語言中很方便地表達靜態或動態的資料流,而相關的計算模型會自動將變化的值通過資料流進行傳播。 例如,在指令式程式設計環境中,a
響應式程式設計簡介之:Reactor
[toc] # 簡介 Reactor是reactivex家族的一個非常重要的成員,Reactor是第四代的reactive library,它是基於Reactive Streams標準基礎上開發的,主要用來構建JVM環境下的非阻塞應用程式。 今天給大家介紹一下Reactor。 # Reactor簡介
移動web之響應式布局
data plan 不同 解決 css sea styles struct ebp 1.響應式布局的概念 響應式布局是Ethan Marcotte在2010年5月份提出的一個概念。簡而言之。就是一個站點可以兼容多個終端——而不是為每一個終端做一個特定的版本
(5)Spring WebFlux快速上手——響應式Spring的道法術器
響應式編程 Spring WebFlux 本系列文章索引《響應式Spring的道法術器》前情提要 lambda與函數式 | Reactor 3快速上手本文源碼 1.3.3 Spring WebFlux Spring WebFlux是隨Spring 5推出的響應式Web框架。 1)服務端技術棧 Sp
web前端之響應式佈局,你必須要知道的
一、前言 響應式Web設計可以讓一個網站同時適配多種裝置和多個螢幕,可以讓網站的佈局和功能隨使用者的使用環境(螢幕大小、輸入方式、裝置/瀏覽器能力)而變化。本文主要介紹一些響應式佈局容易忽略但又很重要的知識點。 二、視口 移動前端中常說的 viewport (視口)就是瀏覽器中用於呈現網
vue源碼之響應式數據
完成 uri handle 不能 構造器 sre 疑問 ase accept 分析vue是如何實現數據響應的. 前記 現在回顧一下看數據響應的原因. 之前看了vuex和vue-i18n的源碼, 他們都有自己內部的vm, 也就是vue實例. 使用的都是vue的響應式數據特性及