1. 程式人生 > >RxJava學習 - 12. Flowables and Backpressure

RxJava學習 - 12. Flowables and Backpressure

RxJava學習 - 12. Flowables and Backpressure

當生產更快的時候,最好的辦法是讓源變慢,以下游能接受的速度發射。這叫backpressure或者流量控制,可以使用Flowable代替Observable。

Understanding backpressure

Observables的本質是基於push的。元素同步地,一次一個地到達Observer。預設不使用併發。
比如,下面的Observable發射1-999999999。它把每個整數對映到MyItem例項——它只有一個屬性。讓我們放慢速度,Observer每50毫秒處理一個元素。可以看出,如果下游處理速度慢,上游也同步地變慢。因為所有工作都是一個執行緒完成的:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher
{ public static void main(String[] args) { Observable.range(1, 999_999_999) .map(MyItem::new) .subscribe(myItem -> { sleep(50); System.out.println("Received MyItem " + myItem.id); }); } public static void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } static final class MyItem { final int id; MyItem(int id) { this.id = id; System.out.println("Constructing MyItem " + id); } } }

An example that needs backpressure

當Observable鏈中有了併發操作,操作變成了非同步的。這意味著在某個時刻,Observable鏈上的多個部分可以都在處理emissions,生產者可以超過消費者,因為他們在不同的執行緒上工作。不再嚴格地一次一個地把emission交給下游。因此,源可能在前一個emission還沒到達Observer就傳送下一個。
前面的例子,我們在subscribe()之前,加上observeOn(Shedulers.io()):

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.range(1, 999_999_999)
                .map(MyItem::new)
                .observeOn(Schedulers.io())
                .subscribe(myItem -> {
                    sleep(50);
                    System.out.println("Received MyItem " + myItem.id);
                });
        sleep(Long.MAX_VALUE);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    static final class MyItem {
        final int id;
        MyItem(int id) {
            this.id = id;
            System.out.println("Constructing MyItem " + id);
        }
    }    
}

我的機器上,當增加了1001902個MyItem,Observer才處理了38個。來不及處理的被放到observeOn()的佇列,會導致很多問題,甚至OutOfMemoryError異常。

Introducing the Flowable

Flowable是Observable的背壓變種,告訴源以下游的節奏發射。
下面的程式碼,使用了Flowable.range(),整個鏈就使用Flowables工作了:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Flowable.range(1, 999_999_999)
                .map(MyItem::new)
                .observeOn(Schedulers.io())
                .subscribe(myItem -> {
                    sleep(50);
                    System.out.println("Received MyItem " + myItem.id);
                });
        sleep(Long.MAX_VALUE);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    static final class MyItem {
        final int id;
        MyItem(int id) {
            this.id = id;
            System.out.println("Constructing MyItem " + id);
        }
    }    
}

輸出是

Constructing MyItem 1
Constructing MyItem 2
Constructing MyItem 3
......
Constructing MyItem 127
Constructing MyItem 128
Received MyItem 1
Received MyItem 2
Received MyItem 3
......
Received MyItem 95
Received MyItem 96
Constructing MyItem 129
Constructing MyItem 130
Constructing MyItem 131...
Constructing MyItem 223
Constructing MyItem 224
Received MyItem 97
Received MyItem 98
Received MyItem 99
......

源開始發射了128個emissions,然後,Flowable鏈一次處理了96個emissions的流。就像整個Flowable鏈努力保持它的管道內不超過96個元素一樣。
Flowable.range()為什麼發射了128個emissions,為什麼observeOn()在請求另一個96個元素前,向下遊發射了96個元素,而留下32個未處理的元素?emissions的最初批量會大一些,所以,如果有空閒時間,有些工作會排隊。如果Flowable以請求96個元素開始,接著每次發射96個元素,會有等待下一個96個元素的時刻。因此,維護一個額外的32個長度的rolling cache,讓空閒時刻有事情做。

When to use Flowables and backpressure

使用Flowable,減少了記憶體的使用,防止OutOfMemoryError異常。有些源不支援背壓,如果使用背壓,會導致MissingBackpressureException。
使用Flowable,會損失一部分效能。

如果Observable生存期內,發射的元素超與1000個,或者並不頻繁,可以使用Observable。
如果你的操作是嚴格同步的,或者有限地使用了併發,可以使用Observable。包括在Observable鏈的開始,簡單地使用subscribeOn(),因為這個處理還是使用單執行緒,元素被同步發給下游。當你在不同的執行緒上開始zipping和combining不同的流,或者使用了observeOn()、interval()和delay(),程式不再同步,應該使用Flowable。

Understanding the Flowable and Subscriber

Flowable.interval(),以固定的時間間隔發射。它是背壓的嗎?它的每個emission都是時間敏感的。如果我們放慢Flowable.interval(),
我們的emissions將不再反映時間間隔。因此,當下遊請求背壓時,Flowable.interval()會拋MissingBackpressureException:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Flowable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .map(i -> intenseCalculation(i))
                .subscribe(System.out::println, Throwable::printStackTrace);
        sleep(Long.MAX_VALUE);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }     
}

輸出是

io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
	at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:96)
	at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)

可以使用onBackpresureDrop()或者onBackPressureBuffer(),我們以後再講。

The Subscriber

Flowable使用Subscriber消費emissions和事件。如果你傳lambda事件引數(不是整個Subscriber物件),subscribe()不返回Disposable,而是返回Subscription,可以使用它的cancel()方法處置。
使用Subscription的request()方法,可以知道上游想傳送多少元素。
增加Subscriber的最快的辦法是給subscribe()傳lambda引數。Subscriber的預設實現會請求一個不限數量的emissions上游,之前的任何操作都自動處理背壓:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Flowable.range(1,1000)
                .doOnNext(s -> System.out.println("Source pushed " + s))
                .observeOn(Schedulers.io())
                .map(i -> intenseCalculation(i))
                .subscribe(s -> System.out.println("Subscriber received " + s),
                        Throwable::printStackTrace,
                        () -> System.out.println("Done!")
                );
        sleep(20000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }     
}

當然,你可以自己實現Subscriber,它有onNext()、onError()和onComplete()方法。不像實現Observer那麼簡單,因為實現onSubscribe方法的時候,需要在Subscription上呼叫request(),在正確的時候請求emissions。
實現Subscriber的最快、最容易的辦法是使用request(Long.MAX_VALUE),意思是告訴上游“現在可以給我任何元素”。甚至Subscriber前面的操作也會在他們自己的背壓速度下請求emissions。最後一個operator和Subscriber之間不會背壓。這一般沒問題,因為上游已經限制了速度。
現在,我們實現自己的Subscriber:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        Flowable.range(1, 1000)
                .doOnNext(s -> System.out.println("Source pushed " + s))
                .observeOn(Schedulers.io())
                .map(i -> intenseCalculation(i))
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(Long.MAX_VALUE);
                    }
                    @Override
                    public void onNext(Integer s) {
                        sleep(50);
                        System.out.println("Subscriber received " + s);
                    }
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });
        sleep(20000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }     
}

如果你想讓你的Subscriber和它前面的操作建立明確的背壓關係,可以在呼叫request()的時候傳遞一定的引數。比如,你決定,你的Subscriber初始請求40個emissions,然後每次請求20個:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import java.util.concurrent.ThreadLocalRandom;

public class Launcher {
    public static void main(String[] args) {
        Flowable.range(1, 1000)
                .doOnNext(s -> System.out.println("Source pushed " + s))
                .observeOn(Schedulers.io())
                .map(i -> intenseCalculation(i))
                .subscribe(new Subscriber<Integer>() {
                    Subscription subscription;
                    AtomicInteger count = new AtomicInteger(0);
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        System.out.println("Requesting 40 items!");
                        subscription.request(40);
                    }
                    @Override
                    public void onNext(Integer s) {
                        sleep(50);
                        System.out.println("Subscriber received " + s);
                        if (count.incrementAndGet() % 20 == 0 && count.get() >= 40)System.out.println("Requesting 20 more!");
                        subscription.request(20);
                    }
                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });
        sleep(20000);
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }     
}

注意,源仍然初始發射128個元素,然後仍然每次發射96個。但是,我們的Subscriber只接收40個,然後每次20個。我們的Subscriber的request()呼叫只和它的上游(map())溝通。map()可能把請求傳給observeOn(),observeOn()快取元素,只刷出40個和20個。當它的快取少了,或者空了,就再請求96個元素。

Creating a Flowable

可以使用Observable.create()增加Observable,下面的程式碼描述了當它被訂閱時,怎麼發射元素:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> source = Observable.create(emitter -> {
            for (int i = 0; i <= 1000; i++) {
                if (emitter.isDisposed())
                    return;
                emitter.onNext(i);
            }
            emitter.onComplete();
        });
        source.observeOn(Schedulers.io())
                .subscribe(System.out::println);
        sleep(1000);        
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上面的Observable.create()發射0-1000然後呼叫onComplete()。如果呼叫subscribe()返回的Disposable的dispose()方法,它會突然停止(在for迴圈裡做檢查)。
如果我們使用Flowable.create(),在背壓的情況下,會怎樣呢?像上面那樣簡單地使用for迴圈,不會停止發射。

Using Flowable.create() and BackpressureStrategy

使用Flowable.create(),你必須指定第二個引數:BackpressureStrategy。這個列舉型