RxJava入門
前言
- 什麼是RxJava? 簡單來說,RxJava是基於觀察者模式,提供便捷的非同步操作的一套API。
- RxJava好在哪?它提供了一系列豐富的操作符,支援鏈式呼叫,可以便捷的進行執行緒的切換。
- 本文基於RxJava 2.2.2,是自己學習過程中的筆記,方便以後查閱使用
簡介
RxJava最基本的兩個元素:
1 Observable(被觀察者)
2 Observer(觀察者)
我們通過subscribe(訂閱)便可使它們形成訂閱關係。下面就看一下它們最基本的實現:
建立一個Observable:
Observable observable=Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { } });
建立一個Observer:
Observer observer =new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(String s) { } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } };
訂閱:
observable.subscribe(observer);
鏈式呼叫:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(String s) { } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } });
以上,就可以簡單的使用RxJava了。。。what? 辣雞,寫了點啥?
1 observable.subscribe(observer); 這個方法幹了啥?
observable.subscribe(observer); //這個方法幹了啥?
2ObservableOnSubscribe. subscribe(ObservableEmitter<String> observableEmitter)這個方法啥時候呼叫,observableEmitter怎麼來的?,幹什麼的?
new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { //這個方法啥時候呼叫,observableEmitter怎麼來的?,有什麼用? } }
首先,我們從頭開始看,Observable是個抽象類,其靜態方法,Observable.create(ObservableOnSubscribe<T> source ):
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null");//對傳進來的source引數判空,如果是null,則丟擲異常。 return RxJavaPlugins.onAssembly(new ObservableCreate(source)); }
看看onAssembly這個方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; return f != null ? (Observable)apply(f, source) : source;//f 初始值是null的,也就是說這個方法返回值便是傳進來的source引數,至此Observable建立完成 }
至此,Observable建立流程便清楚了,大體就是通過呼叫 Observable.create(ObservableOnSubscribe<T> source )方法。new一個ObservableCreate物件並返回,接下來再看看observable.subscribe(observer) 這個方法
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null");//判空 try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); this.subscribeActual(observer);//通過上面我們知道這個實際上呼叫了ObservableCreate.subscribeActual(observer) } catch (NullPointerException var4) { throw var4; } catch (Throwable var5) { Exceptions.throwIfFatal(var5); RxJavaPlugins.onError(var5); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(var5); throw npe; } }
通過上面可以看出,實際上呼叫了ObservableCreate.subscribeActual(observer),再看看 ObservableCreate這個類
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source;//就是create中的ObservableOnSubscribe物件 } /** *實現了subscribeActual這個抽象方法 */ protected void subscribeActual(Observer<? super T> observer) { ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer); observer.onSubscribe(parent);//observer便是我們建立的observer物件 try { this.source.subscribe(parent);//也就是create中的ObservableOnSubscribe物件的suscribe方法 } catch (Throwable var4) { Exceptions.throwIfFatal(var4); parent.onError(var4); } } ... } static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer;//我們建立的Observer } public void onNext(T t) { if (t == null) { this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); } else { if (!this.isDisposed()) { this.observer.onNext(t); } } } public void onError(Throwable t) { if (!this.tryOnError(t)) { RxJavaPlugins.onError(t); } } public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!this.isDisposed()) { try { this.observer.onError((Throwable)t); } finally { this.dispose(); } return true; } else { return false; } } public void onComplete() { if (!this.isDisposed()) { try { this.observer.onComplete(); } finally { this.dispose(); } } } public void setDisposable(Disposable d) { DisposableHelper.set(this, d); } public void setCancellable(Cancellable c) { this.setDisposable(new CancellableDisposable(c)); } public ObservableEmitter<T> serialize() { return new ObservableCreate.SerializedEmitter(this); } public void dispose() { DisposableHelper.dispose(this); } public boolean isDisposed() { return DisposableHelper.isDisposed((Disposable)this.get()); } public String toString() { return String.format("%s{%s}", this.getClass().getSimpleName(), super.toString()); } } }
以上,便可清楚,當呼叫observable.subscribe(observer) ,observer的onSubscribe(ObservableEmitter<T> observableEmitter)方法便會呼叫,同時create中的ObservableOnSubscribe物件的suscribe方法也會呼叫
總結
Observable observable=Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { //在observable.subscribe(observer);執行後執行, //observableEmitter裡面的Observer便是我們建立的Observer,其onNext,onError, // onComplete均會交給我們建立的Observer執行 } }); Observer observer =new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { //在observable.subscribe(observer);執行後執行 } @Override public void onNext(String s) { } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } }; observable.subscribe(observer);
observable.subscribe(observer);訂閱方法執行後,observer的onSubscribe(Disposable disposable)方法會呼叫,ObservableOnSubscribe的subscribe(ObservableEmitter<String> observableEmitter)方法會呼叫,其中observableEmitter便是由我們建立的observer包裝而成的,其傳送的事件會被我們建立的Observer收到(disposable.dispose()之後的收不到)。以上便是個人學習RxJava的一點理解。