1. 程式人生 > >RxJava RxAndroid RxLifecycle基本使用(使用Kotlin語言)

RxJava RxAndroid RxLifecycle基本使用(使用Kotlin語言)

簡介

RxJava是基於響應式程式設計的框架,響應式程式設計的思想就是一個事件發生後,監聽著這個事件的監聽器馬上做出響應。類似於平常的開關燈。當我們開啟燈的開關時,燈馬上亮了;當我們關閉燈的開關時,燈馬上熄了。這個過程中,燈對我們控制開關的事件做出了響應。在Android中,設定按鈕監聽器也用到了響應式的思想,當有點選事件發生時,OnClickListener馬上執行。也就是說OnClickListener時刻觀察著按鈕,當按鈕被點選時,OnClickListener馬上做出了響應。

RxJava中的三個基本概念:觀察者Observer,被觀察者Observable,訂閱subscribe。當觀察者訂閱了被觀察者,被觀察者有事件發生時,觀察者可以做出響應。

RxAndroid是JakeWharton對RxJava做的一個擴充套件,主要是為了讓Android中更好的使用RxJava

RxJava的Github地址為:https://github.com/ReactiveX/RxJava

RxAndroid的Github地址為:https://github.com/ReactiveX/RxAndroid

使用

1.在app模組的build.gradle的dependencies中引入RxJava和RxAndroid:

implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'

2.在MainActivity中建立被觀察者和觀察者,然後建立訂閱,例:(本例中佈局非常簡單,只有一個id為text的TextView,故不再給出佈局程式碼)

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //建立一個被觀察者,發出整型資料
val observable = createObservable() //建立一個觀察者,接收整型資料 val observer = createObserver() //建立訂閱 observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer) } private fun createObservable(): Observable<Int> { return Observable.create { for (i in 0..9) { //通知觀察者執行onNext()方法 it.onNext(i) } //通知觀察者資料傳送完成 it.onComplete() } } private fun createObserver(): Observer<Int> { return object : Observer<Int> { override fun onSubscribe(d: Disposable) { text.append("subscribe\n") } override fun onNext(integer: Int) { text.append("$integer\n") } override fun onComplete() { text.append("complete.") } override fun onError(e: Throwable) { text.append(e.message) } } } }

注:

(1)被觀察者需要重寫subscribe()方法,在此方法中使用emitter發出資料,Kotlin對於單方法單引數的物件有一個語法糖,使用it即可表示這個單引數emitter

(2)觀察者需要重寫onSubscribe()、onNext()、onError()、onComplete()方法。

onSubscribe():訂閱開始時呼叫

onNext():執行發射器發出的事件

onError():當程式出錯時,執行onError,訂閱結束

onComplete():當程式完成後,執行onComplete,訂閱結束

(3)使用observable.subscribe(observer)建立訂閱。subscribeOn(Scheduler scheduler)決定被觀察者發射事件的執行緒,observeOn(Scheduler scheduler)決定觀察者接收事件的執行緒。

常用的執行緒型別有:

Schedulers.newThread():新執行緒

Schedulers.io():IO執行緒

Schedulers.computation():計算執行緒

AndroidSchedulers.mainThread():主執行緒(RxAndroid庫中的)

執行程式,可以看到如下結果:
訂閱示例

可以看到observable中發出的事件被observer依次執行了

map操作符

3.使用map在資料傳遞過程中進行資料型別轉換

例:在MainActivity中建立發出整型資料的被觀察者、接收字串資料的觀察者,使用map將整型資料轉換成字串,建立訂閱

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //建立一個被觀察者,發出整型資料
        val observable = createObservable()
        //建立一個觀察者,接收字串資料
        val observer = createObserver()
        //建立訂閱
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .map{
                    "string $it"
                }
                .subscribe(observer)
    }

    private fun createObservable(): Observable<Int> {
        return Observable.create {
            for (i in 0..9) {
                //通知觀察者執行onNext()方法
                it.onNext(i)
            }
            //通知觀察者資料傳送完成
            it.onComplete()
        }
    }

    private fun createObserver(): Observer<String> {
        return object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                text.append("subscribe\n")
            }

            override fun onNext(string: String) {
                text.append("$string\n")
            }

            override fun onComplete() {
                text.append("complete.")
            }

            override fun onError(e: Throwable) {
                text.append(e.message)
            }
        }
    }
}

注:map()函式中需要傳入一個Function<T1,T2>進行型別轉換,Function中需要重寫apply函式,這裡同樣使用了kotlin的語法糖,it表示傳進來的整數,map函式最後一行作為返回值

T2 apply(T1 data):將T1型別的資料傳入,返回T2型別資料,實現資料型別轉換

執行程式,可以看到如下結果:
map之後

flatMap操作符和concatMap操作符

4.使用flatMap將發射的一列資料列展開,單獨發射
例:使用flatMap的程式碼如下:

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //建立一個被觀察者,發出整型資料
        val observable = createObservable()
        //建立一個觀察者,接收字串資料
        val observer = createObserver()
        //建立訂閱
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap { origin ->
                    Observable.create<String> {
                        //將原始的一列資料展開,單獨用一個被觀察者發射出去
                        it.onNext("new emitter:$origin")
                        //當展開的所有被觀察者都完成後,原始被觀察者才會完成
                        it.onComplete()
                    }
                }
                .subscribe(observer)
    }

    private fun createObservable(): Observable<Int> {
        return Observable.create {
            for (i in 0..9) {
                //通知觀察者執行onNext()方法
                it.onNext(i)
            }
            //通知觀察者資料傳送完成
            it.onComplete()
        }
    }

    private fun createObserver(): Observer<String> {
        return object : Observer<String> {
            override fun onSubscribe(d: Disposable) {
                text.append("subscribe\n")
            }

            override fun onNext(string: String) {
                text.append("$string\n")
            }

            override fun onComplete() {
                text.append("complete.")
            }

            override fun onError(e: Throwable) {
                text.append(e.message)
            }
        }
    }
}

上面的程式碼原始發射資料是0~9的一列數字,flatMap傳入一個原始資料,輸出一個被觀察者,也就是將單個數字單獨用一個被觀察者發射出去。執行以上程式,顯示如下:
flatMap之後

當展開的所有被觀察者都完成後,原始的觀察者才會完成。

flatMap有個類似的方法concatMap,使用上沒有差別,區別在於flatMap展開後是無序的,concatMap展開後是有序的,使用concatMap的話,展開的一列被觀察者將會按照原始資料的順序依次發射

internal操作符

使用internal操作符實現Timer的效果,internal操作傳入三個引數:
initialDelay : 延遲多長時間開始
period : 間隔多長時間
unit : 時間單位
例:

class MainActivity : AppCompatActivity() {

    private var disposable: Disposable? = null
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //建立一個被觀察者,間隔1秒發射資料
        val observable = createObservable()
        //建立一個觀察者,接收資料
        val observer = createObserver()
        //建立訂閱
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer)

    }

    private fun createObservable(): Observable<Long> {
        return Observable.interval(0, 1, TimeUnit.SECONDS)

    }

    private fun createObserver(): Observer<Long> {
        return object : Observer<Long> {
            override fun onSubscribe(d: Disposable) {
                text.append("subscribe\n")
                disposable = d
            }

            override fun onNext(data: Long) {
                text.append("$data\n")
                if (data >= 9) {
                    disposable?.dispose()
                    onComplete()
                }
            }

            override fun onComplete() {
                text.append("complete.")
            }

            override fun onError(e: Throwable) {
                text.append(e.message)
            }
        }
    }
}

執行以上程式,顯示如下:
internal操作符

程式碼中可以看出,onSubscribe中的Disposable代表建立的訂閱,需要取消訂閱時,使用disposable.dispose方法即可。如果不取消訂閱,internal的onNext回撥將一直執行,直到當前程式程序被殺死。
由此可以看出,RxJava使用時一定要及時關閉訂閱,否則會導致記憶體洩漏問題,有一個比較方便的框架RxLifecycle,可以幫助我們在Activity生命週期中關閉訂閱

RxLifecycle

RxLifecycle的Github地址為:https://github.com/trello/RxLifecycle

使用

1.在build.gradle中匯入RxLifecycle

implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.2'

注:RxLifecycle中包含了RxJava包,所以匯入了RxLifecycle之後就可以不用匯入RxJava了,但是RxLifecycle不包含RxAndroid,所以還是需要匯入RxAndroid

2.我們先寫一個不加入RxLifecycle的訂閱,測試一下RxJava帶來的記憶體洩漏:

class MainActivity : AppCompatActivity() {
    companion object {
        const val TAG = "MainActivity"
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //建立一個被觀察者,間隔1秒發射資料
        val observable = createObservable()
        //建立一個觀察者,接收資料
        val observer = createObserver()
        //建立訂閱
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer)
    }


    private fun createObservable(): Observable<Long> {
        return Observable.interval(0, 1, TimeUnit.SECONDS)

    }

    private fun createObserver(): Observer<Long> {
        return object : Observer<Long> {
            override fun onSubscribe(d: Disposable) {
                text.append("subscribe\n")
                Log.d(TAG, "onSubscribe")
            }

            override fun onNext(data: Long) {
                text.append("$data\n")
                Log.d(TAG, "$data")
            }

            override fun onComplete() {
                text.append("complete.")
                Log.d(TAG, "onComplete")
            }

            override fun onError(e: Throwable) {
                text.append(e.message)
                Log.d(TAG, "onError")
            }
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        Log.d(TAG, "onDestroy")
    }
}

執行程式,可以看到如下Log:

com.example.studyrxjava D/MainActivity: onSubscribe
com.example.studyrxjava D/MainActivity: 0
com.example.studyrxjava D/MainActivity: 1
com.example.studyrxjava D/MainActivity: 2
com.example.studyrxjava D/MainActivity: 3
com.example.studyrxjava D/MainActivity: 4
com.example.studyrxjava D/MainActivity: 5
com.example.studyrxjava D/MainActivity: 6
com.example.studyrxjava D/MainActivity: 7
com.example.studyrxjava D/MainActivity: 8
com.example.studyrxjava D/MainActivity: onDestroy
com.example.studyrxjava D/MainActivity: 9
com.example.studyrxjava D/MainActivity: 10
com.example.studyrxjava D/MainActivity: 11
com.example.studyrxjava D/MainActivity: 12
com.example.studyrxjava D/MainActivity: 13
com.example.studyrxjava D/MainActivity: 14
com.example.studyrxjava D/MainActivity: 15
com.example.studyrxjava D/MainActivity: 16
com.example.studyrxjava D/MainActivity: 17
...

由此可見,在Activity已經onDestroy之後,RxJava的internal訂閱仍在執行,這就是記憶體洩漏了。
3.加上RxLifecycle修復此記憶體洩漏,程式碼如下

class MainActivity : RxAppCompatActivity() {
    companion object {
        const val TAG = "MainActivity"
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //建立一個被觀察者,間隔1秒發射資料
        val observable = createObservable()
        //建立一個觀察者,接收資料
        val observer = createObserver()
        //建立訂閱
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .compose(bindUntilEvent(ActivityEvent.DESTROY))
                .subscribe(observer)
    }


    private fun createObservable(): Observable<Long> {
        return Observable.interval(0, 1, TimeUnit.SECONDS)

    }

    private fun createObserver(): Observer<Long> {
        return object : Observer<Long> {
            override fun onSubscribe(d: Disposable) {
                text.append("subscribe\n")
                Log.d(TAG, "onSubscribe")
            }

            override fun onNext(data: Long) {
                text.append("$data\n")
                Log.d(TAG, "$data")
            }

            override fun onComplete() {
                text.append("complete.")
                Log.d(TAG, "onComplete")
            }

            override fun onError(e: Throwable) {
                text.append(e.message)
                Log.d(TAG, "onError")
            }
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        Log.d(TAG, "onDestroy")
    }
}

首先將繼承類由AppCompatActivity改為RxAppCompatActivity,這是使用RxLifecycle必需的,然後建立訂閱時加入.compose(bindUntilEvent(ActivityEvent.DESTROY))即可,這行程式碼需要加在subscribe(observer)呼叫之前

執行以上加入了RxLifecycle的程式碼,Log顯示如下:

com.example.studyrxjava D/MainActivity: onSubscribe
com.example.studyrxjava D/MainActivity: 0
com.example.studyrxjava D/MainActivity: 1
com.example.studyrxjava D/MainActivity: 2
com.example.studyrxjava D/MainActivity: 3
com.example.studyrxjava D/MainActivity: 4
com.example.studyrxjava D/MainActivity: 5
com.example.studyrxjava D/MainActivity: 6
com.example.studyrxjava D/MainActivity: onDestroy
com.example.studyrxjava D/MainActivity: onComplete

可以看到,onDestroy之後RxJava的internal訂閱就結束了。

以上,就是RxJava RxAndroid RxLifecycle基本使用。
原始碼已上傳:
https://github.com/wkxjc/StudyRxJava2