Spark Core
Spark Core
DAG概念
有向無環圖
Spark會根據使用者提交的計算邏輯中的RDD的轉換(變換方法)和動作(action方法)來生成RDD之間的依賴關係,同時這個計算鏈也就生成了邏輯上的DAG。
RDD之間的關係可以從兩個維度來理解:一個是RDD是從哪些RDD轉換而來,也就是RDD的parent RDD(s)是什麼;還有就是依賴於parent RDD(s)的哪些Partition(s)。這個關係,就是RDD之間的依賴,org.apache.spark.Dependency。根據依賴於parent RDD(s)的Partitions的不同情況,Spark將這種依賴分為兩種,一種是寬依賴,一種是窄依賴。
DAG的生成與Stage的劃分
DAG的生成
原始的RDD(s)通過一系列轉換就形成了DAG。RDD之間的依賴關係,包含了RDD由哪些Parent RDD(s)轉換而來和它依賴parent RDD(s)的哪些Partitions,是DAG的重要屬性。
藉助這些依賴關係,DAG可以認為這些RDD之間形成了Lineage(血統,血緣關係)。藉助Lineage,能保證一個RDD被計算前,它所依賴的parent RDD都已經完成了計算;同時也實現了RDD的容錯性,即如果一個RDD的部分或者全部的計算結果丟失了,那麼就需要重新計算這部分丟失的資料。
Spark的Stage(階段)
Spark在執行任務(job)時,首先會根據依賴關係,將DAG劃分為不同的階段(Stage)
處理流程是:
1)Spark在執行Transformation型別操作時都不會立即執行,而是懶執行(計算)
2)執行若干步的Transformation型別的操作後,一旦遇到Action型別操作時,才會真正觸發執行(計算)
3)執行時,從當前Action方法向前回溯,如果遇到的是窄依賴則應用流水線優化,繼續向前找,直到碰到某一個寬依賴
4)因為寬依賴必須要進行shuffle,無法實現優化,所以將這一次段執行過程組裝為一個stage
5)再從當前寬依賴開始繼續向前找。重複剛才的步驟,從而將這個DAG還分為若干的stage
在stage內部可以執行流水線優化,而在stage之間沒辦法執行流水線優化,因為有shuffle。但是這種機制已經盡力的去避免了shuffle
Spark的Job和Task
原始的RDD經過一系列轉換後(一個DAG),會在最後一個RDD上觸發一個動作,這個動作會生成一個Job。
所以可以這樣理解:一個DAG對應一個Spark的Job。
在Job被劃分為一批計算任務(Task)後,這批Task會被提交到叢集上的計算節點去計算Spark的Task分為兩種:
1)org.apache.spark.scheduler.ShuffleMapTask
2)org.apache.spark.scheduler.ResultTask
簡單來說,DAG的最後一個階段會為每個結果的Partition生成一個ResultTask,其餘所有的階段都會生成ShufffleMapTask。
RDD
RDD就是帶有分割槽的集合型別
RDD是分散式的,彈性的,容錯的資料結構
彈性分散式資料集(RDD),特點是可以並行操作,並且是容錯的。有兩種方法可以建立RDD:
1)執行Transform操作(變換操作),
2)讀取外部儲存系統的資料集,如HDFS,HBase,或任何與Hadoop有關的資料來源。
注:建立RDD的方式有多種,比如案例一中是基於一個基本的集合型別(Array)轉換而來,像parallelize這樣的方法還有很多此外,我們也可以在讀取資料集時就建立RDD。
分割槽概念
可以在不同的機器上並行處理
它是spark提供的一個特殊集合類。諸如普通的集合型別,如傳統的Array:(1,2,3,4,5)是一個整體,但轉換成RDD後,我們可以對資料進行Partition(分割槽)處理,這樣做的目的就是為了分散式。
你可以讓這個RDD有兩個分割槽,那麼有可能是這個形式:RDD(1,2) (3,4)。
這樣設計的目的在於:可以進行分散式運算。
RDD操作
針對RDD的操作,分兩種,一種是Transformation(變換),一種是Actions(執行)。
Transformation(變換)操作屬於懶操作(運算元),不會真正觸發RDD的處理計算。
變換方法的共同點:1.不會馬上觸發計算 2.每當呼叫一次變換方法,都會產生一個新的RDD,Actions(執行)操作才會真正觸發。
RDD的依賴關係
RDD和它依賴的parent RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
1)窄依賴指的是每一個parent RDD的Partition最多被子RDD的一個Partition使用
對於窄依賴操作,它們只是將Partition的資料根據轉換的規則進行轉化,並不涉及其他的處理,可以簡單地認為只是將資料從一個形式轉換到另一個形式。
所以對於窄依賴,並不會引入昂貴的Shuffle。所以執行效率非常高。如果整個DAG中存在多個連續的窄依賴,則可以將這些連續的窄依賴整合到一起連續執行,中間不執行shuffle 從而提高效率,這樣的優化方式稱之為流水線優化。
此外,針對窄依賴,如果子RDD某個分割槽資料丟失,只需要找到父RDD對應依賴的分割槽,恢復即可。但如果是寬依賴,當分割槽丟失時,最糟糕的情況是要重算所有父RDD的所有分割槽。
2)寬依賴指的是多個子RDD的Partition會依賴同一個parent RDD的Partition。
對於groupByKey這樣的操作,子RDD的所有Partition(s)會依賴於parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的結果。
Shuffle概述
spark中一旦遇到寬依賴就需要進行shuffle的操作,所謂的shuffle的操作的本質就是將資料彙總後重新分發的過程
這個過程資料要彙總到一起,資料量可能很大所以不可避免的需要進行資料落磁碟的操作,會降低程式的效能,所以spark並不是完全記憶體不讀寫磁碟,只能說它盡力避免這樣的過程來提高效率 。
spark中的shuffle,在早期的版本中,會產生多個臨時檔案,但是這種多臨時檔案的策略造成大量檔案的同時的讀寫,磁碟的效能被分攤給多個檔案,每個檔案讀寫效率都不高,影響spark的執行效率。所以在後續的spark中(1.2.0之後的版本)的shuffle中,只會產生一個檔案,並且資料會經過排序再附加索引資訊,減少了檔案的數量並通過排序索引的方式提升了效能。
RDD容錯機制
分散式系統通常在一個機器叢集上執行,同時執行的幾百臺機器中某些出問題的概率大大增加,所以容錯設計是分散式系統的一個重要能力。
Spark以前的叢集容錯處理模型,像MapReduce,將計算轉換為一個有向無環圖(DAG)的任務集合,這樣可以通過重複執行DAG裡的一部分任務來完成容錯恢復。但是由於主要的資料儲存在分散式檔案系統中,沒有提供其他儲存的概念,容錯過程需要在網路上進行資料複製,從而增加了大量的消耗。所以,分散式程式設計中經常需要做檢查點,即將某個時機的中間資料寫到儲存(通常是分散式檔案系統)中。
RDD也是一個DAG,每一個RDD都會記住建立該資料集需要哪些操作,跟蹤記錄RDD的繼承關係,這個關係在Spark裡面叫lineage(血緣關係)。當一個RDD的某個分割槽丟失時,RDD是有足夠的資訊記錄其如何通過其他RDD進行計算,且只需重新計算該分割槽,這是Spark的一個創新。
RDD的快取
相比Hadoop MapReduce來說,Spark計算具有巨大的效能優勢,其中很大一部分原因是Spark對於記憶體的充分利用,以及提供的快取機制
RDD持久化(快取)
持久化在早期被稱作快取(cache),但快取一般指將內容放在記憶體中。雖然持久化操作在絕大部分情況下都是將RDD快取在記憶體中,但一般都會在記憶體不夠時用磁碟頂上去(比作業系統預設的磁碟交換效能高很多)。當然,也可以選擇不使用記憶體,而是僅僅儲存到磁碟中。所以,現在Spark使用持久化(persistence)這一更廣泛的名稱。
預設情況下,RDD只使用一次,用完即扔,再次使用時需要重新計算得到,而持久化(快取)操作避免了這裡的重複計算,實際測試也顯示持久化對效能提升明顯,這也是Spark剛出現時被人稱為記憶體計算框架的原因。
持久化的方法是呼叫persist()函式,除了持久化至記憶體中,還可以在persist()中指定storage level引數使用其他的型別,具體如下:
1)MEMORY_ONLY : 將 RDD 以反序列化的 Java 物件的形式儲存在 JVM 中. 如果記憶體空間不夠,部分資料分割槽將不會被快取,在每次需要用到這些資料時重新進行計算. 這是預設的級別。
cache()方法對應的級別就是MEMORY_ONLY級別
2)MEMORY_AND_DISK:將 RDD 以反序列化的 Java 物件的形式儲存在 JVM 中。如果記憶體空間不夠,將未快取的資料分割槽儲存到磁碟,在需要使用這些分割槽時從磁碟讀取。
3)MEMORY_ONLY_SER :將 RDD 以序列化的 Java 物件的形式進行儲存(每個分割槽為一個 byte 陣列)。這種方式會比反序列化物件的方式節省很多空間,尤其是在使用 fast serialize時會節省更多的空間,但是在讀取時會使得 CPU 的 read 變得更加密集。如果記憶體空間不夠,部分資料分割槽將不會被快取,在每次需要用到這些資料時重新進行計算。
4)MEMORY_AND_DISK_SER :類似於 MEMORY_ONLY_SER ,但是溢位的分割槽會儲存到磁碟,而不是在用到它們時重新計算。如果記憶體空間不夠,將未快取的資料分割槽儲存到磁碟,在需要使用這些分割槽時從磁碟讀取。
5)DISK_ONLY:只在磁碟上快取 RDD。
6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :與上面的級別功能相同,只不過每個分割槽在叢集中兩個節點上建立副本。
7)OFF_HEAP 將資料儲存在 off-heap memory 中。使用堆外記憶體,這是Java虛擬機器裡面的概念,堆外記憶體意味著把記憶體物件分配在Java虛擬機器的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛擬機器)。使用堆外記憶體的好處:可能會利用到更大的記憶體儲存空間。但是對於資料的垃圾回收會有影響,需要程式設計師來處理
注意,可能帶來一些GC回收問題。
快取資料的清除
Spark 會自動監控每個節點上的快取資料,然後使用 least-recently-used (LRU) 機制來處理舊的快取資料。如果你想手動清理這些快取的 RDD 資料而不是去等待它們被自動清理掉,
可以使用 RDD.unpersist( ) 方法。
Spark 也會自動持久化一些在 shuffle 操作過程中產生的臨時資料(比如 reduceByKey),即便是使用者並沒有呼叫持久化的方法。這樣做可以避免當 shuffle 階段時如果一個節點掛掉了就得重新計算整個資料的問題。如果使用者打算多次重複使用這些資料,我們仍然建議使用者自己呼叫持久化方法對資料進行持久化。
Spark框架核心概念
1.RDD。彈性分散式資料集,是Spark最核心的資料結構。有分割槽機制,所以可以分散式進行處理。有容錯機制,通過RDD之間的依賴關係來恢復資料。
2.依賴關係。RDD的依賴關係是通過各種Transformation(變換)來得到的。父RDD和子RDD之間的依賴關係分兩種:①窄依賴 ②寬依賴
①針對窄依賴:父RDD的分割槽和子RDD的分割槽關係是:一對一
窄依賴不會發生Shuffle,執行效率高,spark框架底層會針對多個連續的窄依賴執行流水線優化,從而提高效能。例如 map flatMap等方法都是窄依賴方法
②針對寬依賴:父RDD的分割槽和子RDD的分割槽關係是:一對多
寬依賴會產生shuffle,會產生磁碟讀寫,無法優化。
3.DAG。有向無環圖,當一整條RDD的依賴關係形成之後,就形成了一個DAG。一般來說,一個DAG,最後都至少會觸發一個Action操作,觸發執行。一個Action對應一個Job任務。
4.Stage。一個DAG會根據RDD之間的依賴關係進行Stage劃分,流程是:以Action為基準,向前回溯,遇到寬依賴,就形成一個Stage。遇到窄依賴,則執行流水線優化(將多個連續的窄依賴放到一起執行)
5.task。任務。一個分割槽對應一個task。可以這樣理解:一個Stage是一組Task的集合
6.RDD的Transformation(變換)操作:懶執行,並不會立即執行
7.RDD的Action(執行)操作:觸發真正的執行
Spark Shuffle詳解
Shuffle,翻譯成中文就是洗牌。之所以需要Shuffle,還是因為具有某種共同特徵的一類資料需要最終匯聚(aggregate)到一個計算節點上進行計算。這些資料分佈在各個儲存節點上並且由不同節點的計算單元處理。
資料重新打亂然後匯聚到不同節點的過程就是Shuffle。但是實際上,Shuffle過程可能會非常複雜:
1)資料量會很大,比如單位為TB或PB的資料分散到幾百甚至數千、數萬臺機器上。
2)為了將這個資料匯聚到正確的節點,需要將這些資料放入正確的Partition,因為資料大小已經大於節點的記憶體,因此這個過程中可能會發生多次硬碟續寫。
3)為了節省頻寬,這個資料可能需要壓縮,如何在壓縮率和壓縮解壓時間中間做一個比較好的選擇?
4)資料需要通過網路傳輸,因此資料的序列化和反序列化也變得相對複雜。
一般來說,每個Task處理的資料可以完全載入記憶體(如果不能,可以減小每個Partition的大小),因此Task可以做到在記憶體中計算。但是對於Shuffle來說,如果不持久化這個中間結果,一旦資料丟失,就需要重新計算依賴的全部RDD,因此有必要持久化這個中間結果。所以這就是為什麼Shuffle過程會產生檔案的原因。
如果Shuffle過程不落地,①可能會造成記憶體溢位 ②當某分割槽丟失時,會重新計算所有父分割槽資料
Shuffle Write
Shuffle Write,即資料是如何持久化到檔案中,以使得下游的Task可以獲取到其需要處理的資料的(即Shuffle Read)。在Spark 0.8之前,Shuffle Write是持久化到快取的,但後來發現實際應用中,shuffle過程帶來的資料通常是巨量的,所以經常會發生記憶體溢位的情況,所以在Spark 0.8以後,Shuffle Write會將資料持久化到硬碟,再之後Shuffle Write不斷進行演進優化,但是資料落地到本地檔案系統的實現並沒有改變。
1)Hash Based Shuffle Write
在Spark 1.0以前,Spark只支援Hash Based Shuffle。因為在很多運算場景中並不需要排序,因此多餘的排序只能使效能變差,比如Hadoop的Map Reduce就是這麼實現的,也就是Reducer拿到的資料都是已經排好序的。實際上Spark的實現很簡單:每個Shuffle Map Task根據key的雜湊值,計算出每個key需要寫入的Partition然後將資料單獨寫入一個檔案,這個Partition實際上就對應了下游的一個Shuffle Map Task或者Result Task。因此下游的Task在計算時會通過網路(如果該Task與上游的Shuffle Map Task執行在同一個節點上,那麼此時就是一個本地的硬碟讀寫)讀取這個檔案並進行計算。
Hash Based Shuffle Write存在的問題
1)每個節點可能會同時開啟多個檔案,每次開啟檔案都會佔用一定記憶體。假設每個Write Handler的預設需要100KB的記憶體,那麼同時開啟這些檔案需要50GB的記憶體,對於一個叢集來說,還是有一定的壓力的。尤其是如果Shuffle Map Task和下游的Task同時增大10倍,那麼整體的記憶體就增長到5TB。
2)從整體的角度來看,開啟多個檔案對於系統來說意味著隨機讀,尤其是每個檔案比較小但是數量非常多的情況。而現在機械硬碟在隨機讀方面的效能特別差,非常容易成為效能的瓶頸。如果叢集依賴的是固態硬碟,也許情況會改善很多,但是隨機寫的效能肯定不如順序寫的。
Hash Based Shuffle的每個Mapper都需要為每個Reducer寫一個檔案,供Reducer讀取,即需要產生M*R個數量的檔案,如果Mapper和Reducer的數量比較大,產生的檔案數會非常多。
2)Sort Based Shuffle Write
Spark Core的一個重要的升級就是將預設的Hash Based Shuffle換成了Sort Based Shuffle,即spark.shuffle.manager從Hash換成了Sort
對應的實現類分別是
org.apache.spark.shuffle.hash.HashShuffleManager
org.apache.spark.shuffle.sort.SortShuffleManager。
Sort Based Shuffle的模式是:每個Shuffle Map Task不會為每個Reducer生成一個單獨的檔案;相反,它會將所有的結果寫到一個檔案裡,同時會生成一個Index檔案,
Reducer可以通過這個Index檔案取得它需要處理的資料。避免產生大量檔案的直接收益就是節省了記憶體的使用和順序Disk IO帶來的低延時。節省記憶體的使用可以減少GC的風險和頻率。而減少檔案的數量可以避免同時寫多個檔案給系統帶來的壓力。
Sort Based Write實現詳解
Shuffle Map Task會按照key相對應的Partition ID進行Sort,其中屬於同一個Partition的key不會Sort。因為對於不需要Sort的操作來說,這個Sort是負收益的;要知道之前Spark剛開始使用Hash Based的Shuffle而不是Sort Based就是為了避免Hadoop Map Reduce對於所有計算都會Sort的效能損耗。對於那些需要Sort的運算,
比如sortByKey,這個Sort在Spark 1.2.0裡還是由Reducer完成的。
①答出shuffle的定義
②spark shuffle的特點
③spark shuffle的目的
④spark shuffel的實現類,即對應優缺點
Shuffle 相關引數配置
Shuffle是Spark Core比較複雜的模組,它也是非常影響效能的操作之一。
1)spark.shuffle.manager
兩種方式的Shuffle 即Hash Based Shuffle和Sort Based Shuffle
2)spark.shuffle.spill
這個引數的預設值是true,用於指定Shuffle過程中如果記憶體中的資料超過閾值(參考spark.shuffle.memoryFraction的設定)時是否需要將部分資料臨時寫入外部儲存。
如果設定為false,那麼這個過程就會一直使用記憶體,會有記憶體溢位的風險。因此只有在確定記憶體足夠使用時,才可以將這個選項設定為false。
3)spark.shuffle.memoryFraction
在啟用spark.shuffle.spill的情況下,spark.shuffle.memoryFraction決定了當Shuffle過程中使用的記憶體達到總記憶體多少比例的時候開始spill。在Spark 1.2.0裡,這個值是0.2
此引數可以適當調大,可以控制在0.4~0.6。
通過這個引數可以設定Shuffle過程佔用記憶體的大小,它直接影響了寫入到外部儲存的頻率和垃圾回收的頻率。
可以適當調大此值,可以減少磁碟I/O次數。
4)spark.shuffle.blockTransferService
在Spark 1.2.0中這個配置的預設值是netty,而在之前的版本中是nio。它主要是用於在各個Executor之間傳輸Shuffle資料。netty的實現更加簡潔,但實際上使用者不用太關心這個選項。除非有特殊需求,否則採用預設配置即可。
5)spark.shuffle.consolidateFiles
這個配置的預設值是false。主要是為了解決在Hash Based Shuffle過程中產生過多檔案的問題。如果配置選項為true,那麼對於同一個Core上執行的Shuffle Map Task不會產生一個新的Shuffle檔案而是重用原來的
6)spark.shuffle.compress和spark.shuffle.spill.compress
這兩個引數的預設配置都是true。都是用來設定Shuffle過程中是否對Shuffle資料進行壓縮
前者針對最終寫入本地檔案系統的輸出檔案
後者針對在處理過程需要寫入到外部儲存的中間資料,即針對最終的shuffle輸出檔案。
7)spark.reducer.maxMbInFlight
這個引數用於限制一個Result Task向其他的Executor請求Shuffle資料時所佔用的最大記憶體數,預設是64MB。尤其是如果網絡卡是千兆和千兆以下的網絡卡時。預設值是 設定這個值需要綜合考慮網絡卡頻寬和記憶體。
Spark調優
更好的序列化實現
Spark用到序列化的地方
1)Shuffle時需要將物件寫入到外部的臨時檔案。
2)每個Partition中的資料要傳送到worker上,spark先把RDD包裝成task物件,將task通過網路發給worker。
3)RDD如果支援記憶體+硬碟,只要往硬碟中寫資料也會涉及序列化。
預設使用的是java的序列化。但java的序列化有兩個問題,一個是效能相對比較低,另外它序列化完二進位制的內容長度也比較大,造成網路傳輸時間拉長。業界現在有很多更好的實現,如kryo,比java的序列化快10倍以上。而且生成內容長度也短。時間快,空間小,自然選擇它了。
通過程式碼使用Kryo
配置多臨時檔案目錄
spark.local.dir引數。當shuffle、歸併排序(sort、merge)時都會產生臨時檔案。這些臨時檔案都在這個指定的目錄下。那這個資料夾有很多臨時檔案,如果都發生讀寫操作,有的執行緒在讀這個檔案,有的執行緒在往這個檔案裡寫,磁碟I/O效能就非常低。
可以建立多個資料夾,每個資料夾都對應一個真實的硬碟。假如原來是3個程式同時讀寫一個硬碟,效率肯定低,現在讓三個程式分別讀取3個磁碟,這樣衝突減少,效率就提高了。這樣就有效提高外部檔案讀和寫的效率。怎麼配置呢?只需要在這個配置時配置多個路徑就可以。中間用逗號分隔。
spark.local.dir=/home/tmp,/home/tmp2
啟用推測執行機制
可以設定spark.speculation true
開啟後,spark會檢測執行較慢的Task,並複製這個Task在其他節點執行,最後哪個節點先執行完,就用其結果,然後將慢Task 殺死
collect速度慢
collect只適合在測試時,因為把結果都收集到Driver伺服器上,資料要跨網路傳輸,同時要求Driver伺服器記憶體大,所以收集過程慢。解決辦法就是直接輸出到分散式檔案系統中。
有些情況下,RDD操作使用MapPartitions替代map
map方法對RDD的每一條記錄逐一操作。mapPartitions是對RDD裡的每個分割槽操作
rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
這樣頻繁的連結、斷開資料庫,效率差。
rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
這樣就一次連結一次斷開,中間批量操作,效率提升。
Spark的GC調優
由於Spark立足於記憶體計算,常常需要在記憶體中存放大量資料,因此也更依賴JVM的垃圾回收機制(GC)。並且同時,它也支援相容批處理和流式處理,對於程式吞吐量和延遲都有較高要求,因此GC引數的調優在Spark應用實踐中顯得尤為重要。
主要有兩種策略——Parallel GC(吞吐量優先)和CMS GC(低延遲響應)。
GC演算法原理
對於記憶體較大的環境非常友好。因為G1 GC對於記憶體的使用率特別高,記憶體越大,此優勢越明顯。
選擇垃圾收集器
park預設使用的是Parallel GC。經調研我們發現,Parallel GC常常受困於Full GC,而每次Full GC都給效能帶來了較大的下降。而Parallel GC可以進行引數調優的空間也非常有限,我們只能通過調節一些基本引數來提高效能,如各年代分割槽大小比例、進入老年代前的拷貝次數等。而且這些調優策略只能推遲Full GC的到來,如果是長期執行的應用,Parallel GC調優的意義就非常有限了。
將InitiatingHeapOccupancyPercent引數調低(預設值是45),可以使G1 GC收集器更早開始Mixed GC(Minor GC);但另一方面,會增加GC發生頻率。(啟動併發GC週期時的堆記憶體佔用百分比. G1之類的垃圾收集器用它來觸發併發GC週期,基於整個堆的使用率,而不只是某一代記憶體的使用比. 值為 0 則表示"一直執行GC迴圈". 預設值為 45.)降低此值,會提高Minor GC的頻率,但是會推遲Full GC的到來。
提高ConcGCThreads的值,在Mixed GC階段投入更多的併發執行緒,爭取提高每次暫停的效率。但是此引數會佔用一定的有效工作執行緒資源。
除錯這兩個引數可以有效降低Full GC出現的概率。Full GC被消除之後,最終的效能獲得了大幅提升。
Spark的記憶體管理
Spark的核心概念是RDD,實際執行中記憶體消耗都與RDD密切相關。Spark允許使用者將應用中重複使用的RDD資料持久化快取起來,從而避免反覆計算的開銷,而RDD的持久化形態之一就是將全部或者部分資料快取在JVM的Heap中。當我們觀察到GC延遲影響效率時,應當先檢查Spark應用本身是否有效利用有限的記憶體空間。RDD佔用的記憶體空間比較少的話,程式執行的heap空間也會比較寬鬆,GC效率也會相應提高;而RDD如果佔用大量空間的話,則會帶來巨大的效能損失
總結
對於大量依賴於記憶體計算的Spark應用,GC調優顯得尤為重要。在發現GC問題的時候,不要著急除錯GC。而是先考慮是否存在Spark程序記憶體管理的效率問題,例如RDD快取的持久化和釋放。至於GC引數的除錯,首先我們比較推薦使用G1 GC來執行Spark應用。相較於傳統的垃圾收集器,隨著G1的不斷成熟,需要配置的選項會更少,能同時滿足高吞吐量和低延遲的尋求。當然,GC的調優不是絕對的,不同的應用會有不同應用的特性,掌握根據GC日誌進行調優的方法,才能以不變應萬變。最後,也不能忘了先對程式本身的邏輯和程式碼編寫進行考量,例如減少中間變數的建立或者複製,控制大物件的建立,將長期存活物件放在Off-heap中等等。
Checkpoint機制
checkpoint的意思就是建立檢查點,類似於快照,例如在spark計算裡面 計算流程DAG特別長,伺服器需要將整個DAG計算完成得出結果,但是如果在這很長的計算流程中突然中間算出的資料丟失了,spark又會根據RDD的依賴關係從頭到尾計算一遍,這樣子就很費效能,當然我們可以將中間的計算結果通過cache或者persist放到記憶體或者磁碟中,但是這樣也不能保證資料完全不會丟失,儲存的這個記憶體出問題了或者磁碟壞了,也會導致spark從頭再根據RDD計算一遍,所以就有了checkpoint,其中checkpoint的作用就是將DAG中比較重要的中間資料做一個檢查點將結果儲存到一個高可用的地方
總結:Spark的CheckPoint機制很重要,也很常用,尤其在機器學習中的一些迭代演算法中很常見。比如一個演算法迭代10000次,如果不適用緩衝機制,如果某分割槽資料丟失,會導致整個計算鏈重新計算,所以引入快取機制。但是光引入快取,也不完全可靠,比如快取丟失或快取儲存不下,也會導致重新計算,所以使用CheckPoint機制再做一層保證。
補充:檢查目錄的路徑,一般都是設定到HDFS上
Spark懶執行的意義
Spark中,Transformation方法都是懶操作方法,比如map,flatMap,reduceByKey等。當觸發某個Action操作時才真正執行。
懶操作的意義:
①不執行job就觸發計算,避免了大量的無意義的計算,即避免了大量的無意義的中間結果的產生,即避免產生無意義的磁碟I/O及網路傳輸
②更深層次的意義在於,執行運算時,看到之前的計算操作越多,執行優化的可能性就越高
Spark共享變數
Spark程式的大部分操作都是RDD操作,通過傳入函式給RDD操作函式來計算。這些函式在不同的節點上併發執行,但每個內部的變數有不同的作用域,不能相互訪問,所以有時會不太方便,Spark提供了兩類共享變數供程式設計使用——廣播變數和計數器
1. 廣播變數
這是一個只讀物件,在所有節點上都有一份快取,建立方法是SparkContext.broadcast()
注意,廣播變數是隻讀的,所以建立之後再更新它的值是沒有意義的,一般用val修飾符來定義廣播變數。
2. 計數器
計數器只能增加,是共享變數,用於計數或求和。
計數器變數的建立方法是SparkContext.accumulator(v, name),其中v是初始值,name是名稱。
spark解決資料傾斜問題
將少量的資料轉化為Map進行廣播,廣播會將此 Map 傳送到每個節點中,如果不進行廣播,每個task執行時都會去獲取該Map資料,造成了效能浪費。