1. 程式人生 > >rxJava的使用--Observable的建立及原始碼分析(一)

rxJava的使用--Observable的建立及原始碼分析(一)

       最近新開的專案需要用到rxJava.在網上找了一下資料,感覺資料好少,有一些資料雖然有例子,但例子都好複雜,對一個新手來說操作是挺麻煩的.因此,本人根據自己的理解寫了一些例子還有原始碼分析,給記憶力不好的自己,留作複習用,也希望能幫助大家.文章可能寫得有點慢,請大家見諒.
       本文的順序是根據ReactiveX文件中文翻譯來寫的,該文件已經把一些概念性的東西講得很清楚了,在這裡我就不再講述.如果有需要的話大家可以去看一下.

Observable的建立

1, create,程式碼如下:

  Observable.create(new Observable.OnSubscribe<String>() {
            @Override
public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello,rxJava"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("---- onCompleted ----"
); } @Override public void onError(Throwable e) { System.out.println("---- onError ----"); } @Override public void onNext(String s) { System.out.println("---- onNext ----:" + s); } });

執行結果如下:

---- onNext ----:hello,rxJava
---- onCompleted ----

通過該方建立Observable的話需要手動的去呼叫subscriber裡面的方法如:onNext,onCompleted.onError一般是程式出現錯誤的時候,rxjava主動呼叫的,因此,正常情況下,不需要去呼叫.
create方法做了什麼呢,下面我們去看一下原始碼,首先看到create方法的內部
原始碼分析:

public static <T> Observable<T> create(OnSubscribe<T> f) {
      return new Observable<T>(hook.onCreate(f));
  }

看到hook.onCreate(f)的onCreate內部

public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
    }

直接返回了我們之前建立的OnSubscribe的例項物件.接著建立Observable物件並返回.
下面看subscribe方法的內部:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

這裡的this就是之前返回的Observable物件,繼續看subscribe方法的內部:

 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        ... 省略部分
        // subscriber每次都會首先呼叫onStart方法
        subscriber.onStart(); 
        // 這裡把subscriber封裝成一個安全的物件
        // 在這裡就不分析SafeSubscriber裡面的程式碼了
        // 因為裡面的程式碼無非是通過try{}catch{}來保證整個流程正常執行
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
        try {
            // onSubscribeStart返回之前建立的observable.onSubscribe物件,並且呼叫call
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            ... 省略部分
            // 合理就是如丟擲異常之後的處理了
             try {
                  subscriber.onError(hook.onSubscribeError(e));
             }catch (Throwable e2) {
                ... 省略部分
            }
            // 從這也可以知道,如果有異常丟擲的話,rxjava會為我們取消訂閱
            return Subscriptions.unsubscribed();
        }
  }

這裡需要注意的是subscriber = new SafeSubscriber(subscriber);這一句程式碼,我就簡單的說一下這樣做的原因:主要是為了防止使用者建立的subscriber丟擲異常導致整個流程不能正常執行.具體理由大家可以點選該連結,這裡有詳細的討論.

2, Defer,程式碼如下:

    Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable call() {
                // 這裡要返回一個Observable的例項物件,在這裡用create的方法建立
                return Observable.create(new Observable.OnSubscribe<String>() {
                    @Override
                    // 這裡還是create的用法
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                });
            }
            // 然後這裡是訂閱者,跟create一樣
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("---- onCompleted ----");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("---- onError ----");
            }
            @Override
            public void onNext(String o) {
                System.out.println("---- onNext ----" + o);
            }
        });
---- onNext ----hello world
---- onCompleted ----

該方法從用法還是結果來看基本和create一樣,並且還多了new Func0 <Observable>(){}這麼一個步驟. 看起來比create還麻煩.它內部的實現與create有什麼異同呢.下面我們看一下defer這個方法的內部實現.
原始碼分析:

    public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
        return create(new OnSubscribeDefer<T>(observableFactory));
    }

看到這個方法之前我們new Func0物件變成了一個observable的建立工廠observableFactory.接著看OnSubscribeDefer這個類的內部,知道這個OnSubscribeDefer,實現的是OnSubscribe介面:

public final class OnSubscribeDefer<T> implements OnSubscribe<T>

該類把我們之前建立的Func0物件儲存起來

 public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
        this.observableFactory = observableFactory;
   }

這樣也可以看出defer方法返回的是OnSubscribeDefer的物件.因此在subscribe呼叫的是該例項的call方法,因此去看該類的的call方法:

   @Override
    public void call(final Subscriber<? super T> s) {
        Observable<? extends T> o;
        try {
            // 這裡呼叫了 Func0物件的call方法,返回了我們用過create建立的
            // Observable例項
            o = observableFactory.call();
        } catch (Throwable t) {
            Exceptions.throwOrReport(t, s);
            return;
        }
        // 然後再這裡呼叫create裡面的方法call
        // 這裡的unsafeSubscribe從原始碼可知
        // 沒有把Subscriber物件包裝為SafeSubscriber
        o.unsafeSubscribe(Subscribers.wrap(s));
    }

繼續去看unsafeSubscribe的原始碼:

    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
             ... 忽略部分
            subscriber.onStart();
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
               ... 忽略部分
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
              ... 忽略部分
            }
            return Subscriptions.unsubscribed();
        }
    }

該方法跟我們之前分析的subscribe基本是一樣的,除了沒有把subscriber物件包裝成SafeSubscriber物件之外.
defer的整個流程可以總結為:呼叫defer的時候並沒有直接建立Observable的例項物件,而是在呼叫subscribe時候才通過observableFactory來建立.至於它的用處,等我們講到just的時候才回過頭來看.

3, empty,程式碼如下:

     Observable.empty().subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.println("---- onCompleted ----");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("---- onError ----");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("---- onNext ----");
            }
        });

結果如下:

---- onCompleted ----

從執行結果看,通empty建立的Observable值呼叫onCompleted方法,不呼叫onNext:
原始碼分析:
empty方法內部:

public static <T> Observable<T> empty() {
        return EmptyObservableHolder.instance();
    }

跟進EmptyObservableHolder.instance(),發現EmptyObservableHolder是一個列舉類:

public enum EmptyObservableHolder implements OnSubscribe<Object> {
    INSTANCE
    ...忽略部分
   }

而instance方法內容如下:

  public static <T> Observable<T> instance() {
        return (Observable<T>)EMPTY;
    }

該方法返回了列舉類裡面的INSTANCE.
因此subscribe方法裡面呼叫的是該類的call方法了,看到該列舉類裡面的call方法:

 @Override
    public void call(Subscriber<? super Object> child) {
        child.onCompleted();
    }

從這裡可以知道為什麼empty建立的Observable物件只調用了onCompleted方法的原因了;

4, Never,程式碼如下:

 Observable.never().subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.println("---- onCompleted ----");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("---- onError ----");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("---- onNext ----");
            }
        });

執行該程式碼片段時,什麼輸出都沒有.
原始碼分析如下:
跟進never方法看到:

  public static <T> Observable<T> never() {
        return NeverObservableHolder.instance();
    }

根據之前empty分析可以判斷出NeverObservableHolder也是一個列舉類,該類裡面的call方法程式碼如下:

 @Override
    public void call(Subscriber<? super Object> child) {
    }

從這裡看到該call方法執行內容為空,這就是never什麼都不執行的原因了.

5, error,程式碼如下:

   Observable.error(new Exception("hello I'm error")).subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                System.out.println("---- onCompleted ----");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("---- onError ----" + e.getMessage());
            }

            @Override
            public void onNext(Object o) {
                System.out.println("---- onNext ----");
            }
        });

執行結果如下:

---- onError ----hello I'm error

這裡的異常資訊就是通過error建立Observable時傳入異常的資訊.
原始碼分析如下:
error方法內部:

   public static <T> Observable<T> error(Throwable exception) {
        return create(new OnSubscribeThrow<T>(exception));
    }

跟進OnSubscribeThrow,看到該類的call方法

 @Override
    public void call(Subscriber<? super T> observer) {
        observer.onError(exception);
    }

可以看到該類的call方法直接呼叫onError方法並且傳入我們之前的建立的Exception.

好了第一部分就先分析到這裡了,如果大家有什麼不明白的,或者認為我講得不合理的都可以給我留言.