1. 程式人生 > >WebFlux基礎之響應式程式設計

WebFlux基礎之響應式程式設計

  上篇文章,我們簡單的瞭解了WebFlux的一些基礎與背景,並通過示例來寫了一個demo。我們知道WebFlux是響應式的web框架,其特點之一就是可以通過函數語言程式設計方式配置route。另外究竟什麼是響應式程式設計呢?這篇文章我們就簡單探討一下

一、Java8中的函數語言程式設計

  百科中這樣定義函數語言程式設計:

  函數語言程式設計是種程式設計方式,它將電腦運算視為函式的計算。函式程式語言最重要的基礎是λ演算(lambda calculus),而且λ演算的函式可以接受函式當作輸入(引數)和輸出(返回值)。那麼在Java8裡怎麼樣來實現它呢?

示例一

在這裡我先自己寫一個例子

定義介面:

package com.bdqn.lyrk.basic.java;

/**
 * 函式式介面
 *
 * @author chen.nie
 * @date 2018/7/18
 **/
@FunctionalInterface
public interface OperateNumberFunctions {

    void operate(Integer number);

    default void print() {
        
    }
}

  在定義的介面上新增@FunctionalInterface表明其是函式式介面,這個註解用於檢測函式式介面規範,定義函式式介面時該介面內必須有且只有一個抽象的方法。

定義類:

package com.bdqn.lyrk.basic.java;

import java.util.Optional;
import java.util.function.Predicate;

/**
 * 定義函數語言程式設計類
 */
public class NumberFunctions {

    private Integer number;

    private NumberFunctions() {
    }

    private static NumberFunctions numberFunctions = new
NumberFunctions(); public static NumberFunctions of(Integer number) { numberFunctions.number = number; return numberFunctions; } public NumberFunctions add(Integer number) { numberFunctions.number += number; return numberFunctions; } public NumberFunctions subtraction(Integer number) { numberFunctions.number -= number; return numberFunctions; } public Optional<NumberFunctions> filter(Predicate<Integer> predicate) { if (predicate.test(this.number)) return Optional.of(numberFunctions); return Optional.ofNullable(new NumberFunctions()); } public void operate(OperateNumberFunctions functions) { functions.operate(this.number); } }

  在這裡定義類進行簡單的運算與過濾條件。那麼在Main方法裡可以這麼寫:

package com.bdqn.lyrk.basic.java;

public class Main {

    public static void main(String[] args) {
        NumberFunctions.of(10).add(30).subtraction(2).filter(number -> number>20).get().operate(System.out::println);
    }
}

  那麼輸出結果為38

示例二

  在Java8裡有一個類叫Stream。Stream是資料流的意思,這個類略微有點像Reactor中Flux,它提供了類似於操作符的功能,我們來看一個例子:

Main方法

package com.bdqn.lyrk.basic.java;

import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

public class Main {

    public static void main(String[] args) {
        /*
            在這裡先將Stream裡的內容做乘2的操作
            然後在進行倒序排序
            緊接著過濾出是4的倍數的數字
            然後轉換成集合在列印
         */
        Stream.of(15, 26, 34, 455, 5, 6).map(number -> number * 2).sorted((num1, num2) -> num2 - num1).filter(integer -> integer % 4 == 0).collect(toList()).forEach(System.out::println);
    }
}

  執行得到的結果:

68
52
12

關於::操作符

  該操作符是lambda表示式的更特殊寫法,使用此操作符可以簡化函式式介面的實現,這個方法至少滿足以下特定條件:

  1)方法返回值與函式式介面相同

  2)方法引數與函式式介面相同

  舉例說明

package java.util.function;

/**
 * Represents a supplier of results.
 *
 * <p>There is no requirement that a new or distinct result be returned each
 * time the supplier is invoked.
 *
 * <p>This is a <a href="package-summary.html">functional interface</a>
 * whose functional method is {@link #get()}.
 *
 * @param <T> the type of results supplied by this supplier
 *
 * @since 1.8
 */
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

  java中Runnable介面:

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

  java中的Predicate介面:

package java.util.function;

import java.util.Objects;

/**
 * Represents a predicate (boolean-valued function) of one argument.
 *
 * <p>This is a <a href="package-summary.html">functional interface</a>
 * whose functional method is {@link #test(Object)}.
 *
 * @param <T> the type of the input to the predicate
 *
 * @since 1.8
 */
@FunctionalInterface
public interface Predicate<T> {

    /**
     * Evaluates this predicate on the given argument.
     *
     * @param t the input argument
     * @return {@code true} if the input argument matches the predicate,
     * otherwise {@code false}
     */
    boolean test(T t);

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * AND of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code false}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ANDed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * AND of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default Predicate<T> and(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) && other.test(t);
    }

    /**
     * Returns a predicate that represents the logical negation of this
     * predicate.
     *
     * @return a predicate that represents the logical negation of this
     * predicate
     */
    default Predicate<T> negate() {
        return (t) -> !test(t);
    }

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * OR of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code true}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ORed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * OR of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default Predicate<T> or(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) || other.test(t);
    }

    /**
     * Returns a predicate that tests if two arguments are equal according
     * to {@link Objects#equals(Object, Object)}.
     *
     * @param <T> the type of arguments to the predicate
     * @param targetRef the object reference with which to compare for equality,
     *               which may be {@code null}
     * @return a predicate that tests if two arguments are equal according
     * to {@link Objects#equals(Object, Object)}
     */
    static <T> Predicate<T> isEqual(Object targetRef) {
        return (null == targetRef)
                ? Objects::isNull
                : object -> targetRef.equals(object);
    }
}

那麼上述的介面分別可以使用如下寫法,注意實現該介面的方法特點

package com.bdqn.lyrk.basic.java;

import java.util.function.Predicate;
import java.util.function.Supplier;

public class Main {
    private static int i;

    public static void main(String[] args) {

        /*
            建立物件的方式
         */
        Supplier<Object> supplier = Object::new;

        /*
            呼叫方法的方式(無引數)
         */
        Runnable runnable = Main::add;

        /*
            呼叫方法的方式(有引數)
         */
        Predicate<String> predicate = Main::filter;
    }

    public static void add() {
        i++;
        System.out.println("test" + i);
    }

    public static boolean filter(String test) {
        return test != null;
    }
}

我們可以看到使用函數語言程式設計藉助於lambda表示式,使得程式碼更簡潔清爽 

二、Java中的響應式程式設計

  關於響應式程式設計,百度百科是這麼定義的:

  簡稱RP(Reactive Programming)

  響應式程式設計是一種面向資料流和變化傳播的程式設計正規化。這意味著可以在程式語言中很方便地表達靜態或動態的資料流,而相關的計算模型會自動將變化的值通過資料流進行傳播。   在這裡有兩個關鍵詞:資料流與變化傳播。下面我們來通過程式碼來演示下響應式程式設計是怎麼回事

 Java8及以前版本

  最典型的示例就是,JDK提供的觀察者模式類Observer與Observalbe:

package com.hzgj.lyrk.demo;

import java.util.Observable;

public class ObserverDemo extends Observable {

    public static void main(String[] args) {
        ObserverDemo observable = new ObserverDemo();
        observable.addObserver((o, arg) -> {
            System.out.println("發生變化");
        });
        observable.addObserver((o, arg) -> {
            System.out.println("收到被觀察者通知,準備改變");
        });
        observable.setChanged();
        observable.notifyObservers();
    }
}

  在上述程式碼示例中觀察者並沒有及時執行,而是在接受到被觀察者傳送訊號的時候才有了“響應”。其中setChanged()與notifyObservers方法就對應響應式程式設計中定義的關鍵詞--變化與傳播。還有一個典型的示例就是Swing中的事件機制,有興趣的朋友可以下去查閱相關資料,在這裡就不再進行闡述。

 Java9及其後版本

  從java9開始,Observer與Observable已經被標記為過時的類了,取而代之的是Flow類。Flow才是真正意義上的響應式程式設計類,因為觀察者Observer與Observable雖然能夠響應,但是在資料流的體現並不是特別突出。Flow這個類,我們可以先看一下:

public final class Flow {

    private Flow() {} // uninstantiable

    /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method {@code onNext}) in the same order, unless
     * drops or errors are encountered. If a Publisher encounters an
     * error that does not allow items to be issued to a Subscriber,
     * that Subscriber receives {@code onError}, and then receives no
     * further messages.  Otherwise, when it is known that no further
     * messages will be issued to it, a subscriber receives {@code
     * onComplete}.  Publishers ensure that Subscriber method
     * invocations for each subscription are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Publishers may vary in policy about whether drops (failures
     * to issue an item because of resource limitations) are treated
     * as unrecoverable errors.  Publishers may also vary about
     * whether Subscribers receive items that were produced or
     * available before they subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface are
     * invoked in strict sequential order for each {@link
     * Subscription}.
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         *
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking a {@link Publisher} and {@link
     * Subscriber}.  Subscribers receive items only when requested,
     * and may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers; usages in
     * other contexts have undefined effects.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * less than or equal to zero, the Subscriber will receive an
         * {@code onError} signal with an {@link
         * IllegalArgumentException} argument.  Otherwise, the
         * Subscriber will receive up to {@code n} additional {@code
         * onNext} invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving
         * messages.  Implementation is best-effort -- additional
         * messages may be received after invoking this method.
         * A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    static final int DEFAULT_BUFFER_SIZE = 256;

    /**
     * Returns a default value for Publisher or Subscriber buffering,
     * that may be used in the absence of other constraints.
     *
     * @implNote
     * The current value returned is 256.
     *
     * @return the buffer size value
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

}

  Flow這個類裡定義最基本的Publisher與Subscribe,該模式就是釋出訂閱模式。我們來看一下程式碼示例:

package com.hzgj.lyrk.demo;

import java.util.concurrent.Flow;

public class Main {

    public static void main(String[] args) {
        Flow.Publisher<String> publisher = subscriber -> {
            subscriber.onNext("1"); // 1
            subscriber.onNext("2");
            subscriber.onError(new RuntimeException("出錯")); // 2
            //  subscriber.onComplete();
        };
        publisher.subscribe(new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.cancel();
            }

            @Override
            public void onNext(String item) {
                System.out.println(item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("出錯了");
            }

            @Override
            public void onComplete() {
                System.out.println("publish complete");
            }
        });
    }
}

   程式碼1 是一種資料流的體現,在Publisher中每次呼叫onNext的時候,在中都會在Subscribe的onNext方法進行消費

   程式碼2 同樣是傳送錯誤訊號,等待訂閱者進行消費

   執行結果:

1
2
出錯了

  在上述程式碼中我們可以發現:Publisher在沒有被訂閱的時候,是不會觸發任何行為的。每次呼叫Publisher的onNext方法的時候都像是在發訊號,訂閱者收到訊號時執行相關內容,這就是典型的響應式程式設計的案例。不過java9提供的這個功能對非同步的支援不太好,也不夠強大。因此才會出現Reactor與RxJava等響應式框架

相關推薦

WebFlux基礎響應程式設計

  上篇文章,我們簡單的瞭解了WebFlux的一些基礎與背景,並通過示例來寫了一個demo。我們知道WebFlux是響應式的web框架,其特點之一就是可以通過函數語言程式設計方式配置route。另外究竟什麼是響應式程式設計呢?這篇文章我們就簡單探討一下 一、Java8中的函數語言程式設計   百科中這樣定

springboot2.x簡單詳細教程--高階篇幅響應程式設計(第十五章)

      一、SprinBoot2.x響應式程式設計簡介     簡介:講解什麼是reactive響應式程式設計和使用的好處     1、基礎理解:     

WebFlux響應程式設計基礎 4 reactive stream 響應

reactive stream 響應式流 — 簡而言之,就是多了一個溝通的渠道 釋出訂閱者 背壓 交流 Reactive Stream主要介面 java.util.concurrent.Flow 原始碼很重要 很有意思 多讀幾遍 im

WebFlux響應程式設計基礎 5 webflux服務端開發講解

Spring5 非組塞的開發模式 SpringMvc 與 SpringWebFlux 對比 學習工作機制 工作思想 更加重要 Netty 很重要 讀一下 Netty原始碼 先垂直擴充套件 –》 後水平擴充套件 5-2 非同步serv

Android響應程式設計RxJava2.0

前言 優點: 1、鏈式操作 2、非同步優化 實戰 先來個簡單的使用示例 Observable .create(new ObservableOnSubscribe<String>() {

Java響應程式設計CompletableFuture學習(一)

本文主要是介紹CompletableFuture的基本使用方法,在後面一篇文章中會寫一個簡單的demo。 1.建立一個完成的CompletableFuture   這種情況通常我們會在計算的開始階段使用它。 CompletableFuture<String>

[轉]springboot2 webflux 響應程式設計學習路徑

原文連結 spring官方文件 springboot2 已經發布,其中最亮眼的非webflux響應式程式設計莫屬了!響應式的weblfux可以支援高吞吐量,意味著使用相同的資源可以處理更加多的請求,毫無疑問將會成為未來技術的趨勢,是必學的技術!很多人都看過相關的入門教程,但看完之後總覺得很迷糊,知其然不知道其

RxJava響應程式設計初級瞭解

據說現在流行的開發模式是 Retrofit+RxJava+MVP+ButterKnife 今天我就簡單來學習下RxJava的相關知識 以前我也只是聽說過RxJava,RxJava這個到底是什麼東西呢? 呵呵,它其實是一個庫,所以我們使用裡面的方法,得需

Android響應程式設計(一)RxJava前篇[入門基礎]

1.RxJava概述 ReactiveX與RxJava 在講到RxJava之前我們首先要了解什麼是ReactiveX,因為RxJava是ReactiveX的一種java實現。 ReactiveX是Reactive Extensions的縮寫,一般簡寫為

SpringBoot2.x響應程式設計webflux介紹

1、Spring WebFlux是Spring Framework 5.0中引入的新的反應式Web框架     與Spring MVC不同,它不需要Servlet API,完全非同步和非阻塞,並 通過Reactor專案實現Reactive Streams規範。 &nbs

SpringBoot2.x webflux響應程式設計實戰

1、WebFlux中,請求和響應不再是WebMVC中的ServletRequest和ServletResponse,而是ServerRequest和ServerResponse 2、加入依賴,如果同時存在spring-boot-starter-web,則會優先用spring-boot-start

講課:Webflux響應程式設計(SpringBoot 2.0新特性)

學習webflux前需要學習三個基礎: 函數語言程式設計和lambda表示式 Stream流程式設計 Reactive stream 響應式流 接下來進入學習 一、函數語言程式設計和lambda表示式 1. 什麼是函數語言程式設計 函數語言程式設計是

函數語言程式設計響應程式設計己見

1. what is 函數語言程式設計? 函式,在程式設計中,通常體現為: 輸入 => 執行 => 結果。他不是命令式的,而是對一段操作進行邏輯封裝,拿到輸入,就能產出結果。通常來說,滿足函數語言程式設計的特性的“函式”應該有如下特點: 函式必須有入參,並且函

RxJS入門函式響應程式設計

一.函數語言程式設計 1.宣告式(Declarativ) 和宣告式相對應的程式設計⽅式叫做指令式程式設計(ImperativeProgramming),指令式程式設計也是最常見的⼀種程式設計⽅式。 //指令式程式設計: function double(arr) { const results = []

Spring Boot (十四): 響應程式設計以及 Spring Boot Webflux 快速入門

1. 什麼是響應式程式設計 在計算機中,響應式程式設計或反應式程式設計(英語:Reactive programming)是一種面向資料流和變化傳播的程式設計正規化。這意味著可以在程式語言中很方便地表達靜態或動態的資料流,而相關的計算模型會自動將變化的值通過資料流進行傳播。 例如,在指令式程式設計環境中,a

響應程式設計簡介:Reactor

[toc] # 簡介 Reactor是reactivex家族的一個非常重要的成員,Reactor是第四代的reactive library,它是基於Reactive Streams標準基礎上開發的,主要用來構建JVM環境下的非阻塞應用程式。 今天給大家介紹一下Reactor。 # Reactor簡介

移動web響應布局

data plan 不同 解決 css sea styles struct ebp 1.響應式布局的概念 響應式布局是Ethan Marcotte在2010年5月份提出的一個概念。簡而言之。就是一個站點可以兼容多個終端——而不是為每一個終端做一個特定的版本

(5)Spring WebFlux快速上手——響應Spring的道法術器

響應式編程 Spring WebFlux 本系列文章索引《響應式Spring的道法術器》前情提要 lambda與函數式 | Reactor 3快速上手本文源碼 1.3.3 Spring WebFlux Spring WebFlux是隨Spring 5推出的響應式Web框架。 1)服務端技術棧 Sp

web前端響應佈局,你必須要知道的

一、前言 響應式Web設計可以讓一個網站同時適配多種裝置和多個螢幕,可以讓網站的佈局和功能隨使用者的使用環境(螢幕大小、輸入方式、裝置/瀏覽器能力)而變化。本文主要介紹一些響應式佈局容易忽略但又很重要的知識點。 二、視口 移動前端中常說的 viewport (視口)就是瀏覽器中用於呈現網

vue源碼響應數據

完成 uri handle 不能 構造器 sre 疑問 ase accept 分析vue是如何實現數據響應的. 前記 現在回顧一下看數據響應的原因. 之前看了vuex和vue-i18n的源碼, 他們都有自己內部的vm, 也就是vue實例. 使用的都是vue的響應式數據特性及