1. 程式人生 > >慕課 從零到一spark進階之路(一)

慕課 從零到一spark進階之路(一)

1.RDD

RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分散式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念。

(0)隨便找個點理解以下

最重要的記住,RRD是不可變的,也就是說,已有的RDD不能被修改或者更新,但可以從已有的RDD轉化成一個新的RDD.

上面的特性解讀: 
RDD可以cache到記憶體中,每次對RDD資料集的操作之後的結果,都可以存放到記憶體中,下一個操作可以直接從記憶體中輸入,省去了MapReduce大量的磁碟IO操作。這對於迭代運算比較常見的機器學習演算法, 互動式資料探勘來說,效率提升比較大。 
下面我們來列舉以下RDD的基本特性:

(1)RDD的特點

  1)建立:只能通過轉換 ( transformation ,如map/filter/groupBy/join 等,區別於動作 action) 從兩種資料來源中建立 RDD 1 )穩定儲存中的資料; 2 )其他 RDD。
  2)只讀:狀態不可變,不能修改。
  3)分割槽:支援使 RDD 中的元素根據那個 key 來分割槽 ( partitioning ) ,儲存到多個結點上。還原時只會重新計算丟失分割槽的資料,而不會影響整個系統。
  4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的資訊關於它是如何從其他 RDD 產生而來的。
  5)持久化:支援將會被重用的 RDD 快取 ( 如 in-memory 或溢位到磁碟 )。
  6)延遲計算: Spark 也會延遲計算 RDD ,使其能夠將轉換管道化 (pipeline transformation)。
  7)操作:豐富的轉換(transformation)和動作 ( action ) , count/reduce/collect/save 等。
  執行了多少次transformation操作,RDD都不會真正執行運算(記錄lineage),只有當action操作被執行時,運算才會觸發。123456789

(2)RDD的好處

  1)RDD只能從持久儲存或通過Transformations操作產生,相比於分散式共享記憶體(DSM)可以更高效實現容錯,對於丟失部分資料分割槽只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
  2)RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。
  3)RDD的資料分割槽特性,可以通過資料的本地性來提高效能,這不Hadoop MapReduce是一樣的。
  4)RDD都是可序列化的,在記憶體不足時可自動降級為磁碟儲存,把RDD儲存於磁碟上,這時效能會有大的下降但不會差於現在的MapReduce。
  5)批量操作:任務能夠根據資料本地性 (data locality) 被分配,從而提高效能。123456

(3)RDD的內部屬性

  通過RDD的內部屬性,使用者可以獲取相應的元資料資訊。通過這些資訊可以支援更復雜的演算法或優化。
  1)分割槽列表:通過分割槽列表可以找到一個RDD中包含的所有分割槽及其所在地址。
  2)計算每個分片的函式:通過函式可以對每個資料塊進行RDD需要進行的使用者自定義函式運算。
  3)對父RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。
  4)可選:key-value型的RDD是根據雜湊來分割槽的,類似於mapreduce當中的Paritioner介面,控制key分到哪個reduce。
  5)可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。(儲存的是一個表,可以將處理的分割槽“本地化”)   
1234567

2. RDD操作

2.1 轉化操作

RDD的轉化操作是返回新RDD的操作, 常用轉化操作總結如下:

表1: 對一個數據為{1,2,3,3}的RDD進行基本的轉化操作

函式名 目的 示例 結果
map() 將函式應用於RDD中每個元素, 將返回值構成新的RDD rdd.map(x=>x+1) {2,3,4,5}
flatMap() 將函式應用於RDD中的每個元素, 將返回的迭代器的所有內容構成新的RDD, 常用來切分單詞 rdd.flatMap(x=>x.to(2)) {1,2,2}
filter() 返回一個通過傳入給filter()的函式的元素組成的RDD rdd.filter(x=> x>2) {3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement, fraction, [seed]) 對RDD取樣, 以及是否替換 rdd.sample(false, 0.5) 非確定的

表2: 對資料分別為{1,2,3}和{2,3,4}RDD進行鍼對2個RDD的轉化操作

函式名 目的 示例 結果
redcue() 並行整合RDD中的所有元素 rdd.reduce((x, y) => x+y) 9
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,4}
count() 求RDD中的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {1,1}, {2, 1}, {3,2}
take(n) 從RDD中返回n個元素 rdd.take(2) {1,2}
top(n) 從RDD中返回前n個元素 rdd.top(3) {3,3,2}
foreach(func) 對RDD中的每個元素使用給定的函式 rdd.foreach(print) 1,2,3,3

2.2 行動操作

RDD的行動操作會把最終求得的結果返回驅動器程式, 或者寫入外部儲存系統中。

表3: 對一個數據為{1,2,3,3}的RDD進行基本RDD的行動操作

函式名 目的 示例 結果
redcue() 並行整合RDD中的所有元素 rdd.reduce((x, y) => x+y) 9
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,4}
count() 求RDD中的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {1,1}, {2, 1}, {3,2}
take(n) 從RDD中返回n個元素 rdd.take(2) {1,2}
top(n) 從RDD中返回前n個元素 rdd.top(3) {3,3,2}
foreach(func) 對RDD中的每個元素使用給定的函式 rdd.foreach(print) 1,2,3,3