1. 程式人生 > >Spark調優秘訣——超詳細

Spark調優秘訣——超詳細

多層 嵌套 取數 java版 sta 協調 一句話 string對象 就會

版權聲明:本文為博主原創文章,轉載請註明出處。

Spark調優秘訣

1.診斷內存的消耗

在Spark應用程序中,內存都消耗在哪了?

  • 1.每個Java對象都有一個包含該對象元數據的對象頭,其大小是16個Byte。由於在寫代碼時候,可能會出現這種情況:對象頭比對象本身占有的字節數更多,比如對象只有一個int的域。一般這樣設計是不合理的,造成了對象的“浪費”,在實際開發中應當避免這種情況。

  • 2.Java的String對象,會比它內部的原始數據要多出40個字節。因為它內部使用char數組來保存內部的字符序列的,並且還得保存諸如數組長度之類的信息。而且String使用的是UTF-16編碼,每個字符會占用2個字節。比如,包含10個字符的String,會占用60個字節。不過該使用String的時候就應該使用,這是無法避免的事,相對於後面說的序列化庫、持久化、垃圾回收、提高並行度、廣播共享數據、更有Shuffle階段的優化等方面,String對象的內存特性就是毛毛雨了。

  • 3.Java中的集合類型,比如HashMap和LinkedList,內部使用的是鏈式數據結構。既然是鏈表,那麽每個節點都有額外的信息來保證前後節點的查尋,它們都使用了Entry對象來包裝。Entry對象不僅僅有對象頭,還有指向下一個Entry的指針,通常占用8個字節。

  • 4.元素類型為原始數據類型(比如int)的集合,內部通常會使用原始數據類型的包裝類型,比如Integer,來存儲元素。這種情況其實和第三種情況一致的,都是因為Java的自動裝箱和拆箱而導致的。

以上就是Spark應用程序針對開發語言的特性所占用的內存大小,要通過什麽辦法來查看和確定消耗內存大小呢?

  • 1、自行設置RDD的並行度。有兩種方式:第一,在parallelize()、textFile()等外部數據源方法中傳入第二個參數,設置RDD的task / partition的數量;第二,用SparkConf.set()方法,設置參數(spark.default.parallelism),統一設置整個Spark Application所有RDD的partition數量。

  • 2、調用RDD.cache()將RDD cache到內存中。方便直接從log信息中看出每個partition消耗的內存。

  • 3、找出Driver進程打印的log信息,會有類似於:“INFO BlockManagerMasterActor: Added rdd01 in memory on mbk.local:50311 (size: 717.5 KB, free: 555.5 MB)”的日誌信息。這就顯示了每個partition占用了多少內存。

  • 4、將這個內存信息乘以partition數量,即可得出RDD的內存占用量。

註意,這種方法,一定要確保電腦的內存能夠承受測試的數據,不然會報出oom異常。

通過以上的簡介,大概知道了內存的消耗和如何查看消耗的內存了。但是只知道內存的消耗而不去優化它,肯定是不行的,在生產環境中,每一分每一秒都是金錢和客戶的滿意。比如這個報表要求每天早上8點跑出結果給領導看的,然而因為你的Spark程序實在太慢了,11點才出結果,那麽領導顯然會不滿意的,最後獎金就變少了。因此下面來根據多個方面來逐點分析如何對Spark應用程序調優,分析的順序是從表面到底層的Shuffle階段。其實最重要的調優還是Shuffle階段的調優。

2.高性能序列化類庫

在分布式應用程序中,要想程序能夠工作,首先第一步是什麽?毫無疑問是分布式節點之間的通信,要想通信,最重要的階段是序列化和反序列化。那麽,顯而易見,速度更快,更穩定的序列化庫影響分布式系統的通信效率。

在Spark中,默認是使用Java自帶的序列化機制——基於ObjectInputStream和ObjectOutputStream的序列化機制,這是為了提高便捷性和適用性,畢竟是Java原生的嘛。然鵝,自帶的東西往往考慮的東西比較多,沒法做到樣樣俱全,比如內序列化後占據的內存還是較大,但是Spark是基於內存的大數據框架,對內存的要求很高。所以,在Spark應用程序中,Java自帶的序列化庫的效率有點差強人意。需求是從實際出發的嘛,最終Spark也提供了另外一種序列化機制——Kryo序列化機制。

Kryo序列化機制比Java序列化機制更快,序列化後的數據占的內存更小。那麽Kryo序列化機制這麽好,為什麽不選用它是默認序列化庫呢?這裏提一句話“人無完人,誰能無錯”,Kryo序列化機制也樣,之所以不選用它為默認序列化機制是因為有些類型雖然實現了Seriralizable接口,但是不一定能夠進行序列化;此外,如果要得到最佳的性能,需要在Spark應用程序中,對所有 需要序列化的類型都進行註冊。

使用Kryo序列化機制的方法: 1.給SparkConf加入一個參數 SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

2.對需要序列化的類自行進行註冊(因為如果不註冊,Kryo必須一直保存類型的全限定名,會占用內存。Spark默認是對Scala中常用的類型自動註冊了Kryo的,都在AllScalaRegistry類中) Scala版本:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[Counter] ))
val sc = new SparkContext(conf)

Java版本:

SparkConf conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)

還可以對Kryo序列化機制進行優化達到更優的效果。

  • 1、優化緩存大小。如果註冊的要序列化的自定義的類型,本身很大大,比如包含了超過100個field。會導致要序列化的對象過大。此時需要對Kryo本身進行優化。因為Kryo內部的緩存可能不夠存放這麽大的class對象。此時需要調用SparkConf.set()方法,設置spark.kryoserializer.buffer.mb參數的值,將其調大以適用。默認情況下spark.kryoserializer.buffer.mb是2,即最大能緩存2M的對象,然後進行序列化。可以在必要時將其調大。比如設置為10。

  • 2、預先註冊自定義類型。雖然不註冊自定義類型,Kryo類庫也能正常工作,但是那樣對於它要序列化的每個對象,都會保存一份它的全限定類名。反而會耗費大量內存。因此通常都預先註冊好要序列化的自定義的類。

總結,需要用到Kryo序列化機制的場景,算子內部使用了外部的大對象或者大數據結構。那麽可以切換到Kryo序列化,序列化速度更快,和獲得更小的序列化數據,減少內存的消耗。

3.優化數據結構

對數據結構的優化,主要是針對Java數據結構(如果用scala開發的話,其實原理也一樣的)。其實就是算子裏面的局部變量或者算子函數外部的數據結構。比如基於鏈式結構的數據結構、包裝類型的數據結構等,它們在除了本身的數據之外,還會有額外的數據信息來維持它們的數據類型,這樣就會比預想占有更大的內存。

以下是一些優化建議:

  • 1、能使用數組或字符串就不要用集合類。即優先使用Array,退而求次才是ArrayList、LinkedList、HashMap、HashTable等。熟悉Java語言的都知道集合類一般是泛型的,然鵝泛型的類型是包裝類,比如List list = new ArrayList(),就會因為包裝類而占有額外的內存,最後占有更多的額外開銷。在生產開發中的做法是,對於HashMap、List這種數據,統一用String拼接成特殊格式的字符串。如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以優化為,特殊的字符串格式:id:name,address|id:name,address...

  • 2、避免使用多層嵌套的對象結構。public class Teacher { private List students = new ArrayList() }。就是非常不好的例子。因為Teacher類的內部又嵌套了大量的小Student對象。比如說,對於上述例子,也完全可以使用特殊的字符串來進行數據的存儲。比如,用json字符串來存儲數據,就是一個很好的選擇。{"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]}

  • 3、能用int就不用String。雖然String比集合咧更高效,但是之前說過Java的String是占2個字節的,使用int會優化內存。

總結,在寫Spark程序的時候,要牢牢記住,盡量壓榨因語言帶來的內存開銷,達到節約內存的目的。

4.對多次使用的RDD進行持久化或Checkpoint

  • 1、對一個RDD,基於它進行了多次transformation或者action操作。非常有必要對其進行持久化操作,以避免對一個RDD反復進行計算。

  • 2、如果要保證在RDD的持久化數據可能丟失的情況下,還要保證高性能,那麽可以對RDD進行Checkpoint操作。

如下圖,兩次對同一個RDD操作,但是比如當路徑1先計算完RDD2得出RDD3,當RDD5再次計算RDD2的時候,由於在Spark中,對RDD計算後,如果沒有持久化,在計算後可能就會立刻拋棄掉數據。所以第二次計算RDD2時需要重新計算RDD2前面的RDD。這樣很明顯就消耗了額外的時間。

技術分享圖片

總結,對於後面要多次可能用到的RDD,要對其持久化,如果要高可用,更要對其checkpoint,保證以後出錯節省大量的時間。正所謂“長痛不如短痛”,一時的付出是為了後面的快速恢復錯誤和高可用。

5.使用序列化的持久化級別

  • RDD的數據是持久化到內存,或者磁盤中的。但是如果機器的內存大小不是很充足,或者有時為了節省機器的內存開銷,比如在生產環境下,機器不單單是跑這麽一個Spark應用的,還需要留些內存供其他應用使用。這種情況下,可以使用序列化的持久化級別。比如MEMORYONLYSER、MEMORYANDDISKSER等。用法是:RDD.persist(StorageLevel.MEMORYONLY_SER)。

  • 將數據序列化之後,再持久化,可以大大減小對內存的消耗。此外,數據量小了之後,如果要寫入磁盤,磁盤io性能消耗也比較小。

  • 對RDD持久化序列化後,RDD的每個partition的數據,都是序列化為一個巨大的字節數組。這樣,對於內存的消耗就小了。但是唯一的缺點是獲取RDD數據時,需要對其進行反序列化,會增大其性能開銷。這種情況下可以使用第二點的Kryo序列化機制配合,提高序列化的效率。

級別 使用空間 使用空間 是否在內存中 是否在磁盤上 備註
MEMORY_ONLY
MEMORY_ONLY_2 數據存2份
MEMORY_ONLY_SER 數據序列化
MEMORY_ONLY_SER_2 數據序列化,數據存2份
MEMORY_AND_DISK 中等 部分 部分 如果數據在內存中放不下,則溢寫到磁盤
MEMORY_AND_DISK_2 中等 部分 部分 數據存2份
MEMORY_AND_DISK_SER 部分 部分
MEMORY_AND_DISK_SER_2 部分 部分 數據存2份
DISK_ONLY
DISK_ONLY_2 數據存2份

6.Java虛擬機垃圾回收調優

7.提高並行度

  • 在實際使用Spark集群的時候,很多時候對於集群的資源並不是一定會被充分利用到,這是由於task和cpu核的協調不好導致的。要想合理的“榨幹”集群的資源和性能,可以合理的設置Spark應用程序運行的並行度,來充分地利用集群的資源,這樣才能充分的提高Spark應用程序的性能。

  • Spark的數據源有兩種,一種是外部的,比如HDFS等分布式文件系統,或者通過現有的數組等數據結構序列化而成;一種是通過已有的RDD轉換而來的。這裏以Spark讀取HDFS的數據為例子。Spark會根據讀取HDFS的時候把每個block劃分為一個Partition,其實也是按照這個來自動設置並行度的。對於reduceByKey等會發生shuffle的算子操作,會使用並行度最大的父RDD的並行度作為Spark應用的並行度。

  • 通過上面的分析,我們可以手動設置並行度,在讀取HDFS或者並行化數據的時候調用textFile()和parallelize()等方法的時候,通過第二個參數來設置並行度。也可以使用spark.default.parallelism參數,來設置統一的並行度。根據Spark官方的推薦,最優的方案是給集群中的每個cpu core設置2~3個task,也就是task的數量是cpu核的2~3倍。

  • 以下是實現例子:現在已知cpu core有10個。比如spark-submit設置了executor數量是2個,每個executor有5個core。但是在Spark應用程序中這樣設置了SparkConf().set("spark.default.parallelism", "5"),那麽application總共會有5個core。實際上所有的RDD都被設為了partition為5,也就是每個RDD的數據分為5份,也就是5份數據(partition)成為5個task分配到這兩個executor中。很明顯,Spark應用程序在運行的時候,只占用了5個cpu core,還剩下5個cpu core是沒用到的,浪費了集群資源。此時可以設置這樣來優化Spark的集群性能,通過設置參數 SparkConf().set("spark.default.parallelism", "30")來設置合理的並行度,從而充分利用資源。為什麽呢?請看下圖:

技術分享圖片

8.廣播共享數據

  • RDD實質是彈性分布式數據集,在每個節點中的每個task(一個節點可以有很多個task)操作的只是RDD的一部分數據,如果RDD算子操作使用到了算子函數外部的一份大數據的時候,實際上是Spark應用程序把數據文件通過driver發送給每一個節點的每一個task,很明顯,這樣會造成大量的網絡IO操作,大量消耗節點上的內存。其實很容易想到,把一份大數據文件發送給每個節點就OK了,單個節點的所有task共享一份數據,這樣就會節省大量的網絡IO操作和節省大量內存消耗。

  • 如果算子函數中,使用到了特別大的數據(比如一份大的配置文件)供每個節點的所有task使用,可以借助Spark提供的共享變量。共享變量有兩種,一是廣播變量,一是累加器。廣播變量是只讀的,通常用來提供一份數據給所有的節點,每個節點的task訪問訪問同一份數據。而累加器是可寫可讀的,一個累加器一般是用於所有節點對用一個簡單的整型變量進行共享累加,共同維護一份數據。這樣的話,就不至於將一個大數據拷貝到每一個task上去。而是給每個節點拷貝一份,然後節點上的task共享該數據。原理如圖所示: 技術分享圖片

9.數據本地化

Spark數據本地化的基本原理

  • Spark和MapReduce是如今兩個最流行的大數據框架,它們的原理都是計算移動,而數據不移動,計算找數據。這樣做的創新性是避免了大量數據的網絡傳輸造成網絡IO和內存的消耗。因此引出一個叫“數據本地化”的概念。

  • 數據本地化對於Spark Job性能有著巨大的影響。如果數據以及要計算它的代碼是在同一個節點,性能會非常高。但是,如果數據和計算它的代碼是位於不同的節點,那麽其中之一必須到另外一方的機器上。通常來說,移動代碼到其他節點,會比移動數據到代碼所在的節點上去,速度要快得多,因為代碼比較小。Spark也正是基於這個數據本地化的原則來構建task調度算法的。

  • 數據本地化,指的是,數據離計算它的代碼有多近。基於數據距離代碼的距離,有幾種數據本地化級別:

    • 1、PROCESS_LOCAL:數據和計算它的代碼在同一個JVM進程中。
    • 2、NODE_LOCAL:數據和計算它的代碼在一個節點上,但是不在一個進程中,比如在不同的executor進程中,或者是數據在HDFS文件的block中。
    • 3、NO_PREF:數據從哪裏過來,性能都是一樣的。
    • 4、RACK_LOCAL:數據和計算它的代碼在一個機架上。
    • 5、ANY:數據可能在任意地方,比如其他網絡環境內,或者其他機架上。

Spark數據本地化的特點

  • Spark傾向於使用最好的本地化級別來調度task,但並不是每次都會使用最好的本地化數據的。在實際中,如果沒有任何未處理的數據在空閑的executor上,Spark會放低本地化級別。這時有兩個選擇:第一,driver等待,直到executor上的cpu釋放出來,就分配task等資源給這個executor;第二,立即在任意一個executor上啟動一個task。

  • Spark會默認等待一段時間(這個事件可以通過參數來設置),來期望在task要處理的數據所在的節點上的executor空閑出一個cpu,從而為其分配task鞥資源。但只要超過了時間,Spark就會將task分配到其他任意一個空閑的executor上。

  • 可以設置參數,spark.locality系列參數,來調節Spark等待task可以進行數據本地化的時間。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack

技術分享圖片

  • 針對以上的分析,我們可以這樣調優,增大查找本地化數據的超時時間和重試次數,因為時間更長更利於查找本地化數據的節點的executor,重試次數越多,更多機會嘗試查找本地化數據的節點的executor。

  • 調優方式,主要是spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack這些參數,具體的根據實際的業務需求來控制參數就OK了。

10.reduceByKey和groupByKey的選擇

  • 以下兩種方式是等價的,但是實現的原理卻不相同。reduceByKey,因為它會在map端,先進行本地combine,可以大大減少要傳輸到reduce端的數據量,減小網絡傳輸的開銷。而groupByKey算子卻不會這樣優化。所以只有在reduceByKey處理不了時,才用groupByKey().map()來替代。

    val counts = pairs.reduceByKey(_ + _)
    val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))
    

11.shuffle性能優化

  • 無論是MapReduce還是Spark,Shuffle階段是最重要的階段,它的好壞影響著整個Spark的性能。其實Shuffle階段的調優,可以從以下的參數入手:
    • new SparkConf().set("spark.shuffle.consolidateFiles", "true")
    • spark.shuffle.consolidateFiles:是否開啟shuffle block file的合並,默認為false
    • spark.reducer.maxSizeInFlight:reduce task的拉取緩存,默認48m
    • spark.shuffle.file.buffer:map task的寫磁盤緩存,默認32k
    • spark.shuffle.io.maxRetries:拉取失敗的最大重試次數,默認3次
    • spark.shuffle.io.retryWait:拉取失敗的重試間隔,默認5s
    • spark.shuffle.memoryFraction:用於reduce端聚合的內存比例,默認0.2,超過比例就會溢出到磁盤上

原理請先看下圖,然後再做分析。 技術分享圖片

這個是沒有開啟consolidateFiles優化(Spark1.3之後加入的),會產生大量的磁盤文件,在寫磁盤和result task拉取數據的時候,會浪費過多的系統資源。

開啟consolidateFiles優化 技術分享圖片

優化方法:

  • 開啟consolidateFiles,增大result task的拉取緩存,增大shufflemaptask的寫磁盤緩存,增大重試次數和重試間隔,調大用於reduce端聚合的內存比例
  • new SparkConf().set("spark.shuffle.consolidateFiles", "true")
  • spark.shuffle.consolidateFiles:是否開啟shuffle block file的合並,默認為false
  • spark.reducer.maxSizeInFlight:ResultTask的拉取緩存,默認48m
  • spark.shuffle.file.buffer:map task的寫磁盤緩存,默認32k
  • spark.shuffle.io.maxRetries:拉取失敗的最大重試次數,默認3次
  • spark.shuffle.io.retryWait:拉取失敗的重試間隔,默認5s
  • spark.shuffle.memoryFraction:用於reduce端聚合的內存比例,默認0.2,超過比例就會溢出到磁盤上

  • 影響一個Spark作業性能的因素,主要還是代碼開發、資源參數以及數據傾斜,shuffle調優只能在整個Spark的性能調優中占到一小部分。

  • shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖(這個緩存大小可以通過上面的參數來設定)相同大小的數據,然後通過內存中的一個Map進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到buffer緩沖中進行聚合操作。一直循環,直到最後將所有數據到拉取完,並得到最終的結果。

  • 開啟consolidate機制之後,在shuffle write過程中,task就不是為下遊stage的每個task創建一個磁盤文件了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下遊stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會創建一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。

  • 當Executor的CPU core執行完一批task,接著執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合並,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。

Spark的調優至此告一段落,下一篇會針對實際過程中遇到的問題特定的調優。

Spark調優秘訣——超詳細