Spark核心RDD、什麽是RDD、RDD的屬性、創建RDD、RDD的依賴以及緩存、
1:什麽是Spark的RDD???
RDD(Resilient Distributed Dataset)叫做分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裏面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。
2:RDD的屬性:
a、一組分片(Partition),即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那麽就會采用默認值。默認值就是程序所分配到的CPU Core的數目。 b、一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對叠代器進行復合,不需要保存每次計算的結果。 c、RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。 d、一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。 e、一個列表,存儲存取每個Partition的優先位置(preferred location)。對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
3:創建RDD:
a、由一個已經存在的Scala集合創建。 val rdd1= sc.parallelize(Array(1,2,3,4,5,6,7,8)) b、由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等 val rdd2 = sc.textFile("hdfs://master:9000/wordcount.txt")
4:RDD編程API:
4.1:Transformation:
RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
常用的Transformation如下所示:
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換後組成 |
filter(func) |
返回一個新的RDD,該RDD由經過func函數計算後返回值為true的輸入元素組成 |
flatMap(func) |
類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是 (Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) |
對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
|
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey類似,但是更靈活 |
join(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) |
笛卡爾積 |
pipe(command, [envVars]) |
|
coalesce(numPartitions) |
|
repartition(numPartitions) |
|
repartitionAndSortWithinPartitions(partitioner) |
4.2:常用的Action如下所示:
動作 |
含義 |
reduce(func) |
通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可並聯的 |
collect() |
在驅動程序中,以數組的形式返回數據集的所有元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(類似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) |
返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) |
|
saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
saveAsObjectFile(path) |
|
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) |
在數據集的每一個元素上,運行函數func進行更新。 |
5:WordCount中的RDD:
6:RDD的依賴關系:
RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
6.1:窄依賴:窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。總結:窄依賴我們形象的比喻為獨生子女。
6.2:寬依賴:寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition。總結:窄依賴我們形象的比喻為超生。
6.3:Lineage:RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行為,當該RDD的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區。
7:RDD的緩存:
Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存個數據集。當持久化某個RDD後,每一個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得後續的動作變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark構建叠代式算法和快速交互式查詢的關鍵。
7.1:RDD緩存方式:
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
8:DAG的生成:
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG劃分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,因此寬依賴是劃分Stage的依據。
Spark核心RDD、什麽是RDD、RDD的屬性、創建RDD、RDD的依賴以及緩存、