1. 程式人生 > >RxJava和RxAndroid用法詳解

RxJava和RxAndroid用法詳解

RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。

最簡單的使用:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("第一條");
                emitter.onNext("第二條");
                emitter.onComplete();
//                emitter.onError(new Exception("ssfas"));
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG,"--------onSubscribe------");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,"--------onNext------:"+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"--------onError------");
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"--------onComplete------");
            }
        });

Flowable:

/**
         * BackpressureStrategy.ERROR----128
         * BackpressureStrategy.BUFFE----無限制
         * BackpressureStrategy.DROP-----丟棄儲存不了的--128
         * BackpressureStrategy.LATEST---保留最新的-----128
         * 對應的方法:
         * onBackpressureBuffer()
         * onBackpressureDrop()
         * onBackpressureLatest()
         *
         */
        Flowable<String> flowable=Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                emitter.onNext("111111111111");
                emitter.onNext("2222222222222");
                emitter.onNext("3333333333333");
                emitter.onNext("4444444444444");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR);

        Subscriber<String> subscriber=new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.i(TAG,"---onSubscribe---");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,s);
            }

            @Override
            public void onError(Throwable t) {
                Log.i(TAG,"---onError---");
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"---onComplete---");
            }
        };

        flowable.subscribe(subscriber);

執行緒切換:

Observer<String> observer=new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG,"--------onSubscribe------:"+Thread.currentThread().getName());
                compositeDisposable.add(d);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,"--------onNext------:"+Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"--------onError------");
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"--------onComplete------"+Thread.currentThread().getName());
            }
        };

        Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG,"--------Observable------"+Thread.currentThread().getName());
                emitter.onNext("第一條");
                emitter.onComplete();
            }
        });

        observable.subscribeOn(Schedulers.computation())//1、Schedulers.newThread()2、Schedulers.io()3、Schedulers.computation()
                .observeOn(AndroidSchedulers.mainThread())//android自帶的主執行緒
                .subscribe(observer);

map操作符:

轉換操作符,例如本例子中,將int轉化為String列印

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "這是第:"+integer+"個指令";
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

flatMap操作符:

這個操作符其實就是再發送一個Observable,舉個簡單的例子,比如註冊後需要登陸就可以用這個操作符

final String[][] name={{"111","1111","11111"},
                {"222","2222","22222"},
                {"333","3333","33333"}};

        Observable.fromArray(name).flatMap(new Function<String[], ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(String[] str) throws Exception {
                return Observable.fromArray(str);
            };
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG,(String)o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

filter操作符:

過濾事件,Observer中只處理filter中返回true的事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i=0;i<100;i++){
                    emitter.onNext(i);
                }
            }
        }).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer%10==0;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG,"篩選後資料為:"+integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

zip操作符:

組合兩個傳送的事件

Observable<String> observable1=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                     emitter.onNext("111");
                     emitter.onNext("222");
                     emitter.onNext("333");
                     emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());

        Observable<String> observable2=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("AAA");
                emitter.onNext("BBB");
                emitter.onNext("CCC");
                emitter.onNext("DDD");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());

        Observable.zip(observable1, observable2, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s+s2;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.i(TAG,"-------complete---------");
            }
        });

其他簡單介紹:

concat:同樣是組合連個傳送observable,但和zip不同,比如傳送事件1,傳送的是1,2,3  傳送事件2,傳送的是a,b,c,則zip的結果是1a,2b,3c ,而concat的結果是1,2,3,a,b,c

concatMap:和flatMap功能相同,但concatMap是有序的,而flatMap則是無序的

distinct:過濾掉相同的事件

timer:表示過多久會執行,只執行一次

interval:定時任務,表示沒過多久就會執行一次

相關推薦

RxJavaRxAndroid用法

RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 J

bowtiebowtie2用法

bowtie 短序列比對工具詳解 常見的短序列比對工具有很多,如fasta、blast、bowtie、shrimp、soap等。每個工具都有其自身的優點,但同時也具備了一些缺點。權衡利弊,我選擇bowtie作為主要的短序列比對工具。它速度很快,比對結果也容易理解。 現在舉個例子來探討bowtie

Python生成器(Generator)yield用法

通過列表生成式,我們可以直接建立一個列表。但是,受到記憶體限制,列表容量肯定是有限的。而且,建立一個包含100萬個元素的列表,不僅佔用很大的儲存空間,如果我們僅僅需要訪問前面幾個元素,那後面絕大多數元素佔用的空間都白白浪費了。 所以,如果列表元素可以按照某種演算法推算出來,那我們是否可以在迴圈的

MultiByteToWideCharWideCharToMultiByte用法

原文連結 注意: 這兩個函式是由Windows提供的轉換函式,不具有通用性 C語言提供的轉換函式為mbstowcs()/wcstombs() 一、函式簡單介紹 涉及到的標頭檔案: 函式所在標頭檔案:windows.h #include <win

【Python】pandas軸旋轉stackunstack用法

摘要 前面給大家分享了pandas做資料合併的兩篇[pandas.merge]和[pandas.cancat]的用法。今天這篇主要講的是pandas的DataFrame的軸旋轉操作,stack和unstack的用法。 首先,要知道以下五點: 1.stack:將資料的列“旋轉”為行 2

traceroutetracert用法

一、什麼是Traceroute?                  Internet,即國際網際網路,是目前世界上最大的計算機網路,更確切地說是網路的網路。它由遍佈全球的幾萬區域網

vsprintf函式va_list用法

void UART1_Printf(char *fmt,...) {  va_list ap;  char string[128];  va_start(ap, fmt);  vsprintf(string, fmt, ap);  UART1_SendString(st

C++中棧佇列用法

1.C++棧用法詳解 堆疊是一個容器的改編,棧是限定僅在表尾進行插入或刪除操作的線性表,因此表尾端成為棧頂,相應的,表頭端成為棧底,不含有任何元素的棧稱為空棧。它實現了一個先進後出的資料結構(FILO

indexOf,lastIndexOfsubstring 用法

indexOf方法: 返回 String 物件內第一次出現子字串的字元位置。 strObj.indexOf(subString[, startIndex]) 引數 strObj  必選項。String 物件或文字。  subString  必選項。要在 String 物件

shell 中 if else 用法

基本語法 shell的if語法和C語言等高階語言非常相似,唯一需要注意的地方就是shell的if語句對空格方面的要求比較嚴格(其實shell對所有語法的空格使用都比較嚴格),如果在需要空格的地方沒有打上空格,都會報錯。如if [ $1x == "ip"x ];then ec

SVN trunk(主線) branch(分支) tag(標記) 用法詳細操作步驟

trac load mar span 必須 最可 objc copy 右鍵 原文地址:http://blog.csdn.net/vbirdbest/article/details/51122637 使用場景: 假如你的項目(這裏指的是手機客戶端項目)的某個版本(例如1.0

oracle中的exists not exists 用法

sdn ref 用法詳解 html nbsp e30 .net tail sin oracle中的exists 和not exists 用法詳解 http://blog.csdn.net/zhiweianran/article/details/7868894oracle

js數組中foEachmap的用法 jq中的$.each$.map

cnblogs arr 對象 cal for index source asc 原生js 數組中foEach和map的用法詳解 相同點: 1.都是循環遍歷數組(僅僅是數組)中的每一項。 2.forEach() 和 map() 裏面每一次執行匿名函數都支持3個參數:數組中的

Django基礎(10): URL重定向的HttpResponseDirect, redirectreverse的用法

detail djang 包含 war sed 模型 博客 nbsp rep 利用django開發web應用, 我們經常需要進行URL重定向,有時候還需要給URL傳遞額外的參數。比如用戶添加文章完成後需要轉到文章列表或某篇文章詳情。因此熟練掌握HttpResponseDir

Tag檔案Tag標記的用法

                                      Tag檔案和Tag標記

SQL中INNER、LEFT、RIGHT JOIN的區別用法

相信很多人在剛開始使用資料庫的INNER JOIN、LEFT JOIN和RIGHT JOIN時,都不太能明確區分和正確使用這三種JOIN操作,本文通過一個簡單的例子通俗易懂的講解這三者的區別,希望對大家能帶來幫助。 首先,我們建立示例資料庫和表。同時也要明確一個概念:A INN

%date~0,4% %time~0,2%等用法

在windows中,有個原始並且功能強大的批處理,好像是被人遺忘了,比如博主最近在一個專案中就用到它,非常好用。今天就和博主一直來看看用批處理生動生成每日的資料夾。 為了能正確地生成每天的日期資料夾,請先將本機時間的短日期格式設定為yyyy-MM-dd。   然後就開始寫bat批處理檔案了,新

Apache ab壓力測試工具Window下載用法

ab是apache自帶的網站壓力測試工具。 使用起來非常的簡單和方便。 不僅僅是可以apache伺服器進行網站訪問壓力測試,還可以對其他型別的伺服器進行壓力測試。 比如nginx,tomcat,IIS等 首先當然是下載安裝了。 在這裡只講window下在下載安裝 官方下載地址:,(htt

js原生之scrollTop、offsetHeightoffsetTop等屬性用法

本文轉載自:https://www.cnblogs.com/koleyang/p/4939853.html **scrollTop、offsetHeight和offsetTop等屬性用法詳解:** 標題中的幾個相關相關屬性在網頁中有這大量的應用,尤其是在運動框架中,但是由於有些屬性相互之間的

react router @4 vue路由 (二)react-router @4用法

  完整版:https://www.cnblogs.com/yangyangxxb/p/10066650.html   2、react-router @4用法   a、大概目錄        不需要像vue那樣麻煩的用到