03_RxJava 過濾型操作符(Filtering)程式碼示例
阿新 • • 發佈:2018-11-11
package com.gdc.rxjava;

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.Subscriber;

/**
 * Small console demos of the RxJava filtering operators: debounce, distinct,
 * elementAt, filter, first, ignoreElements, last, sample, skip/skipLast, take
 * and takeLast.
 *
 * <p>Every demo subscribes with a subscriber that prints each notification it
 * receives to standard output.
 */
public class TestFilting {

    /**
     * Runs one demo.
     *
     * @param args optional; {@code args[0]} names the demo to run (for example
     *             {@code "debounce"} or {@code "sample"}). When no argument is
     *             given the {@code takeLast} demo runs.
     */
    public static void main(String[] args) {
        String demo = (args != null && args.length > 0) ? args[0] : "takeLast";

        if ("debounce".equals(demo)) {
            testDebounce();
        } else if ("distinct".equals(demo)) {
            testDistinct();
        } else if ("elementAt".equals(demo)) {
            testElementAt();
        } else if ("filter".equals(demo)) {
            testFilter();
        } else if ("first".equals(demo)) {
            testFirst();
        } else if ("ignoreElements".equals(demo)) {
            testIgnoreElements();
        } else if ("last".equals(demo)) {
            testLast();
        } else if ("sample".equals(demo)) {
            testSample();
        } else if ("skip".equals(demo)) {
            testSkip();
        } else if ("take".equals(demo)) {
            testTake();
        } else if ("takeLast".equals(demo)) {
            testTakeLast();
        } else {
            System.err.println("Unknown demo: " + demo);
        }
    }

    /**
     * Creates a subscriber that prints every notification it receives.
     *
     * <p>A new instance is returned on each call so that every subscription
     * gets its own subscriber.
     *
     * @return a subscriber that writes onNext/onError/onCompleted to stdout
     */
    private static Subscriber<Integer> printingSubscriber() {
        return new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted()");
            }

            @Override
            public void onError(Throwable arg0) {
                System.out.println("onError():" + arg0);
            }

            @Override
            public void onNext(Integer arg0) {
                System.out.println("onNext():" + arg0);
            }
        };
    }

    /**
     * Builds an observable that, on the subscribing thread, emits the integers
     * 0 through 9 with a one-second sleep before each item and then completes.
     *
     * <p>If the sleeping thread is interrupted, the interrupt flag is restored
     * and the exception is delivered through {@code onError}.
     *
     * @return the slow counting observable used by the time-based demos
     */
    private static Observable<Integer> slowCounter() {
        return Observable.create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    for (int i = 0; i < 10; i++) {
                        // Stop producing once nobody is listening any more.
                        if (subscriber.isUnsubscribed()) {
                            return;
                        }
                        Thread.sleep(1000);
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to code further up the stack.
                    Thread.currentThread().interrupt();
                    subscriber.onError(e);
                }
            }
        });
    }

    /** Applies a 2-second {@code debounce} to the slow counter. */
    private static void testDebounce() {
        slowCounter()
                .debounce(2, TimeUnit.SECONDS)
                .subscribe(printingSubscriber());
    }

    /** Applies {@code distinct()} to 1, 2, 3, 2, 3. */
    private static void testDistinct() {
        Observable.just(1, 2, 3, 2, 3)
                .distinct()
                .subscribe(printingSubscriber());
    }

    /** Applies {@code elementAt(3)} to 1, 2, 3, 2, 3. */
    private static void testElementAt() {
        Observable.just(1, 2, 3, 2, 3)
                .elementAt(3)
                .subscribe(printingSubscriber());
    }

    /** Applies {@code distinct()} and then keeps only values greater than 2. */
    private static void testFilter() {
        Observable.just(1, 2, 3, 2, 3)
                .distinct()
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer arg0) {
                        // Specify the filter rule here.
                        return arg0 > 2;
                    }
                })
                .subscribe(printingSubscriber());
    }

    /** Applies {@code distinct().first()} to 9, 2, 3, 2, 3. */
    private static void testFirst() {
        Observable.just(9, 2, 3, 2, 3)
                .distinct()
                .first()
                .subscribe(printingSubscriber());
    }

    /** Applies {@code ignoreElements()} to a single-item source. */
    private static void testIgnoreElements() {
        Observable.just(123)
                .ignoreElements()
                .subscribe(printingSubscriber());
    }

    /** Applies {@code distinct().last()} to 9, 2, 3, 2, 3. */
    private static void testLast() {
        Observable.just(9, 2, 3, 2, 3)
                .distinct()
                .last()
                .subscribe(printingSubscriber());
    }

    /**
     * Applies a 4-second {@code sample} to the slow counter.
     *
     * <p>The source signals {@code onCompleted} after its last item and reports
     * an interruption through {@code onError}, so the subscriber always
     * receives a terminal notification.
     */
    private static void testSample() {
        slowCounter()
                .sample(4, TimeUnit.SECONDS)
                .subscribe(printingSubscriber());
    }

    /** Applies {@code skip(2)} followed by {@code skipLast(2)} to 1..5. */
    private static void testSkip() {
        Observable.just(1, 2, 3, 4, 5)
                .skip(2)
                .skipLast(2)
                .subscribe(printingSubscriber());
    }

    /** Applies {@code take(2)} to 1..5. */
    private static void testTake() {
        Observable.just(1, 2, 3, 4, 5)
                .take(2)
                .subscribe(printingSubscriber());
    }

    /** Applies {@code takeLast(2)} to 1..5. */
    private static void testTakeLast() {
        Observable.just(1, 2, 3, 4, 5)
                .takeLast(2)
                .subscribe(printingSubscriber());
    }
}