慕課 從零到一spark進階之路(一)
阿新 • • 發佈:2018-11-17
1.RDD
RDD是spark特有的資料模型,談到RDD就會提到什麼彈性分散式資料集,什麼有向無環圖,本文暫時不去展開這些高深概念。
(0)隨便找個點理解以下
最重要的記住,RRD是不可變的,也就是說,已有的RDD不能被修改或者更新,但可以從已有的RDD轉化成一個新的RDD.
上面的特性解讀:
RDD可以cache到記憶體中,每次對RDD資料集的操作之後的結果,都可以存放到記憶體中,下一個操作可以直接從記憶體中輸入,省去了MapReduce大量的磁碟IO操作。這對於迭代運算比較常見的機器學習演算法, 互動式資料探勘來說,效率提升比較大。
下面我們來列舉以下RDD的基本特性:
(1)RDD的特點
1)建立:只能通過轉換 ( transformation ,如map/filter/groupBy/join 等,區別於動作 action) 從兩種資料來源中建立 RDD 1 )穩定儲存中的資料; 2 )其他 RDD。 2)只讀:狀態不可變,不能修改。 3)分割槽:支援使 RDD 中的元素根據那個 key 來分割槽 ( partitioning ) ,儲存到多個結點上。還原時只會重新計算丟失分割槽的資料,而不會影響整個系統。 4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的資訊關於它是如何從其他 RDD 產生而來的。 5)持久化:支援將會被重用的 RDD 快取 ( 如 in-memory 或溢位到磁碟 )。 6)延遲計算: Spark 也會延遲計算 RDD ,使其能夠將轉換管道化 (pipeline transformation)。 7)操作:豐富的轉換(transformation)和動作 ( action ) , count/reduce/collect/save 等。 執行了多少次transformation操作,RDD都不會真正執行運算(記錄lineage),只有當action操作被執行時,運算才會觸發。123456789
(2)RDD的好處
1)RDD只能從持久儲存或通過Transformations操作產生,相比於分散式共享記憶體(DSM)可以更高效實現容錯,對於丟失部分資料分割槽只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。 2)RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。 3)RDD的資料分割槽特性,可以通過資料的本地性來提高效能,這不Hadoop MapReduce是一樣的。 4)RDD都是可序列化的,在記憶體不足時可自動降級為磁碟儲存,把RDD儲存於磁碟上,這時效能會有大的下降但不會差於現在的MapReduce。 5)批量操作:任務能夠根據資料本地性 (data locality) 被分配,從而提高效能。123456
(3)RDD的內部屬性
通過RDD的內部屬性,使用者可以獲取相應的元資料資訊。通過這些資訊可以支援更復雜的演算法或優化。
1)分割槽列表:通過分割槽列表可以找到一個RDD中包含的所有分割槽及其所在地址。
2)計算每個分片的函式:通過函式可以對每個資料塊進行RDD需要進行的使用者自定義函式運算。
3)對父RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。
4)可選:key-value型的RDD是根據雜湊來分割槽的,類似於mapreduce當中的Paritioner介面,控制key分到哪個reduce。
5)可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。(儲存的是一個表,可以將處理的分割槽“本地化”)
1234567
2. RDD操作
2.1 轉化操作
RDD的轉化操作是返回新RDD的操作, 常用轉化操作總結如下:
表1: 對一個數據為{1,2,3,3}的RDD進行基本的轉化操作
函式名 | 目的 | 示例 | 結果 |
---|---|---|---|
map() | 將函式應用於RDD中每個元素, 將返回值構成新的RDD | rdd.map(x=>x+1) | {2,3,4,5} |
flatMap() | 將函式應用於RDD中的每個元素, 將返回的迭代器的所有內容構成新的RDD, 常用來切分單詞 | rdd.flatMap(x=>x.to(2)) | {1,2,2} |
filter() | 返回一個通過傳入給filter()的函式的元素組成的RDD | rdd.filter(x=> x>2) | {3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement, fraction, [seed]) | 對RDD取樣, 以及是否替換 | rdd.sample(false, 0.5) | 非確定的 |
表2: 對資料分別為{1,2,3}和{2,3,4}RDD進行鍼對2個RDD的轉化操作
函式名 | 目的 | 示例 | 結果 |
---|---|---|---|
redcue() | 並行整合RDD中的所有元素 | rdd.reduce((x, y) => x+y) | 9 |
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,4} |
count() | 求RDD中的元素個數 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出現的次數 | rdd.countByValue() | {1,1}, {2, 1}, {3,2} |
take(n) | 從RDD中返回n個元素 | rdd.take(2) | {1,2} |
top(n) | 從RDD中返回前n個元素 | rdd.top(3) | {3,3,2} |
foreach(func) | 對RDD中的每個元素使用給定的函式 | rdd.foreach(print) | 1,2,3,3 |
2.2 行動操作
RDD的行動操作會把最終求得的結果返回驅動器程式, 或者寫入外部儲存系統中。
表3: 對一個數據為{1,2,3,3}的RDD進行基本RDD的行動操作
函式名 | 目的 | 示例 | 結果 |
---|---|---|---|
redcue() | 並行整合RDD中的所有元素 | rdd.reduce((x, y) => x+y) | 9 |
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,4} |
count() | 求RDD中的元素個數 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出現的次數 | rdd.countByValue() | {1,1}, {2, 1}, {3,2} |
take(n) | 從RDD中返回n個元素 | rdd.take(2) | {1,2} |
top(n) | 從RDD中返回前n個元素 | rdd.top(3) | {3,3,2} |
foreach(func) | 對RDD中的每個元素使用給定的函式 | rdd.foreach(print) | 1,2,3,3 |