1. 程式人生 > >RxJava 常用操作符大全(一)

RxJava 常用操作符大全(一)

這裡寫圖片描述

create

create操作符是所有建立型操作符的“根”,也就是說其他建立型操作符最後都是通過create操作符來建立Observable的

呼叫例子如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
        } catch
(Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Overridepublicvoid onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Overridepublicvoid
onCompleted() { System.out.println("Sequence complete."); } });

執行結果如下:

    Next: 1 
    Next: 2 
    Next: 3 
    Next: 4 
    Sequence complete.

from

from操作符是把其他型別的物件和資料型別轉化成Observable

呼叫例子如下:

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);
myObservable.subscribe( new Action1<Integer>() { @Override publicvoid call(Integer item) { System.out.println(item); } }, new Action1<Throwable>() { @Override publicvoid call(Throwable error) { System.out.println("Error encountered: " + error.getMessage()); } }, new Action0() { @Override publicvoid call() { System.out.println("Sequence complete"); } } );

執行結果如下:

    0 
    1 
    2 
    3 
    4 
    5 
    Sequence complete

just

just操作符也是把其他型別的物件和資料型別轉化成Observable,它和from操作符很像,只是方法的引數有所差別

呼叫例子如下:

Observable.just(1, 2, 3)
.subscribe(new Subscriber<Integer>() {
@Overridepublicvoid onNext(Integer item) {
System.out.println("Next: " + item);
}

@Overridepublicvoid onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Overridepublicvoid onCompleted() {
System.out.println("Sequence complete.");
}
});

執行結果如下:

    Next: 1 
    Next: 2 
    Next: 3 
    Sequence complete.

defer

defer操作符是直到有訂閱者訂閱時,才通過Observable的工廠方法建立Observable並執行,defer操作符能夠保證Observable的狀態是最新的

下面通過比較defer操作符和just操作符的執行結果作比較

Integer i = 10;

@SuppressWarnings("unchecked")
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

Observable justObservable = Observable.just(i);
i = 12;
Observable deferObservable = Observable
.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(i);
}
            });
i = 15;

justObservable.subscribe(new Subscriber() {
@Override
public void onCompleted() {

        }

@Override
public void onError(Throwable e) {

        }

@Override
public void onNext(Object o) {
System.out.println("just result:" + o.toString());
}
    });

deferObservable.subscribe(new Subscriber() {
@Override
public void onCompleted() {

        }

@Override
public void onError(Throwable e) {

        }

@Override
public void onNext(Object o) {
System.out.println("defer result:" + o.toString());
}
    });
}

執行結果如下:

    just result:10 
    defer result:15

可以看到,just操作符是在建立Observable就進行了賦值操作,而defer是在訂閱者訂閱時才建立Observable,此時才進行真正的賦值操作

注意:不能用int

timer

timer操作符是建立一串連續的數字,產生這些數字的時間間隔是一定的;這裡有兩種情況:

一種是隔一段時間產生一個數字,然後就結束,可以理解為延遲產生數字

一種是每隔一段時間就產生一個數字,沒有結束符,也就是是可以產生無限個連續的數字

timer操作符預設情況下是執行在一個新執行緒上的,當然你可以通過傳入引數來修改其執行的執行緒。

下面是呼叫例子:

Subscription observable = null;

@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.timer(2, 2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {

@Override
public void onCompleted() {
// TODO Auto-generated method stub

}

@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub

}

@Override
public void onNext(Long arg0) {
// TODO Auto-generated method stub
System.out.println("Next:" + arg0.toString());

}
            });
}

@Override
protected void onPause() {
// TODO Auto-generated method stub
super.onPause();
    if (observable != null) {
observable.unsubscribe();
}
}

執行結果如下:

    Next:0 
    Next:1 
    Next:2 
    Next:3 
    ……

注意:Observable.subscribe()方法可以返回一個Subscription的物件,即我們每次訂閱都會返回,可以通過該物件,取消訂閱。

interval

interval操作符是每隔一段時間就產生一個數字,這些數字從0開始,一次遞增1直至無窮大;interval操作符的實現效果跟上面的timer操作符的第二種情形一樣。

Subscription observable = null;

@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.interval(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {

@Override
public void onCompleted() {
// TODO Auto-generated method stub

}

@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub

}

@Override
public void onNext(Long arg0) {
// TODO Auto-generated method stub
System.out.println("Next:" + arg0.toString());

}
            });
}

@Override
protected void onPause() {
// TODO Auto-generated method stub
super.onPause();
    if (observable != null) {
observable.unsubscribe();
}
}

執行結果:

這裡寫圖片描述

若不呼叫取消訂閱,則會一直增加。

range

range操作符是建立一組在從n開始,個數為m的連續數字,比如range(3,10),就是建立3、4、5…12的一組數字

呼叫例子如下:

Subscription observable = null;

@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.range(0, 10).subscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {
// TODO Auto-generated method stub
System.out.println("Sequence complete.");
}

@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub

}

@Override
public void onNext(Integer arg0) {
// TODO Auto-generated method stub
System.out.println(""+arg0);

}
    });



}

執行結果:
這裡寫圖片描述

repeat/repeatWhen

repeat操作符是對某一個Observable,重複產生多次結果

repeatWhen操作符是對某一個Observable,有條件地重新訂閱從而產生多次結果

repeat和repeatWhen操作符預設情況下是執行在一個新執行緒上的,當然你可以通過傳入引數來修改其執行的執行緒。

repeat呼叫例子如下:

//連續產生兩組(1,2,3)的數字
Subscription observable = null;

@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.range(1, 3).repeat(2).subscribe(new Subscriber<Integer>() {

@Override
public void onCompleted() {
// TODO Auto-generated method stub
System.out.println("onCompleted");
}

@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub

}

@Override
public void onNext(Integer arg0) {
// TODO Auto-generated method stub
System.out.println(""+arg0);

}});
}

執行結果如下:
這裡寫圖片描述

說點什麼吧

不得不說,Rx的操作符真的是太強大了,當然,也太龐大了,所以,每個都記住也是不可能的,至少我不行。

這些都詳細的一批了。

這個RxJava的系列,最後一個文也是關於Rx操作符的,我也把這些操作符都寫在了一個Demo(操作符大全)上,有興趣的(額,當然,都是爛大街的了。。。)我就不說了吧。

更多操作符,請看下回分解吧~這裡寫圖片描述