1. 程式人生 > >02_RxJava轉換操作符程式碼示例

02_RxJava轉換操作符程式碼示例

package com.gdc.rxjava;

import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;

public class TransformingDemo {

	public static void main(String[] args) {
		// testMap();
		// testFlatMap();
		// testGroupBy();
		// testBuffer();
		testScan();
	}

	private static void testMap() {
		// 將整型資料轉換成String型別輸出
		Observable.just(1, 2, 3).map(new Func1<Integer, String>() {

			@Override
			public String call(Integer arg0) {
				return arg0 + "";
			}
		}).subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(String arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}

	private static void testFlatMap() {

		Observable.just(1, 2, 3, 4, 5).flatMap(new Func1<Integer, Observable<? extends String>>() {

			@Override
			public Observable<? extends String> call(Integer arg0) {

				return Observable.just(arg0 + "");
			}
		}).subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(String arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}

	private static void testGroupBy() {
		Observable.just(1, 2, 3, 4, 5).groupBy(new Func1<Integer, Integer>() {

			@Override
			public Integer call(Integer arg0) {
				// 分組規則
				return arg0 % 2;
			}
		}).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(GroupedObservable<Integer, Integer> arg0) {

				arg0.subscribe(new Subscriber<Integer>() {

					@Override
					public void onCompleted() {

					}

					@Override
					public void onError(Throwable arg0) {

					}

					@Override
					public void onNext(Integer data) {
						System.out.println("group:" + arg0.getKey() + " data:" + data);
					}
				});
			}
		});
	}

	private static void testBuffer() {
		// 隨機生成5個數字,一次性訂閱兩次,然後打印出來,通過執行得知,只需要3次即可以把資料全部打印出來
		Observable.range(1, 5).buffer(2).subscribe(new Subscriber<List<Integer>>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(List<Integer> arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}

	private static void testScan() {
		// 1到5的和
		Observable.range(1, 5).scan(new Func2<Integer, Integer, Integer>() {

			@Override
			public Integer call(Integer sum, Integer arg1) {
				// sum:當前資料的和,arg1:每一次需要累加的資料
				return sum + arg1;
			}
		}).subscribe(new Subscriber<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted()");
			}

			@Override
			public void onError(Throwable arg0) {
				System.out.println("onError():" + arg0);
			}

			@Override
			public void onNext(Integer arg0) {
				System.out.println("onNext():" + arg0);
			}
		});
	}
}