1. 程式人生 > >spark RDD程式設計

spark RDD程式設計

RDD,也就是 彈性分散式資料集 的簡稱,它是spark處理的分散式元素集合。 對於RDD的操作包括: 建立RDD(從外部資料或者記憶體中的資料),轉化RDD(利用篩選條件等),呼叫RDD操作求值
**注意:RDD的操作分為兩種:一種是 “轉化操作”,這種操作相當於只是定義了RDD,例如從一個RDD篩選出另一個RDD。轉化操作的特點就是:返回結果仍然是一個RDD物件,轉化操作並不會立刻執行,而是會惰性的執行,也就是到了不得不執行的時候才執行。另一種是 “行動操作”,是指呼叫RDD物件的方法返回一些值,例如統計一個文字RDD有多少行等.
**對於可能重複使用的RDD,可以使用persist()方法將其儲存在記憶體中(也可以通過選項將其儲存在磁碟上),預設情況下,cache()方法與persist()方法效果一樣。


一、建立RDD的方法
①可以直接使用sc的parallelize()方法將陣列引數轉換成RDD物件,例如:
>>> line= sc.parallelize(["hello this is the first line","this is the second line","hi I am the third","this is the last line"]) >>> for a in line.take(line.count()): ...     print(a) ... hello this is the first line this is the second line hi I am the third this is the last line >>>
這是在python指令碼中執行的方法,如果是在java中的話,使用方法如下:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("test02"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); //從記憶體中插入 JavaRDD<String> line1 = javaSparkContext.parallelize(Arrays.asList("this is first line", "second line", "last line bye")); //輸出(行動操作) int n = (int) line1.count(); List<String> list1 = line1.take(n); for (int i = 0; i < list1.size(); i++) {     System.out.println(list1.get(i)); }

但是在大資料處理過程中,一般這種做法是不存在的,因為大量資料是不會從記憶體中匯入的,而是要從磁碟中讀取。
②從磁碟中讀取文字檔案的方法

從磁碟中讀取文字檔案使用方法如下: >>> line= sc.textFile("D:\spark\README.md") >>> line2=line.filter(lambda a:"Java" in a) >>> for m in line2.take(line2.count()): ...     print(m) ... high-level APIs in Scala, Java, Python, and R, and an optimized engine that >>>
這裡使用了python中的lambda表示式篩選出含有Java的行數。
在java應用程式中使用這種方式也類似:
//從外部文字檔案讀取 JavaRDD<String> line2 = javaSparkContext.textFile("D:\\spark\\README.md"); //篩選(轉化操作) line2 = line2.filter(new Function<String, Boolean>() {     public Boolean call(String s) throws Exception {         return s.contains("Python");     } }); //輸出 int m = (int) line2.count(); List<String> list2 = line2.take(m); for (int i = 0; i < list2.size(); i++) {     System.out.println(list2.get(i)); }

關於惰性求值:
當執行轉化操作,例如 sc.textFile(" ... ") 的時候,其實並沒有從檔案中讀取資料,所以即使檔案路徑不存在,也不會報錯,但是我們可以通過行動操作來檢測前面的轉化操作有沒有問題。

二、RDD的相關操作 ①篩選元素的轉化操作, filter() 在java中,使用filter要傳入一個Function<String,Boolean>類的繼承類,該類的public Boolean call(String s)方法返回一個bool值,返回true表示符合篩選條件。 line2 = line2.filter(new Function<String, Boolean>() {     public Boolean call(String s) throws Exception {         return s.contains("Python");     } });

②聯合兩個RDD的方法, union() 使用方法在各個語言中一樣,都是傳入一個RDD引數,然後將兩個RDD聯合在一起。 下面是shell中使用的示例:
>>> line1=sc.parallelize(["hello world","second line","last line"]) >>> line2=sc.parallelize(["another line","really last line"]) >>> line1.count() 3 >>> line2.count() 2 >>> for a in line1.take(3): ...     print(a) ... hello world second line last line >>> for a in line2.take(2): ...     print(a) ... another line really last line >>> line3 = line1.union(line2) >>> line3.count() 5 >>> for a in line3.take(5): ...     print(a) ... hello world second line last line another line really last line >>>
**注意:filter()和union()函式不會改變原來的兩個RDD,而是會新生成RDD,所以需要賦值給新的變數
③計算文字行數的行動操作,count() count()是一個行動操作,當在某一個RDD上執行count()函式的時候,它所依賴的轉化操作,例如篩選等會依次被執行。
④取RDD的前n行作為字串列表,take(n)
⑤java向spark傳遞函式的方法



三、RDD的其它常用轉化操作和行動操作 ①轉化操作 filter() map() 的區別 filter()簡單來說就是一個過濾器。可以只篩選出符合函式引數的資料,而map()函式則可以修改對元素做一對的修改,即將元素作為引數,根據對映關係改變元素的值(這個說法不確切,應該說是修改元素副本的值,返回的也是副本)。 filter()的使用方法在之前已經說過了,這裡主要介紹一下map()的用法。
在shell中: >>> line=sc.parallelize([1,2,3,3]) >>> line2=line.map(lambda x:x*x) >>> line2.count() 4 >>> for a in line2.take(4): ...     print(a) ... 1 4 9 9 >>>
這裡注意:無論是filter()還是map(),都是轉化操作,只要是轉化操作都不會改變原來RDD的資料,而是會把新的RDD作為返回值。
在java中使用map將元素做對映: public static void main(String[] args) {     SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("test03");     JavaSparkContext sc = new JavaSparkContext(sparkConf);     JavaRDD<Integer> line = sc.parallelize(Arrays.asList(1, 2, 3, 4));     //使用map對資料集中的每一個元素做對映處理     JavaRDD<Integer> line2 = line.map(new Function<Integer, Integer>() {         public Integer call(Integer integer) throws Exception {             return (int)Math.pow(integer,3);         }     });     int num = (int) line2.count();     List<Integer> list = line2.take(num);     for (int i = 0; i < num; i++) {         System.out.println(list.get(i));     }     sc.stop(); }
這裡仍然是需要Function的例項,只不過泛型不同。
map() flatMap() 的區別 上面看到map()函式可以將元素做一對一的對映從而達到處理資料的目的,而flatMap()函式則可以對函式做一對多的對映,即將一個元素對映成為多個元素,原先後面的元素依然排在後面。舉個例子,例如將每個元素是一句話,想要把每句話對映成為一個個空格分開的單詞,如果使用map就只能返回一個數組,最後返回的RDD是由陣列組成的。但是使用flatMap就可以返回一個由所有單片語成的元素個數變長了的RDD。
下面是shell中的例子: >>> line=sc.parallelize(["hello a world","hello second world","good hello word"]) >>> line2=line.map(lambda s:s.split(" ")) >>> line2.first() ['hello', 'a', 'world']   #可以看到這裡第一個元素變成了一個數組 >>> line2.count() 3 >>> line3=line.flatMap(lambda s:s.split(" ")) >>> line3.count() 9 >>> for a in line3.take(9): ...     print(a) ... hello a world hello second world good hello word >>>
java中使用flatMap的示例: public static void main(String[] args) {     SparkConf conf = new SparkConf().setMaster("local").setAppName("test04");     JavaSparkContext sc = new JavaSparkContext(conf);     JavaRDD<String> line = sc.parallelize(Arrays.asList("hello world", "good very hello", "you me hi"));     JavaRDD<String> line2 = line.flatMap(new FlatMapFunction<String, String>() {         public Iterable<String> call(String s) throws Exception {             return Arrays.asList(s.split(" "));         }     });     int num = (int)line2.count();     List<String> strs = line2.take(num);     for(int i = 0;i<num;i++){         System.out.println(strs.get(i));     }     sc.stop(); }

③偽集合操作 RDD本身不是嚴格意義上的集合(因為它不滿足唯一性),但是它也支援許多集合操作。例如並集、交集、差集等等。 注意:只有資料型別相同的兩個RDD才能做集合運算。
*去除RDD資料集中的重複元素的轉化操作可以使用 distinct()函式,但是需要注意:該函式開銷比較大,因為這中間存在一個混洗(將所有資料通過網路與其他機器上的資料比較)的操作。
shell示例: >>> line=sc.parallelize([1,2,3,4,1,3]) >>> line2=line.distinct() >>> line2.count() [Stage 18:>                                                         (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 19:>                                                         (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling 4 >>> for a in line2.take(4): ...     print(a) ... D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling 2 4 1 3 >>>
可以看到:混洗(shuffle)之後的資料是無序的(這一點還需要深入研究)
* union()函式可以取並集,這個之前說過,不再贅述
* intersection(other) 方法可以取交集,不過這裡會自動去重,所以也會有混洗的過程,執行比較慢。 >>> line1=sc.parallelize([1,2,3,2,1,4]) >>> line2=sc.parallelize([2,4,5,6]) >>> line3=line1.intersection(line2) >>> line3.count() [Stage 32:>                                                         (0 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 32:=============================>                            (2 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 33:>                                                         (0 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 33:==============>                                           (1 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 33:=============================>                            (2 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling 2 >>> for a in line3.take(2): ...     print(a) ... D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 37:>                                                         (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling 4 2 >>>
* subtract(other) 函式可以取兩個集合的差集。 >>> line1=sc.parallelize([1,2,3,4,4,5,1]) >>> line2=sc.parallelize([1,3,6,7]) >>> lin3=line1.subtract(line2) >>> line3.count() [Stage 39:>                                                         (0 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 39:==============>                                           (1 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 39:=============================>                            (2 + 2) / 4]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling 2 >>> for a in line3.take(2): ...     print(a) ... D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 43:>                                                         (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling 4 2 >>>
可以看到,這裡取差集也會執行混洗操作自動去重
*求兩個RDD資料集的笛卡爾積: cartesian() 函式,返回一個由二元組組成的RDD >>> line1=sc.parallelize([1,2,3,4,4,5,1]) >>> line2=sc.parallelize([2,4,5,6]) >>> line3=line1.cartesian(line2) >>> line3.count() 28 >>> for a in line3.take(28): ...     print(a,end='') ... (1, 2)(1, 4)(2, 2)(2, 4)(3, 2)(3, 4)(1, 5)(1, 6)(2, 5)(2, 6)(3, 5)(3, 6)(4, 2)(4, 4)(4, 2)(4, 4)(5, 2)(5, 4)(1, 2)(1, 4)(4, 5)(4, 6)(4, 5)(4, 6)(5, 5)(5, 6)(1, 5)(1, 6)>>>

④行動操作
* collect() 函式可以將RDD的所有值全部以集合或者值的形式返回回來,但是這個函式一般只能用在單元測試中,因為實際的資料量太大,無法一次性讀入到記憶體。 >>> line1.collect() [1, 2, 3, 4, 4, 5, 1] >>>
* take(n) 函式,可以取前n個數據,之前說過,這裡不再贅述。
* reduce() 函式,常用在求和中,該函式接受一個函式作為引數,使用該函式從頭開始計算兩兩資料,最後得到一個值,但是該函式的缺點是計算結果只能與資料型別相同。
shell示例: >>> line=sc.parallelize([1,3,2,4]) >>> total=line.reduce(lambda x,y:x+y) >>> total 10 >>
* aggregate()函式算是reduce()函式的升級版,它可以指定兩個元素操作之後的結果。該函式需要傳入三個引數:初始值,本地當前結果與下一個元素的操作函式,節點之間計算結果的操作函式。 例如想要求平均值,就需要返回一個總值和總個數的二元組。使用python指令碼示例如下: >>> line=sc.parallelize([1,3,2,4,4,2]) >>> ave = line.aggregate((0,0),lambda x,v:(x[0]+v,x[1]+1),lambda y,z:(y[0]+z[0],y[1]+z[1])) >>> ave[0]/ave[1] 2.6666666666666665 >>> ave[0] 16 >>> ave[1] 6 >>>
在這個示例中,(0,0) 是初始值,分別表示初始總和和初始元素個數,第二個引數指定了當前結果(即一個二元組)與下一個元素操作的函式,第三個引數指定了多個節點之間的結果如何合併。
java中的實現如下:
public class TestAggregate {
    public static void main(String[] args) {         SparkConf conf = new SparkConf().setMaster("local").setAppName("test05");         JavaSparkContext sc = new JavaSparkContext(conf);         JavaRDD<Integer> line = sc.parallelize(Arrays.asList(1, 2, 3, 4, 2, 1));
        AveResult ave = line.aggregate(new AveResult(), new Function2<AveResult, Integer, AveResult>() {             public AveResult call(AveResult aveResult, Integer integer) throws Exception {                 AveResult aveResult1 = new AveResult();                 aveResult1.setSum(aveResult.getSum() + integer);                 aveResult1.setNum(aveResult.getNum() + 1);                 return aveResult1;             }         }, new Function2<AveResult, AveResult, AveResult>() {             public AveResult call(AveResult aveResult, AveResult aveResult2) throws Exception {                 AveResult aveResult1 = new AveResult();                 aveResult1.setSum(aveResult.getSum() + aveResult2.getSum());                 aveResult1.setNum(aveResult.getNum() + aveResult2.getNum());                 return aveResult1;             }         });
        System.out.println(ave.getAve());
        sc.stop();     } }
/**  * 注意:這裡一定要實現序列化介面,否則會報錯  */ class AveResult implements Serializable{     private int sum;     private int num;
    public AveResult() {         sum = 0;         num = 0;     }
    public double getAve() {         return ((double) sum) / num;     }
    public int getSum() {         return sum;     }
    public void setSum(int sum) {         this.sum = sum;     }
    public int getNum() {         return num;     }
    public void setNum(int num) {         this.num = num;     } }

這裡特別注意: 自定義的返回值型別一定要實現序列化介面!!
* foreach(func) 函式,有時我們會對 RDD 中的所有元素應用一個行動操作,但是不把任何結果返回到驅動器程 序中,這也是有用的。比如可以用 JSON 格式把資料傳送到一個網路伺服器上,或者把數 據存到資料庫中。不論哪種情況,都可以使用 foreach() 行動操作來對 RDD 中的每個元 素進行操作,而不需要把 RDD 發回本地。
* takeSample(withReplacement, num, seed) 函式可以讓我們從資料中獲取一個取樣,並指定是否替換。 這個返回值是不確定的: >>> line.takeSample(0,2) [4, 4] >>> line.takeSample(0,2) [3, 1] >>> line.takeSample(0,2) [1, 2] >>>


四、持久化 如前所述,Spark RDD 是惰性求值的,而有時我們希望能多次使用同一個 RDD。如果簡單 地對 RDD 呼叫行動操作,Spark 每次都會重算 RDD 以及它的所有依賴。這在迭代演算法中 消耗格外大,因為迭代演算法常常會多次使用同一組資料。 為了避免多次計算同一個 RDD,可以讓 Spark 對資料進行持久化。當我們讓 Spark 持久化 儲存一個 RDD 時,計算出 RDD 的節點會分別儲存它們所求出的分割槽資料。如果一個有持 久化資料的節點發生故障,Spark 會在需要用到快取的資料時重算丟失的資料分割槽。如果 希望節點故障的情況不會拖累我們的執行速度,也可以把資料備份到多個節點上。 出於不同的目的,我們可以為 RDD 選擇不同的持久化級別(如表 3-6 所示)。在 Scala(見 例 3-40)和 Java 中,預設情況下 persist() 會把資料以序列化的形式快取在 JVM 的堆空 間中。在 Python 中,我們會始終序列化要持久化儲存的資料,所以持久化級別預設值就是 以序列化後的物件儲存在 JVM 堆空間中。當我們把資料寫到磁碟或者堆外儲存上時,也 總是使用序列化後的資料。


!!注意:如果只是呼叫persist()函式,並不會觸發強制求值,只是會儲存下這個從策略,只有當呼叫行動操作的時候才會真正計算RDD
最後,RDD 還有一個方法叫作 unpersist(),呼叫該方法可以手動把持久化的 RDD 從緩 存中移除。



五、java中不同RDD型別的轉換