1. 程式人生 > >響應式函數語言程式設計和RxJava的理解

響應式函數語言程式設計和RxJava的理解

面向函數語言程式設計, 即會將函式作為一個數據物件.下面看Android中RxJava的實現

RxJava 是一個觀察者模式的擴充套件:

RxJava 一個觀察者模式的擴充套件,
觀察者模式: 比如Button的點選事件,對設定 OnClickListener 來說, Button 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法產生關係。
            OnClickListener一直觀察著Button,當Button被點選,OnClickListener執行onClick事件。
            而RxJava中 Observable(可觀察的,被觀察者),Observer(觀察者),subscribe(訂閱),事件;Observable和Observer
            之間通過subscribe()方法實現訂閱關係,從而Observable可以在需要的時候發出事件來通知Observer


       以下的Demo即為幾個實現的小例子

TestActivity是RxJava幾個關鍵字的用法;  被觀察者訂閱觀察者, 被觀察者提供資料, 觀察者觀察到資料的變化, onNext()方法中得到資料, 處理資料

package com.example.mytestrxjava;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

/*
響應式函式程式設計
RxJava 一個觀察者模式的擴充套件,
觀察者模式: 比如Button的點選事件,對設定 OnClickListener 來說, Button 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法產生關係。
            OnClickListener一直觀察著Button,當Button被點選,OnClickListener執行onClick事件。
            而RxJava中 Observable(可觀察的,被觀察者),Observer(觀察者),subscribe(訂閱),事件;Observable和Observer
            之間通過subscribe()方法實現訂閱關係,從而Observable可以在需要的時候發出事件來通知Observer

       以下的Demo即為幾個實現的小例子

 */
public class TestActivity extends AppCompatActivity {
    public static final String TAG = "AppCompatActivity";

    private  String[] words = {"aaaa", "bbbb", "cccc"};

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_test);

    }

    public void onClick(View view) {
        switch (view.getId()) {
            case R.id.btn_sample:
                test();
                break;
            case R.id.btn_just:
                just();
                break;
            case R.id.btn_from:
                myFrom();
                break;
            case R.id.btn_action:
                action();
                break;
            case R.id.btn_map:
                map();
                break;
            case R.id.btn_flatmap:
                flatmap();
                break;
            case R.id.btn_filter:
                filter();
                break;
        }
    }




    private void test() {
        //被觀察者
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("aaaa");
                subscriber.onNext("bbbb");
                subscriber.onNext("cccc");
                subscriber.onCompleted();
            }
        });
        //觀察者
        Observer<String> observer = new Observer<String>() {


            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,"Error" + e.toString());
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG,s);
            }
        };

        //被觀察者 訂閱觀察者
        observable.subscribe(observer);
    }

    private void just() {
        //使用just 不用重寫call方法,  被觀察者
        Observable<String> observable = Observable.just("aaa2","bbb2","ccc2");
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "Error" + e.toString());
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }
        };
        observable.subscribe(observer);

    }

    /** 當傳入的是一個數組的時候 , 可以使用from*/
    private void myFrom() {
        Observable<String> observable = Observable.from(words);
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "Error" + e.toString());
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }
        };
        observable.subscribe(observer);
    }

    /** 不是訂閱一個觀察者, 而是訂閱一個單獨執行的任務 */
    private void action() {
        Action1 action1 = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG,s);
            }
        };
       Action1 action2 = new Action1<Throwable>() {
           @Override
           public void call(Throwable e) {
               Log.e(TAG, "Error" + e.toString());
           }
       };

        Action0 action3 = new Action0() {
            @Override
            public void call() {
                Log.e(TAG,"oncompleted");
            }
        };
        Observable<String> observable = Observable.from(words);
        observable.subscribe(action1);
        observable.subscribe(action1,action2);
        observable.subscribe(action1,action2,action3);
    }


    /**
     * 被觀察者進行資料轉換, 如果我們希望傳入的是String型別,而處理的型別是int型別
     */
    private void map() {
        Observable.just("aaa","bbb","ccc")
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return s.length();
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e(TAG,"length" + integer);   //因為傳入了三個物件,  所以被呼叫了三次
                    }
                })
                ;
    }


    /**
     * flapMap()中返回的是個Observable物件,並且這個Observable物件並不是被直接傳送到了
     * Subscriber回撥方法中.
     * 例如下面的例子,可以看做是,一個String返回了一個Observable,一個Observable執行了三次Action
     */
    private void flatmap() {
        Observable.just("aaaa","bbb","cc")
                .flatMap(new Func1<String, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(String s) {
                        Integer[] info = new Integer[3];
                        info[0] = s.length();
                        info[1] = s.hashCode();
                        info[2] = s.getBytes().length;
                        return Observable.from(info);    //有陣列可以使用from關鍵字

                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,integer+"");
            }
        });
    }


    /**
     * filter指定過濾
     * 比如我指定過濾字串中含有"a"的字串
     */
    private void filter() {
        //一句話鏈式程式設計解決文字,   被觀察者訂閱了觀察者
        Observable.from(words)
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.contains("a");
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.e(TAG,"過濾的字串為" + s);
                    }
                });
    }





}

執行緒管理Scheduler使用:

在不指定執行緒的情況下, RxJava 遵循的是執行緒不變的原則,即:在哪個執行緒呼叫 subscribe(),就在哪個執行緒生產事件;在哪個執行緒生產事件,就在哪個執行緒消費事件。如果需要切換執行緒,就需要用到 Scheduler (排程器)。


Schedulers.immediate(): 直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler。


Schedulers.newThread(): 總是啟用新執行緒,並在新執行緒執行操作。


Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。


Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。


另外, Android 還有一個專用的AndroidSchedulers.mainThread(),它指定的操作將在 Android 主執行緒執行。

下面的Demo ImageActivity就是在網路上獲取圖片然後展示的例子

package com.example.mytestrxjava;

import android.graphics.Bitmap;
import android.graphics.BitmapFactory;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.widget.ImageView;
import android.widget.Toast;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

public class ImageActivity extends AppCompatActivity {
    private static final String TAG = "ImageActivity";
    public static String imageurl = "https://mobile.umeng.com/images/pic/home/social/img-1.png";
    private ImageView mIv;


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_image);

        mIv = (ImageView) findViewById(R.id.iv);

        init();
    }

    private void init() {
        //被觀察者
        Observable<Bitmap> observable = Observable.create(new Observable.OnSubscribe<Bitmap>() {
            @Override
            public void call(Subscriber<? super Bitmap> subscriber) {
                byte[] data = getNetData();
                Bitmap bitmap1 = binary2Bitmap(data);
                subscriber.onNext(bitmap1);    //在IO執行緒,  且訂閱者物件將資料傳遞給觀察者
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io())    //指定subscribe()發生在IO執行緒
                .observeOn(AndroidSchedulers.mainThread())   //指定Subscriber的回調發生在主執行緒
                ;

        //觀察者
        Observer<Bitmap> observer = new Observer<Bitmap>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(ImageActivity.this,e.toString(),Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(Bitmap bitmap) {
                //觀察者對 被觀察者給的資料進行處理
                Log.e(TAG,"bitmap:" + bitmap.toString());
                mIv.setImageBitmap(bitmap);
            }
        };
        observable.subscribe(observer);
    }

    /**
     * 將byte[]陣列資料 轉化為BItmap物件
     * @param data
     * @return
     */
    private Bitmap binary2Bitmap(byte[] data) {
        if(data != null) {
            return BitmapFactory.decodeByteArray(data,0,data.length);
        }
        return null;
    }

    /**
     * 從網路獲取圖片的資料,得到位元組資料
     * @return
     */
    private byte[] getNetData() {
        ByteArrayOutputStream boas = null;
        InputStream in = null;
        try {
            boas = new ByteArrayOutputStream();

            URL url = new URL(imageurl);  //異常抓取
                HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                httpURLConnection.setConnectTimeout(2000);
            in = httpURLConnection.getInputStream();
            byte[] buf = new byte[4 * 1024];
            int b;
            while ((b = in.read(buf)) != -1) {
                boas.write(buf,0,b);
            }
            Log.e(TAG,boas.toString());
            return boas.toByteArray();


        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }finally {                   //流的異常處理
            if(in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }finally {
                    if(boas != null) {
                        try {
                            boas.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

由於網路請求要放在子執行緒,所以使用Schedulers.io(), 而回調需要設定ImageView, 需要放到主執行緒,所以使用AndroidSchedulers.mainThread()

至此,Rxjava的使用方法完畢

相關推薦

響應語言程式設計RxJava理解

面向函數語言程式設計, 即會將函式作為一個數據物件.下面看Android中RxJava的實現 RxJava 是一個觀察者模式的擴充套件: RxJava 一個觀察者模式的擴充套件, 觀察者模式: 比如Button的點選事件,對設定 OnClickListener 來說, Bu

語言程式設計響應程式設計

在程式開發中,a=b+c;賦值之後,b或者c的值變化後,a的值不會跟著變化。響應式程式設計目標就是,如果b或者c的數值發生變化,a的數值會同時發生變化。 函數語言程式設計 函數語言程式設計是一系列被不公平對待的程式設計思想的保護傘,它的核心思想是,它是一

語言程式設計指令程式設計

突然直接明白了他們的含義。所謂指令式程式設計,是以命令為主的,給機器提供一條又一條的命令序列讓其原封不動的執行。程式執行的效率取決於執行命令的數量。因此才會出現大O表示法等等表示時間空間複雜度的符號。而函式式語言並不是通常意義上理解的“通過函式的變換進行程式設計”。注意到純的

Lua語言程式設計區域性函式詳解

         函數語言程式設計中的函式這個術語不是指計算機中的函式(實際上是Subroutine),而是指數學中的函式,即自變數的對映。也就是說一個函式的值僅決定於函式引數的值,不依賴其他狀態。比如sqrt(x)函式計算x的平方根,只要x不變,不論什麼時候呼叫,呼叫幾次

一步一步教你理解實現iOS中的鏈程式設計語言程式設計

談到鏈式程式設計和函數語言程式設計,那Masonry幾乎就是最經典的代表.如: make.top.equalTo(self.view).offset(60) 像這樣top.equalTo(s

OC中鏈程式設計語言程式設計

最近看到了鏈式程式設計和函數語言程式設計這兩個概念,這兩天不是那麼的忙 所以也研究了下這兩個概念; 在查詢鏈式程式設計和函數語言程式設計的概念時突然想到了鏈式程式設計和函數語言程式設計最典型的代表是Masonry 比較完美的實現了函數語言程式設計和鏈式程式設計 例如 [

ReactiveCocoa 函式響應程式設計簡介 鏈程式設計 語言程式設計 響應程式設計

最近,RAC的名氣可謂是越來越大,出於對技術的探索心(為了裝逼),最近研究學習了一下RAC,以下是本人在學習中對其的一些粗淺的認識; 首先,什麼是RAC,ReactiveCocoa時Github上的一個Cocoa FRP框架,目的為了接管蘋果的所有事件機制(addTarge

語言程式設計響應程式設計之己見

1. what is 函數語言程式設計? 函式,在程式設計中,通常體現為: 輸入 => 執行 => 結果。他不是命令式的,而是對一段操作進行邏輯封裝,拿到輸入,就能產出結果。通常來說,滿足函數語言程式設計的特性的“函式”應該有如下特點: 函式必須有入參,並且函

深入淺出iOS語言程式設計響應程式設計概念

簡介本篇文章主要回顧一下——iOS函數語言程式設計 && 響應式程式設計概念,如何一步步實現函數語言程式設計的過程,對閱讀Masonry && SnapKit原始碼有一定的幫助。作為一個iOS 開發者,那麼你一定用過Masno

Python包結構語言程式設計

# 包的結構 |--包 |--|-- __init__.py 包的標誌檔案 |--|-- 模組1 |--|-- 模組2 |--|-- 子包(子資料夾) |--|--|-- __init__.py 包的標誌檔案 |--|--|-- 子包模組1 |--|--|--

在Scala中使用語言程式設計(函式高階函式)

                                            圖示,這是一個普通

s語言程式設計(三)-composepointFree

compose即函式巢狀組合 組合compose在第一篇已經初見端倪,可以感受一下。compose函式的實現用閉包的方法。不完善實現如下: const compose = (f, g) => { return x => f(g(x)); }; compose使用例項 你可以用ramda的

《scala語言程式設計》之模式匹配異常處理

package com.lyzx.day20181006 import java.io.{FileNotFoundException, FileReader, IOException} class MatchTest { /** * match 對應 Java

Java FP: Java中語言程式設計的MapFold(Reduce)

原文連結 作者:  Cyrille Martraire  譯者: 李璟([email protected]) 在函數語言程式設計中,Map和Fold是兩個非常有用的操作,它們存在於每一個函數語言程式設計語言中。既然Map和Fold操作如此強大和重要,但是Java語言缺乏Map和Fol

Java FP(Java8): Java中語言程式設計的MapFold(Reduce)

public double totalAmount(List<Double> amounts) { double sum = 0; for(double amount : amounts) { sum += amount; } return sum

[一] java8 語言程式設計入門 什麼是語言程式設計 函式介面概念 流收集器基本概念

本文是針對於java8引入函數語言程式設計概念以及stream流相關的一些簡單介紹 什麼是函數語言程式設計? java程式設計師第一反應可能會理解成類的成員方法一類的東西 此處並不是這個含義,更接近是數學上的函式 看一下百度百科中關於函式的說明 函式的定義: 給

遞迴、尾遞迴語言程式設計

<span style="font-family:SimSun">function fibonacciDynamically(n){ var fibonacci =new Array(n+1); return calculate(n); } function calculate(n){ if

Java8 新特性之流資料語言程式設計

一. 流式處理簡介在我接觸到java8流式處理的時候,我的第一感覺是流式處理讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完成的操作,藉助於流式處理可以在一行中實現。比如我們希望對一個包含整數的集合中篩選出所有的偶數,並將其封裝成為一個新的List返回,那麼在java8

指令程式設計 vs 語言程式設計 vs 反應程式設計

指令式程式設計 存在潛在的阻塞。 函數語言程式設計 避免處理資料的中間狀態。可以方便的處理資料流。 反應式程式設計 多種方式使用資料流,合併,過濾和轉換等。 抽象層次更高,關注於如何把小的方法

『 Python筆記』 lambda表示式語言程式設計

lambda表示式 lambda用來編寫簡單的函式,而def用來處理更強大的任務。 lambda的一般形式是關鍵字lambda後面跟一個或多個引數,緊跟一個冒號,以後是一個表示式。 lambda是一個表示式而不是一個語句。它能夠出現在Python語法不允許