1. 程式人生 > >Spark學習記錄(三)核心API模組介紹

Spark學習記錄(三)核心API模組介紹

spark
-------------
基於hadoop的mr,擴充套件MR模型高效使用MR模型,記憶體型叢集計算,提高app處理速度。

spark特點
-------------
速度:在記憶體中儲存中間結果。
支援多種語言。Scala、Java、Python
內建了80+的運算元.
高階分析:MR,SQL/ Streamming /mllib / graph

RDD:
----------------
是spark的基本資料結構,是不可變資料集。RDD中的資料集進行邏輯分割槽,每個分割槽可以單獨在叢集節點
進行計算。可以包含任何java,scala,python和自定義型別。

RDD是隻讀的記錄分割槽集合。RDD具有容錯機制。

建立RDD方式,一、並行化一個現有集合。

hadoop 花費90%時間使用者rw。、

記憶體處理計算。在job間進行資料共享。記憶體的IO速率高於網路和disk的10 ~ 100之間。

內部包含5個主要屬性
-----------------------
1.分割槽列表
2.針對每個split的計算函式。
3.對其他rdd的依賴列表
4.可選,如果是KeyValueRDD的話,可以帶分割槽類。
5.可選,首選塊位置列表(hdfs block location);

 

RDD變換

rdd的變換方法都是lazy執行的
------------------
返回指向新rdd的指標,在rdd之間建立依賴關係。每個rdd都有計算函式和指向父RDD的指標。


map() //對每個元素進行變換,應用變換函式
//(T)=>V

mapPartitions() //對每個分割槽進行應用變換,輸入的Iterator,返回新的迭代器,可以對分割槽進行函式處理。

//針對每個資料分割槽進行操作,入參是分割槽資料的Iterator,map() 針對分割槽中的每個元素進行操作。

mapPartitions()  //Iterator<T> => Iterator<U>

注:最好設定每個分割槽都對應有一個執行緒。

filter() //過濾器,(T)=>Boolean
flatMap() //壓扁,T => TraversableOnce[U]

 

//同mapPartitions方法一樣都是針對分割槽處理,只不過這個方法可以獲取到分割槽索引

mapPartitionsWithIndex(func)  //(Int, Iterator<T>) => Iterator<U>

 

//取樣返回取樣的RDD子集。
//withReplacement 元素是否可以多次取樣.
//fraction : 期望取樣數量.[0,1]
//表示一個種子,根據這個seed隨機抽取,一般都只用到前兩個引數
sample(withReplacement, fraction, seed)

作用:在資料傾斜的時候,我們那麼多資料如果想知道那個key傾斜了,就需要我們取樣獲取這些key,出現次數陊的key就是導致資料傾斜的key。如果這些key資料不是很重要的話,可以過濾掉,這樣就解決了資料傾斜。

 

 

union() //類似於mysql union操作。

 

intersection //交集,提取兩個rdd中都含有的元素。


distinct([numTasks])) //去重,去除重複的元素。

 

groupByKey() //(K,V) => (K,Iterable<V>)  使用前需要構造出對偶的RDD

reduceByKey(*) //按key聚合。注意他是一個RDD變換方法,不是action

 

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key進行聚合,這個函式邏輯較為複雜請看aggregateByKey函式的專題

 

sortByKey //根據對映的Key進行排序,但是隻能根據Key排序

 

sortBy //比sortByKey更加靈活強大的排序,可根據元組中任意欄位排序

 

join(otherDataset, [numTasks]) //橫向連線,有兩種資料(K,V)和(K,W),連結後返回(K,(V,W)),兩個元組一一對應的

 

cogroup //協分組,(K,V)和(K,W)分組後返回(K,(V,W)),注意協分組不是一一對應的分組後需要(此處注意與join的區別)



cartesian(otherDataset) //笛卡爾積,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]

 

pipe //將rdd的元素傳遞給指令碼或者命令,執行結果返回形成新的RDD


coalesce(numPartitions) //減少分割槽


repartition //再分割槽


repartitionAndSortWithinPartitions(partitioner)//再分割槽並在分割槽內進行排序

 

RDD Action

Spack的中的方法都是懶的,,只有遇到了action型別的方法才會真正的執行
------------------
collect() //收集rdd元素形成陣列.
count() //統計rdd元素的個數
reduce() //聚合,返回一個值。
first //取出第一個元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])

saveAsTextFile(path) //儲存到檔案
saveAsSequenceFile(path) //儲存成序列檔案  sc.sequenceFile讀取序列檔案

saveAsObjectFile(path) (Java and Scala)

countByKey()                       //按照key統計有幾個value 

 

資料傾斜

------------------------------

由於大量相同的Key,在reduce合併計算的過程中,大量相同的Key被分配到了同一個叢集節點,導致叢集中這個節點計算壓力非常大。

本例採用的解決方案是,在map截斷將Key先接上一個隨機數打散,然後在reduce計算後,再次map還原key,然後進行最終reduce。


Spark WebUI 上面程式碼執行的DAG 有效無環圖,我們可以清楚地看到每一次的reduce聚合都會重新劃分階段