1. 程式人生 > >RxJava裡doOnNext的使用和執行緒處理

RxJava裡doOnNext的使用和執行緒處理

doOnNext的使用

我對doOnNext的使用是存在疑惑的,按照官方文件

The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

更多的文件裡說明do系列的作用是side effect,當onNext發生時,它被呼叫,不改變資料流。我做的測試程式碼:

2017.9.28增加修正:
首先要理解什麼是副作用:方法執行的時候,產生了外部可以觀察到的變化就是產生了副作用。
那些文件的意思是說,不要在doOnNext做能發生副作用的方法,也就是不要去改變資料流

Observable.create(new Observable.OnSubscribe<Person>() {
            @Override
            public void call(Subscriber<? super Person> subscriber) {
                Person person = new Person(201);
                subscriber.onNext(person);
            }
        }).doOnNext(new Action1<Person>() {
            @Override
public void call(Person person) { person.age = 301; } }).subscribe(new Action1<Person>() { @Override public void call(Person person) { Log.d(TAG, "call: " + person.age);//輸出301 } });

可見,doOnNext是改變了流裡的資料的,所以並不明白不改變資料流是什麼意思。
從github上

很多專案這篇文章來看,do系列的作用

  • 使用doOnNext()來除錯
  • 在flatMap()裡使用doOnError()作為錯誤處理。
  • 使用doOnNext()去儲存/快取網路結果

    按照我的測試

   final SimpleDateFormat sDateFormat    =   new   SimpleDateFormat("yyyy-MM-dd    hh:mm:ss");
        Observable.create(new Observable.OnSubscribe<Person>() {
            @Override
            public void call(Subscriber<? super Person> subscriber) {
                String    date    =    sDateFormat.format(new    Date());
                System.out.println(date + " call " + Thread.currentThread().getName());
                Person person = new Person(201);
                subscriber.onNext(person);
            }
        }).subscribeOn(Schedulers.io()) //指定耗時程序
                .observeOn(Schedulers.newThread()) //指定doOnNext執行執行緒是新執行緒
                .doOnNext(new Action1<Person>() {
            @Override
            public void call(Person person) {
                String    date    =    sDateFormat.format(new    Date());
                System.out.println(date + " call " + Thread.currentThread().getName());
                person.age = 301;
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).observeOn(AndroidSchedulers.mainThread())//指定最後觀察者在主執行緒
                .subscribe(new Action1<Person>() {
            @Override
            public void call(Person person) {
                String    date    =    sDateFormat.format(new    Date());
                System.out.println(date + " call " + Thread.currentThread().getName());
                Log.d(TAG, "call: " + person.age);
            }
        });

執行結果

03-01 14:49:29.897 23442-24145/com.example.myrxlearn I/System.out: 2016-03-01    02:49:29 call RxCachedThreadScheduler-2
03-01 14:49:29.907 23442-24144/com.example.myrxlearn I/System.out: 2016-03-01    02:49:29 call RxNewThreadScheduler-2
03-01 14:49:31.907 23442-23442/com.example.myrxlearn I/System.out: 2016-03-01    02:49:31 call main

也就是說直到doOnNext裡的方法在新執行緒執行完畢,subscribe裡的call才有機會在主執行緒執行。

一直沒看到有合適的方法解決這個問題,因為快取的時間不應該去阻礙主執行緒裡資料的顯示。

今天做回顧時看到了這篇文章

非阻塞I/O操作

現在我們可以使用Schedulers.io()建立非阻塞的版本:

public static void storeBitmap(Context context, Bitmap bitmap,
String filename) {
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
}); }

然後想起來一直存在的這個問題。只需要把上面的程式碼改成

.doOnNext(new Action1<Person>() {
            @Override
            public void call(Person person) {
                String    date    =    sDateFormat.format(new    Date());
                System.out.println(date + " call " + Thread.currentThread().getName());
                person.age = 301;
                Schedulers.io().createWorker().schedule(new Action0() {
                    @Override
                    public void call() {
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        })

不需要在用observeOn指定在新執行緒就可以實現

03-01 14:55:02.307 30368-30406/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call RxCachedThreadScheduler-1
03-01 14:55:02.307 30368-30406/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call RxCachedThreadScheduler-1
03-01 14:55:02.347 30368-30368/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call main

這樣doOnNext可以方便的用來除錯,用來快取。
奈何,我還是沒能明白副作用,期待指點。

doOnNext is for side-effects: you want to react (eg. log) to item
emissions in an intermediate step of your stream, for example before
the stream is filtered, for transverse behavior like logging, but you
still want the value to propagate down the stream.

onNext is more final, it consumes the value.