Spark學習記錄(三)核心API模組介紹
spark
-------------
基於hadoop的mr,擴充套件MR模型高效使用MR模型,記憶體型叢集計算,提高app處理速度。
spark特點
-------------
速度:在記憶體中儲存中間結果。
支援多種語言。Scala、Java、Python
內建了80+的運算元.
高階分析:MR,SQL/ Streamming /mllib / graph
RDD:
----------------
是spark的基本資料結構,是不可變資料集。RDD中的資料集進行邏輯分割槽,每個分割槽可以單獨在叢集節點
進行計算。可以包含任何java,scala,python和自定義型別。
RDD是隻讀的記錄分割槽集合。RDD具有容錯機制。
建立RDD方式,一、並行化一個現有集合。
hadoop 花費90%時間使用者rw。、
記憶體處理計算。在job間進行資料共享。記憶體的IO速率高於網路和disk的10 ~ 100之間。
內部包含5個主要屬性
-----------------------
1.分割槽列表
2.針對每個split的計算函式。
3.對其他rdd的依賴列表
4.可選,如果是KeyValueRDD的話,可以帶分割槽類。
5.可選,首選塊位置列表(hdfs block location);
RDD變換
rdd的變換方法都是lazy執行的
------------------
返回指向新rdd的指標,在rdd之間建立依賴關係。每個rdd都有計算函式和指向父RDD的指標。
map() //對每個元素進行變換,應用變換函式
//(T)=>V
mapPartitions() //對每個分割槽進行應用變換,輸入的Iterator,返回新的迭代器,可以對分割槽進行函式處理。
//針對每個資料分割槽進行操作,入參是分割槽資料的Iterator,map() 針對分割槽中的每個元素進行操作。
mapPartitions() //Iterator<T> => Iterator<U>
注:最好設定每個分割槽都對應有一個執行緒。
filter() //過濾器,(T)=>Boolean
flatMap() //壓扁,T => TraversableOnce[U]
//同mapPartitions方法一樣都是針對分割槽處理,只不過這個方法可以獲取到分割槽索引
mapPartitionsWithIndex(func) //(Int, Iterator<T>) => Iterator<U>
//取樣返回取樣的RDD子集。
//withReplacement 元素是否可以多次取樣.
//fraction : 期望取樣數量.[0,1]
//表示一個種子,根據這個seed隨機抽取,一般都只用到前兩個引數
sample(withReplacement, fraction, seed)
作用:在資料傾斜的時候,我們那麼多資料如果想知道那個key傾斜了,就需要我們取樣獲取這些key,出現次數陊的key就是導致資料傾斜的key。如果這些key資料不是很重要的話,可以過濾掉,這樣就解決了資料傾斜。
union() //類似於mysql union操作。
intersection //交集,提取兩個rdd中都含有的元素。
distinct([numTasks])) //去重,去除重複的元素。
groupByKey() //(K,V) => (K,Iterable<V>) 使用前需要構造出對偶的RDD
reduceByKey(*) //按key聚合。注意他是一個RDD變換方法,不是action
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key進行聚合,這個函式邏輯較為複雜請看aggregateByKey函式的專題
sortByKey //根據對映的Key進行排序,但是隻能根據Key排序
sortBy //比sortByKey更加靈活強大的排序,可根據元組中任意欄位排序
join(otherDataset, [numTasks]) //橫向連線,有兩種資料(K,V)和(K,W),連結後返回(K,(V,W)),兩個元組一一對應的
cogroup //協分組,(K,V)和(K,W)分組後返回(K,(V,W)),注意協分組不是一一對應的分組後需要(此處注意與join的區別)
cartesian(otherDataset) //笛卡爾積,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]
pipe //將rdd的元素傳遞給指令碼或者命令,執行結果返回形成新的RDD
coalesce(numPartitions) //減少分割槽
repartition //再分割槽
repartitionAndSortWithinPartitions(partitioner)//再分割槽並在分割槽內進行排序
RDD Action
Spack的中的方法都是懶的,,只有遇到了action型別的方法才會真正的執行
------------------
collect() //收集rdd元素形成陣列.
count() //統計rdd元素的個數
reduce() //聚合,返回一個值。
first //取出第一個元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path) //儲存到檔案
saveAsSequenceFile(path) //儲存成序列檔案 sc.sequenceFile讀取序列檔案
saveAsObjectFile(path) (Java and Scala)
countByKey() //按照key統計有幾個value
資料傾斜
------------------------------
由於大量相同的Key,在reduce合併計算的過程中,大量相同的Key被分配到了同一個叢集節點,導致叢集中這個節點計算壓力非常大。
本例採用的解決方案是,在map截斷將Key先接上一個隨機數打散,然後在reduce計算後,再次map還原key,然後進行最終reduce。
Spark WebUI 上面程式碼執行的DAG 有效無環圖,我們可以清楚地看到每一次的reduce聚合都會重新劃分階段