1. 程式人生 > >RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering

RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering

RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering

一個Observable的生產速度可能比Observer的消費速度快。併發的時候,Observable鏈的不同operators執行在不同的Schedulers上。慢操作的後面會排隊,產生瓶頸。
可以使用Flowable處理瓶頸。但是,不是每個源都可以被backpressured。你不能讓Observable.interval()(甚至Flowable.interval())變慢,因為這些emissions是時間敏感的。使用者的輸入事件,比如點選按鈕,也是不能被backpressured。
幸好,下面介紹的這些operators,可以不使用backpressure應付快速產生的源,或者無法使用backpressure的時候。

Buffering

buffer()收集emissions,批量發射(一個list或者其他集合型別)。可以是固定的buffer,也可以是一個時間視窗,甚至被另一個Observable分割。

Fixed-size buffering

這個簡單的例子,buffer()接受count引數,以固定尺寸,成批發射:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 50)
                .
buffer(8) .subscribe(System.out::println); } }

輸出是

[1, 2, 3, 4, 5, 6, 7, 8]
[9, 10, 11, 12, 13, 14, 15, 16]
[17, 18, 19, 20, 21, 22, 23, 24]
[25, 26, 27, 28, 29, 30, 31, 32]
[33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48]
[49, 50]

也可以提供第二個引數bufferSupplier,把元素放到一個自定義的集合,比如HashSet:

import io.reactivex.Observable;
import java.util.HashSet;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 50)
                .buffer(8, HashSet::new)
                .subscribe(System.out::println);
    }
}

還可以提供一個skip引數,指定開始一個新的buffer前,要跳過多少元素。如果skip等於count,skip沒有影響。
如果不相等,事情變得很有趣。比如,你buffer兩個emissions,但是跳過三個:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 10)
                .buffer(2, 3)
                .subscribe(System.out::println);
    }
}

輸出是

[1, 2]
[4, 5]
[7, 8]
[10]

如果skip小於count,就得到一個有趣的rolling buffers。比如buffer的大小是3,skip是1:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 10)
                .buffer(3, 1)
                .subscribe(System.out::println);
    }
}

輸出是

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]
[8, 9, 10]
[9, 10]
[10]

下面的例子,使用buffer(2, 1)發射,然後使用filter()過濾掉最後一個:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 10)
                .buffer(2, 1)
                .filter(c -> c.size() == 2)
                .subscribe(System.out::println);
    }
}

Time-based buffering

buffer()也可以使用固定的時間間隔。下面的程式碼,源每300毫秒發射,每個緩衝的list包含3個或者4個元素:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .buffer(1, TimeUnit.SECONDS)
                .subscribe(System.out::println);
        sleep(4000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

也有可選的timeskip引數,它控制每個buffer開始的時間。
還可以提供第三個引數count,控制buffer的最大size。無論到時間了,還是buffer滿了,都會導致buffer發射。如果時間視窗關閉之前,buffer滿了,會發射空的buffer:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .buffer(1, TimeUnit.SECONDS, 2)
                .subscribe(System.out::println);
        sleep(5000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出是

[300, 600]
[900]
[1200, 1500]
[1800]
[2100, 2400]
[2700]
[3000, 3300]
[3600, 3900]
[]
[4200, 4500]
[4800]

基於時間的buffer(),使用computation Scheduler。

Boundary-based buffering

更強大的buffer()變種是接受另一個Observable作為boundary引數。其他Observable發射的型別不重要。重要的是,每當它發射,就開始另一個buffer。
比如前面的例子,我們使用每秒的Observable.interval()作為每300毫秒的Observable.interval()的邊界:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> cutOffs =
                Observable.interval(1, TimeUnit.SECONDS);
        Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .buffer(cutOffs)
                .subscribe(System.out::println);
        sleep(5000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出是

[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]
[4200, 4500, 4800]

Windowing

window()和buffer()幾乎一樣,只是它把元素緩衝進其他Observables,而不是集合。它返回一個Observable<Observable>。每個Observable emission會快取emissions,訂閱以後flush他們(很像GroupedObservable)。這樣,emissions有效了就發射,而不是list滿了才發射。
和buffer()一樣,每個批也可以是固定大小的,時間間隔的,或者來自另一個Observable。

Fixed-size windowing

我們修改以前的例子,使用window()把50個整數緩衝進長度為8的list。我可以響應式地轉換每個批次成為一個非集合,比如使用“|”級聯的字串:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 50)
                .window(8)
                .flatMapSingle(obs -> obs.reduce("", (total, next) ->
                        total + (total.equals("") ? "" : "|") + next))
                .subscribe(System.out::println);
    }
}

就像buffer(),可以提供skip引數,表示開始一個新視窗前,需要跳過多少emissions。下面的例子,視窗大小是2,跳過3個元素。然後接受每個視窗Observable,reduce成級聯的字串:

import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 50)
                .window(2, 3)
                .flatMapSingle(obs -> obs.reduce("", (total, next) ->
                        total + (total.equals("") ? "" : "|") + next))
                .subscribe(System.out::println);
    }
}

Time-based windowing

下面,有一個每300毫秒發射的Observable,每一秒切成一個分離的Observables,然後字串級聯:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .window(1, TimeUnit.SECONDS)
                .flatMapSingle(obs -> obs.reduce("", (total, next) ->
                        total + (total.equals("") ? "" : "|") + next))
                .subscribe(System.out::println);
        sleep(5000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Boundary-based windowing

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Long> cutOffs =
                Observable.interval(1, TimeUnit.SECONDS);
        Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .window(cutOffs)
                .flatMapSingle(obs -> obs.reduce("", (total, next) ->
                        total + (total.equals("") ? "" : "|") + next))
                .subscribe(System.out::println);
        sleep(5000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Throttling

buffer()和window()把emissions分批放進集合或者Observables,這樣做一般是合併而部署忽略emissions。而throttle(),emissions發生得太快時,就忽略一部分。當假設快速的emissions是冗餘的或者不想要的時候,還是很有用的,比如重複點選button。
下面的例子,我們有三個Observable.interval()源,分別以100毫秒、300毫秒和2000毫秒的間隔發射。我們只接受第一個源的前十個元素,第二個的三個,第三個的兩個元素。我們使用Observable.concat()把他們連到一起:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 100) // map to elapsed time
                .map(i -> "SOURCE 1: " + i)
                .take(10);
        Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .map(i -> "SOURCE 2: " + i)
                .take(3);
        Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 2000) // map to elapsed time
                .map(i -> "SOURCE 3: " + i)
                .take(2);
        Observable.concat(source1, source2, source3)
                .subscribe(System.out::println);
        sleep(6000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

接下來,我們使用throttle(),只選擇其中一部分,忽略其他的。

throttleLast() / sample()

throttleLast()(它的別名是sample())只發射每個固定時間間隔之內的最後的元素。

Observable.concat(source1, source2, source3)
        .throttleLast(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);

輸出是

SOURCE 1: 900
SOURCE 2: 900
SOURCE 3: 2000

可以看到,發射了每個一秒的最後一個emission。
如果想以更大的時間間隔節流,你會得到更少的emissions:

Observable.concat(source1, source2, source3)
        .throttleLast(2, TimeUnit.SECONDS)
        .subscribe(System.out::println);

輸出是

SOURCE 2: 900
SOURCE 3: 2000

如果減少時間間隔,會得到更多emissions:

Observable.concat(source1, source2, source3)
        .throttleLast(500, TimeUnit.MILLISECONDS)
        .subscribe(System.out::println);

輸出是

SOURCE 1: 400
SOURCE 1: 900
SOURCE 2: 300
SOURCE 2: 900
SOURCE 3: 2000

throttleFirst()

throttleFirst()只發射固定時間間隔的第一個元素:

Observable.concat(source1, source2, source3)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);

輸出是

SOURCE 1: 100
SOURCE 2: 300
SOURCE 3: 2000
SOURCE 3: 4000

throttleFirst()和throttleLast()使用computation Scheduler,你可以使用第三個引數指定自己的Scheduler。

throttleWithTimeout() / debounce()

當快速發射的時候,throttleWithTimout()不發射任何東西,出現一個“沉默期”以後,就發射最後一個emission。
它接受時間間隔引數,表示多長時間沒收到emissions,就發射最後一個emission。比如前面的例子,我們的沉默期是一秒:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 100) // map to elapsed time
                .map(i -> "SOURCE 1: " + i)
                .take(10);
        Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 300) // map to elapsed time
                .map(i -> "SOURCE 2: " + i)
                .take(3);
        Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
                .map(i -> (i + 1) * 2000
            
           

相關推薦

RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering

RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering Buffering Fixed-size buffering Time-based buffering Boun

RxJava學習 - 5. Single, Completable, and Maybe

RxJava學習 - 5. Single, Completable, and Maybe Single Maybe Completable Single Single實際上只發射一次。它有自己的SingleObserver介面: i

HTML學習筆記-網上書城案例 AND 2018-11-14(22:14)

小夥伴們,又見啦! 一、HTML 網上書城案例 1.頁頭 實現程式碼: <div> <table> <tr> <td><img src="圖片路徑" height="50" width="100" /><

RxJava學習 - 13. Transformers and Custom Operators

RxJava學習 - 13. Transformers and Custom Operators Transformers ObservableTransformer FlowableTransformer Avoiding shared

RxJava學習 - 12. Flowables and Backpressure

RxJava學習 - 12. Flowables and Backpressure Understanding backpressure An example that needs backpressure Introducing the Flo

RxJava學習 - 10. Concurrency and Parallelization

RxJava學習 - 10. Concurrency and Parallelization Introducing RxJava concurrency Keeping an application alive Understanding

RxJava學習 - 9. Multicasting, Replaying, and Caching

RxJava學習 - 9. Multicasting, Replaying, and Caching Understanding multicasting Multicasting with operators When to multicast

【Spark深入學習-11】Spark基本概念和運行模式

nmf 磁盤 大數據平臺 並不是 鼠標 .cn 管理系統 大型數據集 spa ----本節內容------- 1.大數據基礎 1.1大數據平臺基本框架 1.2學習大數據的基礎 1.3學習Spark的Hadoop基礎 2.Hadoop生態基本介紹 2.1

【ES】學習11-多桶排序

nbsp order key actions color 字符串 efi 結果 literal 聚合結果的排序 默認:桶會根據 doc_count 降序排列。 內置排序: 設置按doc_count升序排序:註意order,_count GET /cars/transa

學習11

npe 接受 問題 重要 心情大好 作品 收藏 對比 推薦 7月1日,新聞,小兒推拿培訓。 柔小兒些,雅麗對著鏡頭道。,善解小兒意培訓女孩子。而且還濟南學習斷培訓增加。而且李嫂學習推拿可以每天都來看望嗎。而且練霓裳培訓性格,這個月必須給小緣發獎金。 ,培訓全版權。這樣成

struts2學習(11)struts2驗證框架1.驗證簡介、內置驗證

oid -- 技術分享 ucc view 1.0 style text field 一、Struts2驗證簡介: 二、struts2內置驗證: 下面例子,需求是:為用戶註冊進行驗證; com.cy.model.User.java: package com.cy

Spark機器學習(11):協同過濾算法

設置 tel println print emp master ani alt tro 協同過濾(Collaborative Filtering,CF)算法是一種常用的推薦算法,它的思想就是找出相似的用戶或產品,向用戶推薦相似的物品,或者把物品推薦給相似的用戶。怎樣評價用戶

最權威的RXJaVa學習資料

學習 android music roi androi andro java學習 oid com aNDROID%E8%AF%BB%E5%86%99%E6%96%87%E4%BB%B6%E7%9A%84N%E7%A7%8D%E5%86%99%E6%B3%95 http:/

前端學習11.14

mbed out tac asc 蘋果公司 局限 com menu etime 轉載自:http://www.cnblogs.com/best/p/6096476.html#_lab2_2_0 1、新增加其它元素 1.1、meter 表示特定範圍內的數值,可用於工資、數量、

Nginx服務學習(11)-應用場景

code vpd ces oca eve 壓縮 pro off 傳輸 Nginx服務 Nginx靜態資源web服務 Nginx代理服務 Nginx負載均衡調度 Nginx緩存 CDN分發網絡 模塊配置 Syntax:sendfile on | off; ##文件

ROS學習 Writing a Simple Publisher and Subscriber & Examining them

Go got pub sco targe ide 代碼 int pie 本文主要部分全部來源於ROS官網的Tutorials. 創建Publisher Node roscd beginner_tutorials mkdir -p src gedit src/talker.

python 的基礎 學習 11天 作業題

blog 結果 字母 log 知識 修改 參數 OS 作業 1、整理函數相關知識點,寫博客 2、寫函數,檢查獲取傳入列表或元組對象的所有奇數位索引對應的元素,並將其作為新列表返回給調用者。 3、寫函數,判斷用戶傳入的對象(字符串、列表、元組)長度是否大於5。 4、寫函數,檢

python學習11——if 語句。

python學習 wake 17. dream people som bubuko ant tab people=int(input("How many people are there in this city?")) cats = int(input("How m

python基礎學習11----函數

enc 局部作用域 語句 基礎 變量 each 不可變 不可 lis 一.函數的定義 def 函數名(參數列表): 函數體 return語句 return語句不寫或後邊不加任何對象即為return None 二.函數的參數 無參數 def func1

Python學習小記(2)---[list, iterator, and, or, zip, dict.keys]

1.List行為 可以用 alist[:] 相當於 alist.copy() ,可以建立一個 alist 的 shallo copy,但是直接對 alist[:] 操作卻會直接操作 alist 物件