1. 程式人生 > >RxJava學習 - 10. Concurrency and Parallelization

RxJava學習 - 10. Concurrency and Parallelization

RxJava學習 - 10. Concurrency and Parallelization

10年來,併發需求增長迅速,已經是Java開發者的必要技能。Concurrency(也叫多執行緒)實質上是多工,需要同一時間執行幾個處理。如果你想充分利用硬體的運算能力(電話、伺服器、膝上型電腦或者桌面計算機),需要學習怎樣才能使用多執行緒和併發。RxJava讓併發更容易更安全。

Introducing RxJava concurrency

預設地,Observables在及時(immediate)執行緒(就是宣告Observer的執行緒)上執行工作。前面的很多例子,就是啟動main()方法的執行緒。
但是也有一些例子,不是所有的Observables都在immediate執行緒上執行。想想那些使用Observable.interval()的程式碼:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args)
{ Observable.interval(1, TimeUnit.SECONDS) .map(i -> i + " Mississippi") .subscribe(System.out::println); sleep(5000); } public static void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }

上面的Observable實際上在另一個執行緒上執行。main執行緒啟動Observable.interval(),但是不會等它完成,因為它用了另一個執行緒,程式現在包含兩個執行緒。如果main執行緒呼叫sleep()方法,main()方法結束,程式退出前,有機會發射。
通常,只有在長時間執行或者計算密集的處理才需要併發。下面的例子,增加一個叫做intenseCalculation()的方法,模擬一個長時間執行的程式。它會簡單地接受任何值,然後sleep0-3秒,然後返回相同的值。讓執行緒休眠,或者暫停它 ,可以模仿繁忙工作的執行緒:

public static <T> T intenseCalculation(T value) {
    sleep(ThreadLocalRandom.current().nextInt(3000));
    return value;
}

讓我們增加兩個Observables,再增加兩個Observers訂閱他們。每個map方法呼叫intenseCalculation()方法讓他們變慢:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .map(s -> intenseCalculation((s)))
                .subscribe(System.out::println);
        Observable.range(1,6)
                .map(s -> intenseCalculation((s)))
                .subscribe(System.out::println);
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }
    
    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出是

Alpha
Beta
Gamma
Delta
Epsilon
1
2
3
4
5
6

可以看到,Observables發射得很慢。而且,第一個發射完,第二個才開始發射。如果兩個能同時發射,而不是一個等另一個結束,就可以執行得快一些。
我們可以使用subscribeOn()實現,它可以讓源在特定的Scheduler上發射。讓我們使用Schedulers.computation(),它會使用固定數量的執行緒池。
它為每個Observer提供一個執行緒發射emissions。呼叫了onComplete(),該執行緒就被Scheduler回收,等待重用:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .subscribeOn(Schedulers.computation())
                .map(s -> intenseCalculation((s)))
                .subscribe(System.out::println);
        Observable.range(1,6)
                .subscribeOn(Schedulers.computation())
                .map(s -> intenseCalculation((s)))
                .subscribe(System.out::println);
        sleep(20000);
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }
    
    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

RxJava的operators能安全地在不同的執行緒上工作。每個組合多個Observables的operators和工廠,比如merge()個zip(),能安全地組合不同的執行緒上發射的emissions:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<String> source1 =
                Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                        .subscribeOn(Schedulers.computation())
                        .map(s -> intenseCalculation((s)));
        Observable<Integer> source2 =
                Observable.range(1, 6)
                        .subscribeOn(Schedulers.computation())
                        .map(s -> intenseCalculation((s)));
        Observable.zip(source1, source2, (s, i) -> s + "-" + i)
                .subscribe(System.out::println);
        sleep(20000);
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }
    
    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Keeping an application alive

之前,我們使用sleep()方法防止併發的響應式程式過早退出。如果你使用Android、JavaFX或其他擁有自己的non-daemon執行緒的框架,這不是一個問題,因為程式會保持執行狀態。但是,如果簡單地使用main()方法,想啟動長時間執行或者無限Observables的程式,你不得不長時間保持main執行緒活著。
一個辦法是讓執行緒sleep下去:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.interval(1, TimeUnit.SECONDS)
                .map(l -> intenseCalculation((l)))
                .subscribe(System.out::println);
        sleep(Long.MAX_VALUE);
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }
    
    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

另一個辦法是,Brian Goetz在Java Concurrency in Practice書裡討論過的,使用CountDownLatch等待訂閱完成。RxJava有一個容易的辦法是使用blocking operators,它可以用來停止宣告的執行緒和等待emissions。通常,它用於單元測試,在生產中,如果使用不當,會引起antipatterns。使用blocking operator,可以讓包含有限的Observable的程式長期存活。看下面的例子,使用blockingSubscribe()代替subscribe(),會在程式退出之前,等待呼叫onComplete():

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .subscribeOn(Schedulers.computation())
                .map(s -> intenseCalculation((s)))
                .blockingSubscribe(System.out::println,
                        Throwable::printStackTrace,
                        () -> System.out.println("Done!"));
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000));
        return value;
    }
    
    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Understanding Schedulers

一些執行緒池有固定數量的執行緒,可以使用ExecutorService返回一個執行緒池。RxJava有它自己的併發抽象,叫Scheduler。像ExecutorService或者actor系統那樣,它定義方法和規則。
在Schedulers靜態工廠類裡可以找到很多預設的Scheduler實現。對於一個給定的Observer,一個Scheduler會從執行緒池提供一個執行緒,用來發射emissions。
呼叫onComplete(),operation被處置,執行緒被回收。

Computation

Schedulers.computation(),會維護一個固定數量(根據處理器數量)的執行緒池。計算任務(比如數學、演算法和複雜邏輯)可以充分利用核心。
如果你不確信有會併發執行多少任務,或者不確信應該使用哪個Scheduler,最好使用預設的那個。

IO

讀寫資料庫、web請求、磁碟儲存那樣的IO任務較少使用CPU,經常有等待資料被髮送或者返回的空閒時間。此時應該使用Schedulers.io()。
它會維護的執行緒數和任務數一樣多,並且根據需要的數量動態增長、快取和減少。比如,你可以用Schedulers.io()執行使用RxJava-JDBC
執行SQL的操作:

Database db = Database.from(conn);
Observable<String> customerNames =
    db.select("SELECT NAME FROM CUSTOMER")
            .getAs(String.class)
            .subscribeOn(Schedulers.io());

但是你要小心,根據經驗,每個訂閱可能返回一個新的執行緒。

New thread

Schedulers.newThread()工廠返回的Scheduler不使用執行緒池。它會為每個Observer增加一個新執行緒,執行結束以後銷燬該執行緒:

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.newThread());

Single

當你想在一個執行緒上順序執行任務,可以呼叫Schedulers.single()。一個單執行緒的實現支援著它。非執行緒安全的操作適合使用它:

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.single());

Trampoline

Schedulers.trampoline()是個有趣的Scheduler。我們很少使用它,它主要用於RxJava的內部實現。它很像立即執行緒上的預設排程器,但是它會防止遞迴排程(在同一個執行緒上,一個任務排程另一個任務)。它不會導致堆疊溢位錯誤,而是完成了當前任務再執行新任務。

ExecutorService

可以使用ExecutorService建立一個Scheduler。比如,我們想增加一個使用20個執行緒的Scheduler:

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Launcher {
    public static void main(String[] args) {
        int numberOfThreads = 20;
        ExecutorService executor =
                Executors.newFixedThreadPool(numberOfThreads);
        Scheduler scheduler = Schedulers.from(executor);
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .subscribeOn(scheduler)
                .doFinally(executor::shutdown)
                .subscribe(System.out::println);        
    }
}

ExecutorService會讓你的程式一直活著,

Starting and shutting down Schedulers

每個預設Scheduler在你第一次呼叫它的時候才例項化。可以在任何時候使用shutdown()方法處置computation()、io()、newThread()、single()和trampoline() Schedulers,也可以使用Schedulers.shutdown()把他們全部處置掉。呼叫shutdown(),會停掉他們的執行緒,禁止新任務,呼叫其他方法會產生錯誤。也可以呼叫他們的start()方法,或者Schedulersers.start(),重新初始化Schedulers,然後可以重新接受任務。

Understanding subscribeOn()

subscribeOn()建議上游的源Observable,可以使用哪個Scheduler,以及怎樣在其中的執行緒上執行。如果源不打算拴在特定的Scheduler上,它會使用你指定的Scheduler。然後,它使用這個執行緒發射emissions,一直髮給最後的Observer(除非你又增加了observeOn()呼叫)。可以把subscribeOn()放在Observable鏈的任何地方。
看下面的例子,subscribeOn()放在哪兒,效果都是一樣的(為清楚起見,subscribeOn()最好靠近源):

Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation()) //preferred
        .map(String::length)
        .filter(i -> i > 5)
        .subscribe(System.out::println);
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .map(String::length)
        .subscribeOn(Schedulers.computation())
        .filter(i -> i > 5)
        .subscribe(System.out::println);
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .map(String::length)
        .filter(i -> i > 5)
        .subscribeOn(Schedulers.computation())
        .subscribe(System.out::println);

同一個Observable有多個Observers,使用subscribeOn()會導致每個都有自己的執行緒(或者如果沒有有效的執行緒的時候,等待一個有效的執行緒)。
你可以呼叫Thread.currentThread().getName()列印執行執行緒的名字:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> lengths =
                Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                        .subscribeOn(Schedulers.computation())
                        .map(s -> intenseCalculation((s)))
                        .map(String::length);
        lengths.subscribe(i ->
                System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));
        lengths.subscribe(i ->
                System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));
        sleep(10000);
    }

    public static <T> T intenseCalculation(T value) {
        sleep(ThreadLocalRandom.current().nextInt(3000)