1. 程式人生 > >RxJava學習 - 13. Transformers and Custom Operators

RxJava學習 - 13. Transformers and Custom Operators

RxJava學習 - 13. Transformers and Custom Operators

可以使用compose()和lift()實現自己的operators。Observable和Flowable都有這兩個方法。

Transformers

有時候,可能想重新使用Observable或者Flowable鏈的某個片段,可以使用某種方法,把這些operators組合成新的operators。
ObservableTransformer和FlowableTransformer提供了重新使用程式碼的能力。

ObservableTransformer

以前有個例子,使用collect()把Observable轉成Single<ImmutableList>。看下面的例子:

import com.google.common.collect.ImmutableList;
import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .collect(ImmutableList::builder, ImmutableList.Builder::add)
                .
map(ImmutableList.Builder::build) .subscribe(System.out::println); Observable.range(1, 15) .collect(ImmutableList::builder, ImmutableList.Builder::add) .map(ImmutableList.Builder::build) .subscribe(System.out::println); } }

下面的程式碼出現了兩次:

collect(ImmutableList::builder, ImmutableList.Builder::add)
        .map(ImmutableList.Builder::build)

我們可以想辦法把他們組合成新的operator。對於目標Observable,你可以實現ObservableTransformer<T, R>。這個類有一個apply()方法,接受一個Observable,返回一個Observable。在你的實現裡,你可以返回一個Observable鏈,把任何operators加給上游,然後返回Observable。
對我們的例子,要把Observable轉換成Observable<ImmutableList>。我們包裝一個ObservableTransformer<T, ImmutableList>:

    public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
        return new ObservableTransformer<T, ImmutableList<T>>() {
            @Override
            public ObservableSource<ImmutableList<T>> apply(Observable<T> upstream) {
                return upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
                        .map(ImmutableList.Builder::build)
                        .toObservable(); // must turn Single into Observable
            }
        };
    }

因為collect()返回一個Single,我們呼叫toObservable()(因為ObservableTransformer期待一個Observable,而不是Single)。
ObservableTransformer只有一個抽象方法,所以可以使用lambda:

    public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
        return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
                        .map(ImmutableList.Builder::build)
                        .toObservable(); // must turn Single into Observable
    }

想在一個Observable鏈內呼叫Transformer,可以使用compose()方法。它接受一個ObservableTransformer<T, R>,返回轉換後的Observable:

import com.google.common.collect.ImmutableList;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

public class Launcher {
    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(toImmutableList())
                .subscribe(System.out::println);
        Observable.range(1, 10)
                .compose(toImmutableList())
                .subscribe(System.out::println);     
    }
    
    public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
        return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
                        .map(ImmutableList.Builder::build)
                        .toObservable(); // must turn Single into Observable
    }    
}

你也可以為特定的emission型別和接受的引數增加Transformers。例如,增加一個joinToString(),它接受一個分隔符,用來級聯字串:

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

public class Launcher {
    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(joinToString("/"))
                .subscribe(System.out::println);      
    }
    
    public static ObservableTransformer<String, String> joinToString(String separator) {
        return upstream -> upstream
                .collect(StringBuilder::new, (b,s) -> {
                    if (b.length() == 0)
                        b.append(s);
                    else
                        b.append(separator).append(s);
                })
                .map(StringBuilder::toString)
                .toObservable();
    }    
}

FlowableTransformer

當你實現ObservableTransformer的時候,也許覺得增加FlowableTransformer會更好。這樣,你的operator可用於Observables,也可用於Flowables。
FlowableTransformer和ObservableTransformer差別不大。當然,和Flowables組合時,它支援背壓:

import com.google.common.collect.ImmutableList;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;

public class Launcher {
    public static void main(String[] args) {
        Flowable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(toImmutableList())
                .subscribe(System.out::println);
        Flowable.range(1, 10)
                .compose(toImmutableList())
                .subscribe(System.out::println);     
    }
    
    public static <T> FlowableTransformer<T, ImmutableList<T>> toImmutableList() {
        return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
                        .map(ImmutableList.Builder::build)
                        .toFlowable(); // must turn Single into Observable
    }    
}

Avoiding shared state with Transformers

比如,你想增加一個ObservableTransformer<T, IndexedValue>,它把每個emission和一個從0開始的連續索引配對。首先,你增加一個IndexedValue類:

    static final class IndexedValue<T> {
        final int index;
        final T value;
        IndexedValue(int index, T value) {
            this.index = index;
            this.value = value;
        }
        @Override
        public String toString() {
            return index + " - " + value;
        }
    }

然後,你增加一個ObservableTransformer<T, IndexedValue>,使用一個AtomicInteger做索引:

    static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
        final AtomicInteger indexer = new AtomicInteger(-1);
        return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(), v));
    }

看出問題了嗎?讓我們執行下面的程式。仔細看輸出:

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import java.util.concurrent.atomic.AtomicInteger;

public class Launcher {
    public static void main(String[] args) {
        Observable<IndexedValue<String>> indexedStrings =
                Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .compose(withIndex());
        indexedStrings.subscribe(v -> System.out.println("Subscriber 1: " + v));
        indexedStrings.subscribe(v -> System.out.println("Subscriber 2: " + v));        
    }
    
    static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
        final AtomicInteger indexer = new AtomicInteger(-1);
        return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(), v));
    }
    
    static final class IndexedValue<T> {
        final int index;
        final T value;
        IndexedValue(int index, T value) {
            this.index = index;
            this.value = value;
        }
        @Override
        public String toString() {
            return index + " - " + value;
        }
    }    
}

輸出是

Subscriber 1: 0 - Alpha
Subscriber 1: 1 - Beta
Subscriber 1: 2 - Gamma
Subscriber 1: 3 - Delta
Subscriber 1: 4 - Epsilon
Subscriber 2: 5 - Alpha
Subscriber 2: 6 - Beta
Subscriber 2: 7 - Gamma
Subscriber 2: 8 - Delta
Subscriber 2: 9 - Epsilon

注意,AtomicInteger的一個例項被兩個訂閱共享了。
可以為每個訂閱增加一個資源(比如AtomicInteger),包裝在Observable.defer()內:

    static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
        return upstream -> Observable.defer(() -> {
            AtomicInteger indexer = new AtomicInteger(-1);
            return upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(), v));
        });
    }

對於這個例子,也可以使用Observable.zip()或者zipWith(),解決我們的問題:

    static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
        return upstream ->
            Observable.zip(upstream,
                Observable.range(0,Integer.MAX_VALUE),
                (v,i) -> new IndexedValue<T>(i, v)
        );
    }

Operators

理想的情況下,你很少需要從頭開始通過實現ObservableOperator和FlowableOperator構造自己的operator。ObservableTransformer和FlowableTransformer可以滿足大多數需求。
可有時候,你發現不得不做些現有的operators不能做或者不容易做的事情。排除了任何選項,你可能不得不增加一個operator,在上游和下游之間操縱每個onNext()、onComplete()和onError()事件。
在你增加自己的operator之前,要先試試compose()。如果失敗了,推薦你在StackOverflow提問題。
實在不行,再構造自己的operator。

Implementing an ObservableOperator

實現自己的ObservableOperator(或者FlowableTransformer)要做更多的工作。不組合現有的operators,你需要攔截onNext()、onComplete()、onError()和onSubscribe(),實現自己的Observer。該Observer把onNext()、onComplete()和onError()事件傳給下游Observer。
比如,你要增加doOnEmpty() operator,當onComplete()被呼叫的時候,它會執行一個Action。要增加自己的ObservableOperator<Downstream, Upstream>,需要實現它的apply()方法。它接受一個Observer,返回一個Observer。然後,你可以通過呼叫lift(),使用這個ObservableOperator:

import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.functions.Action;
import io.reactivex.observers.DisposableObserver;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 5)
                .lift(doOnEmpty(() -> System.out.println("Operation 1 Empty!")))
                .subscribe(v -> System.out.println("Operation 1: " + v));
        Observable.<Integer>empty()
                .lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
                .subscribe(v -> System.out.println("Operation 2: " + v));        
    }
    
    public static <T> ObservableOperator<T, T> doOnEmpty(Action action) {
        return new ObservableOperator<T, T>() {
            @Override
            public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
                return new DisposableObserver<T>() {
                    boolean isEmpty = true;

                    @Override
                    public void onNext(T value) {
                        isEmpty = false;
                        observer.onNext(value);
                    }

                    @Override
                    public void onError(Throwable t) {
                        observer.onError(t);
                    }

                    @Override
                    public void onComplete() {
                        if (isEmpty) {
                            try {
                                action.run();
                            } catch (Exception e) {
                                onError(e);
                                return;
                            }
                        }
                        observer.onComplete();
                    }
                };
            }
        };
    }    
}

就像Transformers一樣,當增加自己的operators的時候,不要在訂閱之間共享狀態。
還有,onNext()、onComplete()和onError()呼叫是可以按需要操縱和混合的。比如,toList()不會把收到的每個onNext()都傳給下游。它會在內部列表中收集這些emissions。當上遊呼叫了onComplete(),它就呼叫下游的onNext(),把列表傳給下游,然後呼叫onComplete()。現在,我們實現自己的myToList(),來理解toList()如何工作:

import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.observers.DisposableObserver;
import java.util.ArrayList;
import java.util.List;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 5)
                .lift(myToList())
                .subscribe(v -> System.out.println("Operation 1: " + v));
        Observable.<Integer>empty()
                .lift(myToList())
                .subscribe(v -> System.out.println("Operation 2: " + v));        
    }
    
    public static <T> ObservableOperator<List<T>,T> myToList() {
        return observer -> new DisposableObserver<T>() {
            ArrayList
            
           

相關推薦

RxJava學習 - 13. Transformers and Custom Operators

RxJava學習 - 13. Transformers and Custom Operators Transformers ObservableTransformer FlowableTransformer Avoiding shared

RxJava學習 - 12. Flowables and Backpressure

RxJava學習 - 12. Flowables and Backpressure Understanding backpressure An example that needs backpressure Introducing the Flo

RxJava學習 - 10. Concurrency and Parallelization

RxJava學習 - 10. Concurrency and Parallelization Introducing RxJava concurrency Keeping an application alive Understanding

python 3.x 學習筆記13 (socket_ssh and socket_文件傳輸)

粘包問題 問題 取出 nec imp 傳輸文件 ket color md5 ssh服務端 import socket,os server = socket.socket() server.bind((‘localhost‘,6666)) server.listen()

RxJava學習 - 5. Single, Completable, and Maybe

RxJava學習 - 5. Single, Completable, and Maybe Single Maybe Completable Single Single實際上只發射一次。它有自己的SingleObserver介面: i

RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering

RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering Buffering Fixed-size buffering Time-based buffering Boun

RxJava學習 - 9. Multicasting, Replaying, and Caching

RxJava學習 - 9. Multicasting, Replaying, and Caching Understanding multicasting Multicasting with operators When to multicast

mysql-學習-13-20170619-MySQL備份恢復-xtrabackup-2

soc tar pex cfa nod 遠程 表空間 tid doc mysql-學習-13-20170619-MySQL備份恢復-xtrabackup-2 【管理員】吳炳錫(82565387) 20:34:15基於xtrabackup的增備,只需要了解如果需要增備建

手勢跟蹤論文學習:Realtime and Robust Hand Tracking from Depth(三)Cost Function

引入 tail track col div 理想 問題 from details iker原創。轉載請標明出處:http://blog.csdn.net/ikerpeng/article/details/39050619 Realtime and Robust Hand

struts2學習(13)struts2文件上傳和下載(1)

action alt for ide 上傳文件 fig .org dac str 一、Struts2文件上傳: 二、配置文件的大小以及允許上傳的文件類型: 三、大文件上傳: 如果不配置上傳文件的大小,struts2默認允許上傳文件最大為2M; 2097152Byte;

最權威的RXJaVa學習資料

學習 android music roi androi andro java學習 oid com aNDROID%E8%AF%BB%E5%86%99%E6%96%87%E4%BB%B6%E7%9A%84N%E7%A7%8D%E5%86%99%E6%B3%95 http:/

OC學習13——Foundation框架中的集合

str 集合類 結構 pan sar set 體系 隊列 數組   OC集合類是一些非常有用的工具類,它可以用於存儲多個數量不等的對象,並可以實現常用的數據結構(棧、隊列等),此外,OC集合還可用於保存具有映射關系的關聯數組。OC的集合大致可以分為:NSArray、NSSe

php之快速入門學習-13(PHP 循環 - While 循環)

style 快速入門 數組 執行 span tro 運行 設置 快速 PHP 循環 - While 循環 循環執行代碼塊指定的次數,或者當指定的條件為真時循環執行代碼塊。 PHP 循環 在您編寫代碼時,您經常需要讓相同的代碼塊一次又一次地重復運行。我們可以在代

C語言學習13

排序 %d uic pri quick class 學習 span bsp 快速排序 1 //快速排序 2 #include <stdio.h> 3 4 void quicksort(int a[], int left, int right);

【c學習-13

pow(x 字符數 print 判斷 ssa python 常量 ++ 叠代 /*庫函數 1:數學函數庫:math.h abs():絕對值; acos(),asin(),atan():cos,sin,tan的倒數 exp():指數的次

matplotlib的學習13-subplot分格顯示

layout nbsp light pyplot 設置 out imp 標題 true import matplotlib.pyplot as plt plt.figure()#創建一個圖像窗口 # 使用plt.subplot2grid來創建第1個小圖, (3,3)表

python學習(13)

遞歸實現 切片 非遞歸 max 小數 嵌套 bcd urn file random.uniform(a,b)隨機生成a,b之間的一個浮點數 >> random.uniform(1,20)1.0130916166719703 習題1:生成[“z1”,”y2”,

[論文學習]Private traits and attributes are predictable from digital records of human behavior

cited: Kosinski M, Stillwell D, Graepel T. Private traits and attributes are predictable from digital records of human behavior[J]. Proceedi

RxJava學習 - 6. Disposing

RxJava學習 - 6. Disposing Handling a Disposable within an Observer Using CompositeDisposable Handling Disposal with Observable.crea

RxJava學習 - 4. Other Observable sources

RxJava學習 - 4. Other Observable sources Observable.range() Observable.interval() Observable.future() Observable.empty() Obse