1. 程式人生 > >RxJava執行緒切換原理

RxJava執行緒切換原理

RxJava在圈子裡越來越火,相信很大的一個原因就是它的執行緒切換。它的執行緒切換可以用優雅來形容,鏈式呼叫,簡單、方便。今天,就讓我們來窺探一下RxJava的執行緒切換原理。本次拆輪子,還是按原樣,通過小例子,研讀RxJava原始碼等來理解整個過程、結構、原理,我們首要的是先對執行緒切換的原理有個全域性的概覽,細節再慢慢來深入。

1 前言

執行緒的切換都是通過subscribeOn或者observeOn來進行,生產者的執行執行緒只受subscribeOn控制,不受observeOn影響。subscribeOn指定的執行緒環境能一直維持到第一次observeOn出現之前。要講執行緒切換原理之前,我們先來看一下下面的幾個類定義:

  • Operator
  1. /**
  2. * Operator function for lifting into an Observable.
  3. */
  4. publicinterfaceOperator<R, T>extendsFunc1<Subscriber<?super R>,Subscriber<?super T>>{
  5. // cover for generics insanity
  6. }

Operator是Observable中定義的介面,即使用者的邏輯操作,RxJava框架會呼叫lift方法將Operator包裝成為Observable。

  • ObserveOnSubseriber

ObserveOnSubseriber是被訂閱者的類,處理使用者資料邏輯,也即生產者,用來產生使用者資料的。

  • OperatorObserveOn

OperatorObserveOn是訂閱者的類,接收資料的,也即消費者,消費生產者傳送過來的資料。

  • Worker

Worker是執行緒真正執行的地方,也就是單獨新建的一個執行緒或執行緒池中的某個執行緒。

2 原理解析

我們先建立一個Observable:

  1. Observable.just(null)
  2. .subscribeOn(Schedulers.io())
  3. .observeOn(AndroidSchedulers.mainThread())
  4. .subscribe
    ();

just方法又是呼叫的ScalarSynchronousObservable,然後new一個OnSubscribe作為建構函式的引數,暫且叫做1號OnSubscribe,這個下面會再提到,也是執行緒切換的區別所在:

  1. protectedScalarSynchronousObservable(final T t){
  2. super(newOnSubscribe<T>(){
  3. @Override
  4. publicvoid call(Subscriber<?super T> s){
  5. s.onNext(t);
  6. s.onCompleted();
  7. }
  8. });
  9. this.t = t;
  10. }

執行緒切換的要點在lift()函式裡面,都是基於同一個基礎的變換方法: lift(Operator)

先來看一下它的原始碼:

  1. publicfinal<R>Observable<R> lift(finalOperator<?extends R,?super T>operator){
  2. returnnewObservable<R>(newOnSubscribe<R>(){
  3. @Override
  4. publicvoid call(Subscriber<?super R> o){
  5. try{
  6. Subscriber<?super T> st = hook.onLift(operator).call(o);
  7. try{
  8. // new Subscriber created and being subscribed with so 'onStart' it
  9. st.onStart();
  10. onSubscribe.call(st);
  11. }catch(Throwable e){
  12. // localized capture of errors rather than it skipping all operators
  13. // and ending up in the try/catch of the subscribe method which then
  14. // prevents onErrorResumeNext and other similar approaches to error handling
  15. if(e instanceofOnErrorNotImplementedException){
  16. throw(OnErrorNotImplementedException) e;
  17. }
  18. st.onError(e);
  19. }
  20. }catch(Throwable e){
  21. if(e instanceofOnErrorNotImplementedException){
  22. throw(OnErrorNotImplementedException) e;
  23. }
  24. // if the lift function failed all we can do is pass the error to the final Subscriber
  25. // as we don't have the operator available to us
  26. o.onError(e);
  27. }
  28. }
  29. });
  30. }

我們可以看到,呼叫lift()方法後(即執行subscribeOn或observeOn),是返回一個新的Observable,而不是呼叫者的Observable,這裡同樣是重新建立了一個OnSubscribe,暫且叫做2號OnSubscribe,我們再回頭看看,這個OnSubscribe與前面提到的just()方法裡面呼叫到的OnSubscribe不是同一個物件

這裡是執行緒切換的關鍵點。當呼叫鏈來到lift()方法後,使用的是lift()所返回的新的 Observable,也就是它所觸發的onSubscribe.call(subscriber)也是用新的Observable中的新 OnSubscribe,即我們上面命名的2號OnSubscribe。

3 OperatorSubscribeOn

再來看lift()函式的原始碼,它的第二個try方法體裡面有個onSubscribe,這個OnSubscribe就是我們前面定義的1號onSubscribe,它就是我們呼叫just()方法後建立的原始Observable。

那它是怎麼做到切換執行緒的呢?如上面的例子,subscribeOn(Schedulers.io()),它通過下面的程式碼(舉例)產生一個新的Subscriber:

  1. Subscriber<?super T> st = hook.onLift(operator).call(o);//將新的Subscriber物件o傳遞給OperatorSubscribeOn,它裡面的call()方法去建立新的Worker執行緒
  2. //OperatorSubscribeOn的call(o)方法
  3. @Override
  4. publicSubscriber<?superObservable<T>> call(finalSubscriber<?super T> subscriber){
  5. finalWorker inner = scheduler.createWorker();//新建執行緒
  6. subscriber.add(inner);
  7. returnnewSubscriber<Observable<T>>(subscriber){
  8. @Override
  9. publicvoid onCompleted(){
  10. // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
  11. }
  12. @Override
  13. publicvoid onError(Throwable e){
  14. subscriber.onError(e);
  15. }
  16. @Override
  17. publicvoid onNext(finalObservable<T> o){
  18. inner.schedule(newAction0(){
  19. @Override
  20. publicvoid call(){
  21. finalThread t =Thread.currentThread();
  22. o.unsafeSubscribe(newSubscriber<T>(subscriber){
  23. @Override
  24. publicvoid onCompleted(){
  25. subscriber.onCompleted();
  26. }
  27. @Override
  28. publicvoid onError(Throwable e){
  29. subscriber.onError(e);
  30. }
  31. @Override
  32. publicvoid onNext(T t){
  33. subscriber.onNext(t);
  34. }
  35. @Override
  36. publicvoid setProducer(finalProducer producer){
  37. subscriber.setProducer(newProducer(){
  38. @Override
  39. publicvoid request(finallong n){
  40. if(Thread.currentThread()== t){//如果是當前執行緒,則在當前執行緒執行
  41. // don't schedule if we're already on the thread (primarily for first setProducer call)
  42. // see unit test 'testSetProducerSynchronousRequest' for more context on this
  43. producer.request(n);
  44. }else{//不是當前執行緒,將在新建立的Worker執行緒inner中執行
  45. inner.schedule(newAction0(){
  46. @Override
  47. publicvoid call(){
  48. producer.request(n);
  49. }
  50. });
  51. }
  52. }
  53. });
  54. }
  55. });
  56. }
  57. });
  58. }
  59. };
  60. }

然後,通過呼叫1號OnSubscribe的call()方法 onSubscribe.call(st) 將新建立的Subscriber與原始的Observable關聯起來,即新的Subscriber去訂閱原始的Observable。這樣,生產者

通過上面的程式碼可以知道,Scheduler類其實並不負責非同步執行緒處理,它只負責通過createWorker()類創建出一個Worker物件,真正負責任務的延時處理。

4 OperatorObserveOn

observeOn方法內部也是呼叫了lift()方法,然後建立一個operator,

  1. //OperatorObserveOn.java
  2. @Override
  3. publicSubscriber<?super T> call(Subscriber<?super T> child){
  4. if(scheduler instanceofImmediateScheduler){
  5. // avoid overhead, execute directly
  6. return child;
  7. }elseif(scheduler instanceofTrampolineScheduler){
  8. // avoid overhead, execute directly
  9. return child;
  10. }else{
  11. ObserveOnSubscriber<T> parent =newObserveOnSubscriber<T>(scheduler, child);
  12. parent.init();
  13. return parent;
  14. }
  15. }
  16. publicObserveOnSubscriber(Scheduler scheduler,Subscriber<?super T> child){
  17. this.child = child;
  18. this.recursiveScheduler = scheduler.createWorker();//建立新的worker執行緒
  19. if(UnsafeAccess.isUnsafeAvailable()){
  20. queue =newSpscArrayQueue<Object>(RxRingBuffer.SIZE);
  21. }else{
  22. queue =newSynchronizedQueue<Object>(RxRingBuffer.SIZE);
  23. }
  24. this.scheduledUnsubscribe =newScheduledUnsubscribe(recursiveScheduler);
  25. }
  26. protectedvoid schedule(){
  27. if(COUNTER_UPDATER.getAndIncrement(this)==0){
  28. recursiveScheduler.schedule(action);//用相應的執行緒進行資料輸出排程
  29. }
  30. }

結合扔物線大大的圖如下:


未完待續......

5 參考