1. 程式人生 > >Spark學習筆記(3)—— Spark計算模型 RDD

Spark學習筆記(3)—— Spark計算模型 RDD

1 彈性分散式資料集RDD

1.1 什麼是 RDD

RDD(Resilient Distributed Dataset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

1.2 RDD 屬性

  1. 一組分片(Partition),即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。
  2. 一個計算每個分割槽的函式。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。
  3. RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。(應該是子RDD能夠知道從父RDD的哪些分割槽讀取。如果子RDD的某些分割槽的資料丟失了,可以從對應的父RDD進行重新計算)
  4. 一個Partitioner,即RDD的分片函式。當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
  5. 一個列表,儲存存取每個Partition的優先位置(preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的塊的位置。按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。

1.3 建立 RDD

  1. 由一個已經存在的Scala集合建立
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  1. 由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")

2 RDD 的程式設計API

API學習連結 http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

2.1 Transformation

RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。 常用的Transformation 在這裡插入圖片描述

2.2 Action

在這裡插入圖片描述

3 RDD的依賴關係

RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。 在這裡插入圖片描述

3.1 窄依賴

窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用 總結:窄依賴形象的比喻為獨生子女

3.2 寬依賴

寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition 總結:形象的比喻為超生

3.3 Lineage

RDD只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分割槽。RDD的Lineage會記錄RDD的元資料資訊和轉換行為,當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽。

4 RDD 的快取

Spark速度非常快的原因之一,就是在不同操作中可以在記憶體中持久化或快取個數據集。當持久化某個RDD後,每一個節點都將把計算的分片結果儲存在記憶體中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得後續的動作變得更加迅速。RDD相關的持久化和快取,是Spark最重要的特徵之一。可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵。

4.1 RDD快取方式

RDD通過persist方法或cache方法可以將前面的計算結果快取,但是並不是這兩個方法被呼叫時立即快取,而是觸發後面的action時,該RDD將會被快取在計算節點的記憶體中,並供後面重用。 在這裡插入圖片描述 通過檢視原始碼發現cache最終也是呼叫了persist方法,預設的儲存級別都是僅在記憶體儲存一份,Spark的儲存級別還有好多種,儲存級別在object StorageLevel中定義的。 在這裡插入圖片描述 快取有可能丟失,或者儲存儲存於記憶體的資料由於記憶體不足而被刪除,RDD的快取容錯機制保證了即使快取丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的資料會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。

4.2 DAG的生成

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關係的不同將DAG劃分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,因此寬依賴是劃分Stage的依據。 在這裡插入圖片描述