1. 程式人生 > >【Spark】--Spark中RDD的理解

【Spark】--Spark中RDD的理解

1.什麼是RDD

RDDRDDSpark的計算模型 RDDResilient Distributed Dataset)叫做彈性的分散式資料集合,是Spark中最基本的資料抽象,它代表一個不可變、只讀的,被分割槽的資料集。

操作RDD就像操作本地集合一樣,資料會被分散到多臺機器中(以分割槽為單位)。

RDDSpark中的一個基本抽象(可以理解為代理)

有了RDD,就可以像操作本地的集合一樣,有很多的方法可以呼叫,使用方便,而無需關心底層的排程細節。

2.RDD操作型別

RDD中運算元可分為兩類:

RDD支援的兩中型別的操作:

轉換(Transformation):現有的RDD通過轉換生成一個新的RDD

lazy模式,延遲執行。

轉換的函式:mapfilterflatmapgroupByKeyreduceBykeyaggregateByKeyunionjoincoalesce等等。

動作(Action):RDD上執行計算,並返回結果給驅動程式(Drive)或寫入檔案系統。

動作操作函式:reduce,collect,count,frist,take,countByKey以及foreach等等。

collect該方法把資料收集到driver端   Array陣列型別

所有的transfromation只有遇到action才能執行。

當觸發執行action之後,資料型別就不再是RDD了,資料就會存到指定的檔案系統中,或者直接列印結果或者收集起來。

RDD操作流程示意:

 

 

RDD的執行邏輯:

下圖所示,在Spark應用中,整個執行流程在邏輯上運算之間會形成有向無環圖。Action運算元觸發之後會將所有累積的運算元形成一個有向無環圖,然後由排程器排程該圖上的任務進行運算。

Spark的排程方式與MapReduce有所不同。Spark根據RDD之間不同的依賴關係切分形成不同的階段(Stage),一個階段包含一系列函式進行流水線執行。

圖中的ABCDEFG,分別代表不同的RDDRDD內的一個方框代表一個數據塊。資料從HDFS輸入Spark,形成RDD ARDD CRDD C上執行map操作,轉換為RDD DRDD BRDD F

進行join操作轉換為G,而在BG的過程中又會進行Shuffle。最後RDD G通過函式saveAsSequenceFile輸出儲存到HDFS中。

 

RDD的轉換與操作:

wordcount例項,檢視lazy特性。

只有在執行action時,才會真正開始運算,才能得到結果或儲存到檔案中。

3.建立RDD

1)集合並行化建立(通過scala集合建立)scala中的本地集合------> spark RDD

val  arr=Array1  to  10

val  rdd=sc.parallelizearr

val  rdd=sc.makeRDDarr

2)

//讀取外部檔案系統,比如HDFS

val  rdd2 = sc.textFile(“hdfs://hdp-nn-01:9000/words.txt”)

//讀取本地檔案

val  rdd2 = sc.textFile(“file:///root/words.txt”)

3)從父RDD轉換成新的子RDD,最常用方式

 

呼叫Transformation 類的方法,生成新的RDD

4.RDD的分割槽:

rdd中和檔案切片相關的概念叫做分割槽,也就是說對rdd進行操作,實際上是操作的rdd中的每一個分割槽,分割槽的數量決定了並行的數量。

使用rdd.partitions.size或者rdd.partitions.length檢視分割槽數量。