1. 程式人生 > >RxJava2.0筆記記錄(一)

RxJava2.0筆記記錄(一)

一 基本使用

Observable建立

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
     @Override
     public void subscribe(ObservableEmitter<Integer> e) throws Exception {
         //ObservableEmitter 發射器的意思,用來發出事件的,它可以發出三種類型的事件,通過呼叫emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發出next事件、complete事件和error事件。
} });

Observer建立

Observer<Integer> observer = new Observer<Integer>(){
  @Override
  public void onSubscribe(Disposable d) {
  //開關的意思,當呼叫它的dispose()方法時,它就會切斷事件的接受,導致觀察者接受不到事件。這個可以用作當頁面銷燬時,取消網路請求。
  }

  @Override
  public void onNext(Integer integer) {     
  }

  @Override
  public
void onError(Throwable e) { } @Override public void onComplete() { } };

訂閱被觀察者與觀察者

observable.subscribe(observer);

一個observable可能沒有觀察者訂閱,也可以多個觀察者訂閱,當觀察者和被觀察者建立聯絡後,被觀察者可以通過onNext傳送事件,也可以傳送onComplete標誌事件完成,被觀察者傳送onComplete之後,可以繼續傳送,而觀察者接受到onComplete之後,就不會繼續接收事件。

onComplete和onError必須唯一併且互斥。

subscribe有多個方法的過載,帶有一個引數的Consumer方法,表示下游可以只關心onNext方法。

二 執行緒控制

預設情況下,觀察者和被觀察者工作在主執行緒。通常我們的需求是在子執行緒做耗時操作,然後回到主執行緒中來操作UI。此時我們可以用執行緒控制來達到這一目的。

observable
     .subscribeOn(Schedulers.io())  //subscribeOn被觀察者工作的執行緒
     .observeOn(AndroidSchedulers.mainThread()) //observeOn 觀察者工作的執行緒
     .subscribe(observer);

RxJava內建執行緒選項:
1.Schedulers.io() 代表io操作的執行緒, 通常用於網路,讀寫檔案等io密集型的操作,內部用執行緒池維護。
2.Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
3.Schedulers.newThread() 代表一個常規的新執行緒
4.AndroidSchedulers.mainThread() 代表Android的主執行緒

注意:
1.多次指定被觀察者的執行緒只有第一次指定的有效, 也就是說多次呼叫subscribeOn() 只有第一次的有效, 其餘的會被忽略.
2.多次指定觀察者的執行緒是可以的, 也就是說每呼叫一次observeOn() , 下游的執行緒就會切換一次.

三 操作符

< 1 >.map

map是RxJava中最簡單的一個變換操作符, 它的作用就是對observable傳送的每一個事件應用一個函式, 使得每一個事件都按照指定的函式去變化.

eg:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "this is " + integer;
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Consumer<String>() {
      @Override
      public void accept(String s) throws Exception {
          System.out.println(s);
      }
  });

通過Map可以將Observable傳送的事件轉換為任意的型別。

< 2 >. flatmap

FlatMap將一個傳送事件的Observable變換為多個傳送事件的Observables,然後將它們發射的事件合併後放進一個單獨的Observable裡.

eg:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            List<String> list = new ArrayList<>();
            for(int i = 0; i<3; i++){
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

注意:flatmap並不能保證事件的順序,如果需要保證事件的順序可以使用concatmap,用法和flatmap一樣。

< 3 >.zip

zip通過一個函式將多個Observable傳送的事件結合到一起,然後傳送這些組合到一起的事件. 它按照嚴格的順序應用這個函式。它只發射與發射資料項最少的那個Observable一樣多的資料。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    }
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    }
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer integer, String s) throws Exception {
        return "";
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {

    }
});

四. 背壓 Backpressure

< 1 >.原理引入

當observable和observer工作在不同執行緒時,它們之間不能直接通訊,當observable傳送一個事件時,是放在一個容器(具體就是一個佇列)裡面的,當容器裡面有事件時,才會向observer傳送事件。而當observable傳送事件太快,observer取事件太慢,容器就會一直放事件,最後會因為容器事件越來越多,最後導致容器OOM了。

這種observable產生事件的速度很快,而observer處理很慢,事件會堆積起來,最終擠爆你的記憶體,導致程式崩潰(MissingBackpressureException異常)。

至此,就可以引出背壓(Backpressure)的概念:背壓是指在非同步場景中,被觀察者傳送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低傳送速度的策略。

關於背壓,可以看這裡

< 2 >.解決方案

a. 使用filter過濾掉不需要的資料
b. 使用sample每隔一段時間從observable取出事件,傳送給下游。
c. 延時降低observable傳送事件的速度。
d.使用flowable解決。

大致就是從數量速度上解決。

五.Flowable

這裡把被觀察者指定為flowable,觀察者指定為subscriber。

eg:

Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {        
        }
        //流速不均勻直接拋異常
    }, BackpressureStrategy.ERROR);

    Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {         
        }

        @Override
        public void onNext(Integer integer) {
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    };
    flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

和observable不同的是,這裡增加了一個引數,用來處理上下游流速不均衡的時候應該怎麼處理的辦法,在例子中直接丟擲了一個BackpressureStrategy.ERROR異常,也就是MissingBackpressureException異常。同時觀察者中不是再使用Disposable,而是使用Subscription,與Disposable不同的是,這種採用的是響應式拉取,當在onSubscribe方法中呼叫了request方法時,observable會根據request的事件數量發相應數量的事件。observable不會再一直髮事件。如果不呼叫request方法,在同一執行緒,就會拋MissingBackpressureException異常,在不同執行緒,observable就會先把事件放到一個容器,大小為128。當observer呼叫request才會傳送事件。

參考

相關推薦

RxJava2.0筆記記錄

一 基本使用 Observable建立 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

Activiti6.0踩坑記錄-用admin登入後臺配置終端提示“Endpoint configuration for 'Activiti app' is invalid”

最近主管需要我搭建一個基於Activiti6.0引擎的工作流平臺,在配置好Tomcat併成功執行Activiti6.0官網所提供的war包後,在平臺上建立了一個二級審批流程,整個流程結束以後,需要用admin賬號去activiti-admin管理平臺檢視這個流程審批情況。 在登入介面輸入 使用者

最簡單易懂的RxJava2.0學習教程之RxJava2的基本使用

去年RxJava2就釋出了,在這之後一直做專案都在使用Rxjava2,覺得特別好用,目前網上的的資料很多,對於以前使用過RxJava1的朋友來說只需要看看更新文件就知道怎麼使用了,但還有一些以前沒用過RxJava的朋友可能就不知道怎麼辦了,不知道該看RxJava

Activiti6.0踩坑記錄-用admin登入後臺配置終端提示“Endpoint configuration for 'Activiti app' is invalid”

最近主管需要我搭建一個基於Activiti6.0引擎的工作流平臺,在配置好Tomcat併成功執行Activiti6.0官網所提供的war包後,在平臺上建立了一個二級審批流程,整個流程結束以後,需要用admin賬號去activiti-admin管理平臺檢視這個流程審批情況。

Linux 環境部署記錄

sql tables 查看 ont 系統時間 earch arch borde 路徑 時間設置 查看系統當前日期/時間: date -R 查看系統硬件時鐘: hwclock --show 設置硬件時間: hwclock --set --date="07/18/

vue2.0單元測試

.com str images alt 需求 org 封裝 min 測試 1.在vue init webpack XXX創建項目的時候 最後2步選擇YES就啟動了vue單元測試開始了 2.測試是使用karma+mocha框架來實現的方法,安裝虛擬瀏覽器模塊Phanto

android的入門記錄

下載到本地 可能 安裝 subst 數據 工具包 一次 以及 由於 ---恢復內容開始--- 首先,這是我人生中的第一篇博客,也許嚴格意義上它並不算是一篇博客,但也代表著一些東西。 前言 我們往往在開始學習一門新的語言或者課程時會遇見各式各樣的問題,比

linux學習記錄

設備 ima 權限 logs spf style www. 塊設備 alt 1、各種顏色文件的含義 黃色表示設備文件 灰色表示其它文件 白色表示普通文件 綠色表示可執行文件; 紅色表示壓縮文件; 淺藍色表示鏈接文件; 灰色表示其它文件; 紅色閃爍表示鏈接的文件有問題了;

給初學者的RxJava2.0教程(二)

pos 添加 tex 工作 -i com ava gis ket 前言 上一節教程講解了最基本的RxJava2的使用, 在本節中, 我們將學習RxJava強大的線程控制. 正題 還是以之前的例子, 兩根水管: RxJava 正常情況下, 上遊和下遊是工作在同一個

給初學者的RxJava2.0教程(三)

app 創建 roi 情況下 ids table 因此 next 上下 前言 上一節教程講解了最基本的RxJava2的使用, 在本節中, 我們將學習RxJava強大的線程控制. 正題 還是以之前的例子, 兩根水管: RxJava 正常情況下, 上遊和下遊是工作在

給初學者的RxJava2.0教程(五)

roi 直接 ror 兩個 defined create 了解 作者 一點 前言 大家喜聞樂見的Backpressure來啦. 這一節中我們將來學習Backpressure. 我看好多吃瓜群眾早已坐不住了, 別急, 我們先來回顧一下上一節講的Zip. 正題 上一節中我們說

vue學習記錄—— vue開發調試神器vue-devtools安裝

shell gist 項目 擴展工具 code blog manifest false .net 網上有些貼子少了至關重要的一步導致我一直沒裝上, 切記!!install後還需build,且install和build都在vue-devtools文件夾內執行 github下載

Hive筆記整理

大數據 Hive [TOC] Hive筆記整理(一) Hive Hive由facebook貢獻給Apache,是一款建立在Hadoop之上的數據倉庫的基礎框架。 數據倉庫 特點——關於存放在數據倉庫中的數據的說明: 是能夠為企業的各個級別的決策提供數據支撐的數據 其實說白了,就是一個存放數據

C語言程序編寫中犯的錯誤的記錄

C 程序編寫 錯誤 今天學習用到了《C程序設計(第四版)》的求兩個數的最大值的程序devcpp程序:#include <stdio.h>int main(){int max(int x,int y);int a,b,c;scanf("%d,%d",&a,&a

HBase筆記整理

大數據 HBase [TOC] HBase筆記整理(一) 行列式數據庫 行式數據庫: 可以簡單的理解為類似傳統的rdbmspaint這些數據,存放的數據都是結構化的數據。 行式數據庫,是有利於全表數據的掃描,不利於只查詢個別字段 列式數據庫: 對行式數據庫的一個改進,將部分列(或者說有關聯的一些列)

Kafka筆記整理

大數據 消息隊列 消息訂閱系統 Kafka [TOC] Kafka筆記整理(一) Kafka簡介 消息隊列(Message Queue) 消息 Message 網絡中的兩臺計算機或者兩個通訊設備之間傳遞的數據。例如說:文本、音樂、視頻等內容。 隊列 Queue 一種特殊的線性表(

Redis筆記整理:Redis安裝配置與數據類型操作

數據庫 NoSQL Redis [TOC] Redis筆記整理(一):Redis安裝配置與數據類型操作 Redis簡介 Redis是一個開源(BSD許可),內存存儲的數據結構服務器,可用作數據庫,高速緩存和消息隊列代理。 它支持字符串、哈希表、列表、集合、有序集合,位圖,hyperloglo

ElasticSearch筆記整理:簡介、REST與安裝配置

大數據 ElasticSearch ELK [TOC] ElasticSearch簡介 ElasticSearch是一款基於Apache Lucene構建的開源搜索引擎,它采用Java編寫並使用Lucene構建索引、提供搜索功能,ElasticSearch的目標是讓全文搜索變得簡單,開發者可以通

Storm筆記整理:簡介與設計思想

大數據 實時計算 Storm [TOC] 實時計算概述 有別於傳統的離線批處理操作(對很多數據的集合進行的操作),實時處理,說白就是針對一條一條的數據/記錄進行操作,所有的這些操作進行一個匯總(截止到目前為止的所有的統計總和)。 實時計算與離線計算比較 Bounded:有界 離線計算面臨

Scala筆記整理:scala基本知識

大數據 Scala [TOC] Scala簡介 Scala是一門多範式(multi-paradigm)的編程語言,設計初衷是要集成面向對象編程和函數式編程的各種特性。 Scala運行在Java虛擬機上,並兼容現有的Java程序。 Scala源代碼被編譯成Java字節碼,所以它可以運行於JVM之上,並