1. 程式人生 > >RxJava 和 RxAndroid 五(執行緒排程)

RxJava 和 RxAndroid 五(執行緒排程)

對rxJava不瞭解的同學可以先看

本文將有幾個例子說明,rxjava執行緒排程的正確使用姿勢。

例1

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: main           -- 主執行緒
/rx_map: main        --  主執行緒
/rx_subscribe: main   -- 主執行緒

例2

   new Thread(new Runnable() {
            @Override
            public void run() {
                Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();

 void rx(){
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

    }

      結果

/rx_newThread: Thread-564   -- 子執行緒
/rx_call: Thread-564              -- 子執行緒
/rx_map: Thread-564            -- 子執行緒 
/rx_subscribe: Thread-564    -- 子執行緒

  • 通過例1和例2,說明,Rxjava預設執行在當前執行緒中。如果當前執行緒是子執行緒,則rxjava執行在子執行緒;同樣,當前執行緒是主執行緒,則rxjava執行在主執行緒

例3

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1    --io執行緒
/rx_map: main                                     --主執行緒
/rx_subscribe: main                              --主執行緒

例4

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ; 

      結果

/rx_call: RxCachedThreadScheduler-1     --io執行緒
/rx_map: RxCachedThreadScheduler-1   --io執行緒
/rx_subscribe: main                              --主執行緒

  • 通過例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符預設執行在事件產生的執行緒之中。事件消費只是在 subscribe() 裡面。
  • 對於 create() , just() , from()   等                 --- 事件產生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消費

  •   事件產生:預設執行在當前執行緒,可以由 subscribeOn()  自定義執行緒

         事件加工:預設跟事件產生的執行緒保持一致, 可以由 observeOn() 自定義執行緒

       事件消費:預設執行在當前執行緒,可以有observeOn() 自定義

例5  多次切換執行緒

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .observeOn( Schedulers.newThread() )    //新執行緒

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .observeOn( Schedulers.io() )      //io執行緒

                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                        return s != null ;
                    }
                })

                .subscribeOn(Schedulers.io())     //定義事件產生執行緒:io執行緒
                .observeOn(AndroidSchedulers.mainThread())     //事件消費執行緒:主執行緒

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1           -- io 執行緒
/rx_map: RxNewThreadScheduler-1             -- new出來的執行緒
/rx_filter: RxCachedThreadScheduler-2        -- io執行緒
/rx_subscribe: main                                   -- 主執行緒

例6:只規定了事件產生的執行緒

       Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  結果

/rx--create: RxCachedThreadScheduler-4                      // io 執行緒
/rx--subscribe: RxCachedThreadScheduler-4                 // io 執行緒

例:7:只規定事件消費執行緒

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .observeOn( Schedulers.newThread() )
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  結果

/rx--create: main                                           -- 主執行緒
/rx--subscribe: RxNewThreadScheduler-1        --  new 出來的子執行緒 

    從例6可以看出,如果只規定了事件產生的執行緒,那麼事件消費執行緒將跟隨事件產生執行緒。

    從例7可以看出,如果只規定了事件消費的執行緒,那麼事件產生的執行緒和 當前執行緒保持一致。

例8:執行緒排程封裝

 在Android 常常有這樣的場景,後臺處理處理資料,前臺展示資料。

一般的用法:

   Observable
                .just( "123" )
                .subscribeOn( Schedulers.io())
                .observeOn( AndroidSchedulers.mainThread() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  但是專案中這種場景有很多,所以我們就想能不能把這種場景的排程方式封裝起來,方便呼叫。

簡單的封裝

    public Observable apply( Observable observable ){
       return observable.subscribeOn( Schedulers.io() )
                .observeOn( AndroidSchedulers.mainThread() ) ;
    }

使用

  apply( Observable.just( "123" ) )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {

                    }
                }) ;

弊端:雖然上面的這種封裝可以做到執行緒排程的目的,但是它破壞了鏈式程式設計的結構,是程式設計風格變得不優雅。

改進:Transformers 的使用(就是轉化器的意思,把一種型別的Observable轉換成另一種型別的Observable )

改進後的封裝

    Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

  使用

      Observable
                .just( "123" )
                .compose( schedulersTransformer )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  弊端:雖然保持了鏈式程式設計結構的完整,但是每次呼叫 .compose( schedulersTransformer ) 都是 new 了一個物件的。所以我們需要再次封裝,儘量保證單例的模式。

改進後的封裝

package lib.app.com.myapplication;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

/**
 * Created by ${zyj} on 2016/7/1.
 */
public class RxUtil {

    private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

   public static  <T> Observable.Transformer<T, T> applySchedulers() {
        return (Observable.Transformer<T, T>) schedulersTransformer;
    }

}

  使用

    Observable
                .just( "123" )
                .compose( RxUtil.<String>applySchedulers() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

相關推薦

RxJava RxAndroid 執行排程

對rxJava不瞭解的同學可以先看 本文將有幾個例子說明,rxjava執行緒排程的正確使用姿勢。 例1 Observable .create(new Observable.OnSubscribe<String>() {

RxJava RxAndroidRxBinding的使用

對Rxjava不熟悉的同學可以先看我之前寫的幾篇文章 前言:RxBinding 是 Jake Wharton 的一個開源庫,它提供了一套在 Android 平臺上的基於 RxJava的 Binding API。所謂 Binding,就是類似設定 OnClickListener 、設定

RxJava RxAndroid操作符的使用

前言:對Rx不瞭解的朋友可以先看我的第一篇博文 ,是對Rxjava的基本介紹 1、merge操作符,合併觀察物件 19 List<String> list1 = new ArrayList<>() ; 20 List<String&g

javaSE (三十執行 執行實現方法區別、同步程式碼塊方法執行安全

主要還是熟悉api,熟悉方法,簡單,需要多實踐 1、 多執行緒實現方法和區別: 多執行緒實現的兩種方法: 1)類繼承Thread類或實現Runnable介面,重寫run()方法 2)建立Thread的子類物件(需要開幾個執行緒就建立幾個物件,可建立匿名內部類) 3)子類

RxJava RxAndroid生命週期控制記憶體優化

rxjava rxandroid 趙彥軍 RxJava使我們很方便的使用鏈式程式設計,程式碼看起來既簡潔又優雅。但是RxJava使用起來也是有副作用的,使用越來越多的訂閱,記憶體開銷也會變得很大

python多執行3---生產者與消費者執行通訊Queue模組

Queue模組可以進行執行緒通訊。比如生產者產生貨物,貨物放入佇列,通過通訊,消費者取得貨物。Queue被稱為通用佇列模組 queue(size)產生一個佇列,佇列模式有3種,針對這三種佇列分別有三個建構函式: 1 FIFO佇列先進先出:class Queu

問題解決 ROS節點的不可執行執行退出

在本例項中存在兩個ROS節點 odometry和sensor 遇到的問題:rosrun執行兩個節點時發現沒有反應                      改用launch指令碼執行,發現如下結果-》odometry和sensor執行緒退出

Android--menuOkHttp框架未封裝,結合Executors執行實現網路請求的案例

涉及到的 知識點: 1.安卓UI元件menu 2.OkHttp框架 3.Executors(執行緒池) OkHttp結構簡介 案例程式碼 import android.os.Bundle; import android.suppo

任務,執行同步之ThreadPool執行

執行緒池 任務後臺基礎:執行緒池。建立執行緒需要時間。如果有不同的短任務要完成,就可以事先建立許多執行緒,在應完成這些任務時發出請求,這個執行緒最好在需要更多的執行緒時增加,在需要釋放資源時減少。 不需要自己建立這樣的一個列表,該表由ThreadPool類託

執行八大基礎核心執行相關方法一

1.引子 在java多執行緒併發程式設計中,有八大基礎核心。考考你:看看都有哪八大基礎核心呢?它們分別是: 1.建立執行緒的方式 2.執行緒啟動 3.執行緒停止 4.執行緒生命週期 5.執行緒相關的方法 6.執行緒相關的屬性 7.執行緒異常處理 8.執行緒安全 今天我們從第五個基礎核心開始:執行緒相關方法 2

執行執行

執行緒池:顧名思義,充滿執行緒的池子,通過呼叫Executors類方法來實現建立執行緒池以及進行任務的任務。 例一:通過Executors類靜態方法ExecutorService newFixedThreadPool(int nThread)(混合執行緒)來建立執行緒池。再呼叫execute方法

執行執行同步問題示例

例一:執行緒同步問題示例(試衣)                       

執行interrupt、setPriority、join方法示例

例一:interrupt方法                       &

執行執行概念及建立執行

概念定義:執行緒是程序中的一個任務,也叫順序執行流,同時執行一個程序中的多個任務(也就是執行多個順序執行流)就是多執行緒     程式:是作業系統中實現功能的程式碼塊,也叫軟體     程序:正在執行中的程式     多程序:多個程式同

《Java多執行程式設計實戰》—— 第9章 Thread Pool執行模式

一個系統中的執行緒相對於其所要處理的任務而言,是一種非常有限的資源。執行緒不僅在執行任務時需要消耗CPU時間和記憶體等資源,執行緒物件(Thread例項)本身以及執行緒所需的呼叫棧(Call Stack)也佔用記憶體,並且Java中建立一個執行緒往往意味著JVM會建立相應的依賴於宿主機作業系

SimpleDateFormatPool日期格式化類池工具執行安全

轉載請註明出處: https://blog.csdn.net/jevonsCSDN/article/details/83092795 【Jevons’Blog】 SimpleDateFormat是一個非執行緒安全類,當高併發時,若共用一個format物件,則

Go語言sync.Pool執行使用

前言 Go 1.3 的sync包中加入一個新特性:Pool。 這個類設計的目的是用來儲存和複用臨時物件,以減少記憶體分配,降低CG壓力。 1 2 3 4 type Pool func (p *Pool) G

設計模式之單例模式執行安全

可以說單例模式是所有設計模式中最簡單的一種。 單例模式就是說系統中對於某類的只能有一個物件,不可能出來第二個 單例模式也是23種設計模式中在面試時少數幾個會要求寫程式碼的模式之一。主要考察的是多執行緒下面單例模式的執行緒安全性問題。 1.多執行緒安全單例模式例項

ZeroMQ介面函式之 :zmq_inproc – ØMQ 本地程序內執行傳輸方式

————————————————————————————————————— zmq_inproc(7)   ØMQ Manual - ØMQ/4.2.0 Name zmq_inproc – ØMQ 本地程序內(執行緒間)傳輸方式 Synopsis 程序內傳輸方式意味著在共享ZMQ con

Android通過AsyncTask與ThreadPool執行兩種方式非同步載入大量資料的分析與對比

如果您認為本部落格不錯,讀後覺得有收穫,不妨打賞讚助我一下,讓我有動力繼續寫出高質量的部落格。 贈人玫瑰,手有餘香。分享技術,傳遞快樂。 有心課堂,傳遞的不僅僅是技術! QQ交流群:250468947 有心課堂會員,請加入VIP QQ交流