1. 程式人生 > >Spark日誌分析項目Demo(9)--常規性能調優

Spark日誌分析項目Demo(9)--常規性能調優

array ack 不一定 集合類型 -s 如果 一次 puts cluster

一 分配更多資源

分配更多資源:性能調優的王道,就是增加和分配更多的資源,性能和速度上的提升,是顯而易見的;基本上,在一定範圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark作業之後,進行性能調優的時候,首先第一步,我覺得,就是要來調節最優的資源配置;在這個基礎之上,如果說你的spark作業,能夠分配的資源達到了你的能力範圍的頂端之後,無法再分配更多的資源了,公司資源有限;那麽才是考慮去做後面的這些性能調優的點。

問題:
1、分配哪些資源?
2、在哪裏分配這些資源?
3、為什麽多分配了這些資源以後,性能會得到提升?

答案:
1、分配哪些資源?executor、cpu per executor、memory per executor、driver memory
2、在哪裏分配這些資源?在我們在生產環境中,提交spark作業時,用的spark-submit shell腳本,裏面調整對應的參數
/usr/local/spark/bin/spark-submit \
–class cn.spark.sparktest.core.WordCountCluster \
–num-executors 3 \ 配置executor的數量
–driver-memory 100m \ 配置driver的內存(影響不大)
–executor-memory 100m \ 配置每個executor的內存大小
–executor-cores 3 \ 配置每個executor的cpu core數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

3、調節到多大,算是最大呢?
第一種,Spark Standalone,公司集群上,搭建了一套Spark集群,你心裏應該清楚每臺機器還能夠給你使用的,大概有多少內存,多少cpu core;那麽,設置的時候,就根據這個實際的情況,去調節每個spark作業的資源分配。比如說你的每臺機器能夠給你使用4G內存,2個cpu core;20臺機器;executor,20;4G內存,2個cpu core,平均每個executor。
第二種,Yarn。資源隊列。資源調度。應該去查看,你的spark作業,要提交到的資源隊列,大概有多少資源?500G內存,100個cpu core;executor,50;10G內存,2個cpu core,平均每個executor。
一個原則,你能使用的資源有多大,就盡量去調節到最大的大小(executor的數量,幾十個到上百個不等;executor內存;executor cpu core)

4、為什麽調節了資源以後,性能可以提升?
(1)增加executor:
如果executor數量比較少,那麽,能夠並行執行的task數量就比較少,就意味著,我們的Application的並行執行的能力就很弱。
比如有3個executor,每個executor有2個cpu core,那麽同時能夠並行執行的task,就是6個。6個執行完以後,再換下一批6個task。
增加了executor數量以後,那麽,就意味著,能夠並行執行的task數量,也就變多了。比如原先是6個,現在可能可以並行執行10個,甚至20個,100個。那麽並行能力就比之前提升了數倍,數十倍。
相應的,性能(執行的速度),也能提升數倍~數十倍。
(2)增加每個executor的cpu core,也是增加了執行的並行能力。原本20個executor,每個才2個cpu core。能夠並行執行的task數量,就是40個task。
現在每個executor的cpu core,增加到了5個。能夠並行執行的task數量,就是100個task。
執行的速度,提升了2.5倍。
(3)增加每個executor的內存量。增加了內存量以後,對性能的提升,有3點:
- 如果需要對RDD進行cache,那麽更多的內存,就可以緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減少了磁盤IO。
- 對於shuffle操作,reduce端,會需要內存來存放拉取的數據並進行聚合。如果內存不夠,也會寫入磁盤。如果給executor分配更多內存以後,就有更少的數據,需要寫入磁盤,甚至不需要寫入磁盤。減少了磁盤IO,提升了性能。
- 對於task的執行,可能會創建很多對象。如果內存比較小,可能會頻繁導致JVM堆內存滿了,然後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內存加大以後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。

二 調節並行度

並行度:其實就是指的是,Spark作業中,各個stage的task數量,也就代表了Spark作業的在各個階段(stage)的並行度。

如果不調節並行度,導致並行度過低,會怎麽樣?

假設,現在已經在spark-submit腳本裏面,給我們的spark作業分配了足夠多的資源,比如50個executor,每個executor有10G內存,每個executor有3個cpu core。基本已經達到了集群或者yarn隊列的資源上限。

task沒有設置,或者設置的很少,比如就設置了,100個task。50個executor,每個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,可以並行運行。但是你現在,只有100個task,平均分配一下,每個executor分配到2個task,ok,那麽同時在運行的task,只有100個,每個executor只會並行運行2個task。每個executor剩下的一個cpu core,就浪費掉了。

你的資源雖然分配足夠了,但是問題是,並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。

合理的並行度的設置,應該是要設置的足夠大,大到可以完全合理的利用你的集群資源;比如上面的例子,總共集群有150個cpu core,可以並行運行150個task。那麽就應該將你的Application的並行度,至少設置成150,才能完全有效的利用你的集群資源,讓150個task,並行執行;而且task增加到150個以後,即可以同時並行運行,還可以讓每個task要處理的數據量變少;比如總共150G的數據要處理,如果是100個task,每個task計算1.5G的數據;現在增加到150個task,可以並行運行,而且每個task主要處理1G的數據就可以。

很簡單的道理,只要合理設置並行度,就可以完全充分利用你的集群計算資源,並且減少每個task要處理的數據量,最終,就是提升你的整個Spark作業的性能和運行速度。
1、task數量,至少設置成與Spark application的總cpu core數量相同(最理想情況,比如總共150個cpu core,分配了150個task,一起運行,差不多同一時間運行完畢)
2、官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,比如150個cpu core,基本要設置task數量為300~500;
實際情況,與理想情況不同的,有些task會運行的快一點,比如50s就完了,有些task,可能會慢一點,要1分半才運行完,所以如果你的task數量,剛好設置的跟cpu core數量相同,可能還是會導致資源的浪費,因為,比如150個task,10個先運行完了,剩余140個還在運行,但是這個時候,有10個cpu core就空閑出來了,就導致了浪費。那如果task數量設置成cpu core總數的2~3倍,那麽一個task運行完了以後,另一個task馬上可以補上來,就盡量讓cpu core不要空閑,同時也是盡量提升spark作業運行的效率和速度,提升性能。
3、如何設置一個Spark Application的並行度?
spark.default.parallelism
SparkConf conf = new SparkConf()
.set(“spark.default.parallelism”, “500”)

三 重構RDD架構以及RDD持久化

第一,RDD架構重構與優化

盡量去復用RDD,差不多的RDD,可以抽取稱為一個共同的RDD,供後面的RDD計算時,反復使用。

第二,公共RDD一定要實現持久化

北方吃餃子,現包現煮。你人來了,要點一盤餃子。餡料+餃子皮+水->包好的餃子,對包好的餃子去煮,煮開了以後,才有你需要的熟的,熱騰騰的餃子。

現實生活中,餃子現包現煮,當然是最好的了;但是Spark中,RDD要去“現包現煮”,那就是一場致命的災難。

對於要多次計算和使用的公共RDD,一定要進行持久化。

持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),以後無論對這個RDD做多少次計算,那麽都是直接取這個RDD的持久化的數據,比如從內存中或者磁盤中,直接提取一份數據。

第三,持久化,是可以進行序列化的

如果正常將數據持久化在內存中,那麽可能會導致內存的占用過大,這樣的話,也許,會導致OOM內存溢出。

當純內存無法支撐公共RDD數據完全存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減少內存的空間占用。

序列化的方式,唯一的缺點就是,在獲取數據的時候,需要反序列化。

如果序列化純內存方式,還是導致OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。

內存+磁盤,序列化

第四,為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化

持久化的雙副本機制,持久化後的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;持久化的每個數據單元,存儲一份副本,放在其他節點上面;從而進行容錯;一個副本丟了,不用重新計算,還可以使用另外一份副本。
這種方式,僅僅針對你的內存資源極度充足。

四 廣播大變量

如果說,task使用大變量(1m~100m),明知道會導致性能出現惡劣的影響。那麽我們怎麽來解決呢?
廣播,Broadcast,將大變量廣播出去。而不是直接使用。
用戶訪問session分析模塊中的,按時間比例隨機抽取,這種隨機抽取的map,1M,舉例,還算小的。如果你是從哪個表裏面讀取了一些維度數據,比方說,所有商品品類的信息,在某個算子函數中要使用到。100M。1000個task。100G的數據,網絡傳輸。集群瞬間因為這個原因消耗掉100G的內存。
這種默認的,task執行的算子中,使用了外部的變量,每個task都會獲取一份變量的副本,有什麽缺點呢?在什麽情況下,會出現性能上的惡劣的影響呢?

map,本身是不小,存放數據的一個單位是Entry,還有可能會用鏈表的格式的來存放Entry鏈條。所以map是比較消耗內存的數據格式。

比如,map是1M。總共,你前面調優都調的特好,資源給的到位,配合著資源,並行度調節的絕對到位,1000個task。大量task的確都在並行運行。

這些task裏面都用到了占用1M內存的map,那麽首先,map會拷貝1000份副本,通過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會通過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark作業運行的總時間的一小部分。

map副本,傳輸到了各個task上之後,是要占用內存的。1個map的確不大,1M;1000個map分布在你的集群中,一下子就耗費掉1G的內存。對性能會有什麽影響呢?

不必要的內存的消耗和占用,就導致了,你在進行RDD持久化到內存,也許就沒法完全在內存中放下;就只能寫入磁盤,最後導致後續的操作在磁盤IO上消耗性能;
你的task在創建對象的時候,也許會發現堆內存放不下所有對象,也許就會導致頻繁的垃圾回收器的回收,GC。GC的時候,一定是會導致工作線程停止,也就是導致Spark暫停工作那麽一點時間。頻繁GC的話,對Spark作業的運行的速度會有相當可觀的影響。

廣播變量的好處,不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變量產生的副本大大減少。
廣播變量,初始的時候,就在Drvier上有一份副本。

task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,那麽就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。

executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,舉例越近越好。
舉例來說,(雖然是舉例,但是基本都是用我們實際在企業中用的生產環境中的配置和經驗來說明的)。50個executor,1000個task。一個map,10M。

默認情況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集群中,耗費10G的內存資源。

如果使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的bockmanager上拉取變量副本,網絡傳輸速度大大增加;500M的內存消耗。

10000M,500M,20倍。20倍~以上的網絡傳輸性能消耗的降低;20倍的內存消耗的減少。

對性能的提升和影響,還是很客觀的。

雖然說,不一定會對性能產生決定性的作用。比如運行30分鐘的spark作業,可能做了廣播變量以後,速度快了2分鐘,或者5分鐘。但是一點一滴的調優,積少成多。最後還是會有效果的。

沒有經過任何調優手段的spark作業,16個小時;三板斧下來,就可以到5個小時;然後非常重要的一個調優,影響特別大,shuffle調優,2~3個小時;應用了10個以上的性能調優的技術點,JVM+廣播,30分鐘。16小時~30分鐘。

五 使用Kryo序列化

默認情況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化

這種默認序列化機制的好處在於,處理起來比較方便;也不需要我們手動去做什麽事情,只是,你在算子裏面使用的變量,必須是實現Serializable接口的,可序列化即可。

但是缺點在於,默認的序列化機制的效率不高,序列化的速度比較慢;序列化以後的數據,占用的內存空間相對還是比較大。

可以手動進行序列化格式的優化

Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。

所以Kryo序列化優化以後,可以讓網絡傳輸的數據變少;在集群中耗費的內存資源大大減少。
Kryo序列化機制,一旦啟用以後,會生效的幾個地方:
1、算子函數中使用到的外部變量,使用Kryo以後:優化網絡傳輸的性能,可以優化集群中內存的占用和消耗
2、持久化RDD,優化內存的占用和消耗;持久化RDD占用的內存越少,task執行的時候,創建的對象,就不至於頻繁的占滿內存,頻繁發生GC。
3、shuffle:可以優化網絡傳輸的性能

SparkConf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)

首先第一步,在SparkConf中設置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類;

Kryo之所以沒有被作為默認的序列化類庫的原因,就要出現了:主要是因為Kryo要求,如果要達到它的最佳性能的話,那麽就一定要註冊你自定義的類(比如,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須註冊你的類,否則Kryo達不到最佳性能)。

第二步,註冊你使用到的,需要通過Kryo序列化的,一些自定義類,SparkConf.registerKryoClasses()

項目中的使用:
.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
.registerKryoClasses(new Class[]{CategorySortKey.class})

六 使用fastutil優化數據格式

fastutil介紹:

fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;
fastutil能夠提供更小的內存占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,可以減小內存的占用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進制和文本類型的文件;
fastutil最新版本要求Java 7以及以上版本;

fastutil的每一種集合類型,都實現了對應的Java中的標準接口(比如fastutil的map,實現了Java的Map接口),因此可以直接放入已有系統的任何代碼中。
fastutil還提供了一些JDK標準類庫中沒有的額外功能(比如雙向叠代器)。

fastutil除了對象和原始類型為元素的集合,fastutil也提供引用類型的支持,但是對引用類型是使用等於號(=)進行比較的,而不是equals()方法。

fastutil盡量提供了在任何場景下都是速度最快的集合類庫。

Spark中應用fastutil的場景:

1、如果算子函數使用了外部變量;那麽第一,你可以使用Broadcast廣播變量優化;第二,可以使用Kryo序列化類庫,提升序列化性能和效率;第三,如果外部變量是某種比較大的集合,那麽可以考慮使用fastutil改寫外部變量,首先從源頭上就減少內存的占用,通過廣播變量進一步減少內存占用,再通過Kryo序列化類庫進一步減少內存占用。

2、在你的算子函數裏,也就是task要執行的計算邏輯裏面,如果有邏輯中,出現,要創建比較大的Map、List等集合,可能會占用較大的內存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作;那麽此時,可以考慮將這些集合類型使用fastutil類庫重寫,使用了fastutil集合類以後,就可以在一定程度上,減少task創建出來的集合類型的內存占用。避免executor內存頻繁占滿,頻繁喚起GC,導致性能下降。

關於fastutil調優的說明:

fastutil其實沒有你想象中的那麽強大,也不會跟官網上說的效果那麽一鳴驚人。廣播變量、Kryo序列化類庫、fastutil,都是之前所說的,對於性能來說,類似於一種調味品,烤雞,本來就很好吃了,然後加了一點特質的孜然麻辣粉調料,就更加好吃了一點。分配資源、並行度、RDD架構與持久化,這三個就是烤雞;broadcast、kryo、fastutil,類似於調料。

比如說,你的spark作業,經過之前一些調優以後,大概30分鐘運行完,現在加上broadcast、kryo、fastutil,也許就是優化到29分鐘運行完、或者更好一點,也許就是28分鐘、25分鐘。

shuffle調優,15分鐘;groupByKey用reduceByKey改寫,執行本地聚合,也許10分鐘;跟公司申請更多的資源,比如資源更大的YARN隊列,1分鐘。

fastutil的使用:

第一步:在pom.xml中引用fastutil的包

<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

速度比較慢,可能是從國外的網去拉取jar包,可能要等待5分鐘,甚至幾十分鐘,不等

List<Integer> => IntList
  • 1

基本都是類似於IntList的格式,前綴就是集合的元素類型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。除此之外,剛才也看到了,還支持object、reference。

七 調節數據本地化等待時長

PROCESS_LOCAL:進程本地化,代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好
NODE_LOCAL:節點本地化,代碼和數據在同一個節點中;比如說,數據作為一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是,數據和task在一個節點上的不同executor中;數據需要在進程間進行傳輸
NO_PREF:對於task來說,數據從哪裏獲取都一樣,沒有好壞之分
RACK_LOCAL:機架本地化,數據和task在一個機架的兩個節點上;數據需要通過網絡在節點之間進行傳輸
ANY:數據和task可能在集群中的任何地方,而且不在一個機架中,性能最差
spark.locality.wait,默認是3s

Spark在Driver上,對Application的每一個stage的task,進行分配之前,都會計算出每個task要計算的是哪個分片數據,RDD的某個partition;Spark的task分配算法,優先,會希望每個task正好分配到它要計算的數據所在的節點,這樣的話,就不用在網絡間傳輸數據;

但是呢,通常來說,有時,事與願違,可能task沒有機會分配到它的數據所在的節點,為什麽呢,可能那個節點的計算資源和計算能力都滿了;所以呢,這種時候,通常來說,Spark會等待一段時間,默認情況下是3s鐘(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,比如說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,然後進行計算。

但是對於第二種情況,通常來說,肯定是要發生數據傳輸,task會通過其所在節點的BlockManager來獲取數據,BlockManager發現自己本地沒有數據,會通過一個getRemote()方法,通過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,通過網絡傳輸回task所在節點。

對於我們來說,當然不希望是類似於第二種情況的了。最好的,當然是task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;如果要通過網絡傳輸數據的話,那麽實在是,性能肯定會下降的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。

我們什麽時候要調節這個參數?

觀察日誌,spark作業的運行日誌,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日誌。
日誌裏面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL
觀察大部分task的數據本地化級別

如果大多都是PROCESS_LOCAL,那就不用調節了
如果是發現,好多的級別都是NODE_LOCAL、ANY,那麽最好就去調節一下數據本地化的等待時長
調節完,應該是要反復調節,每次調節完以後,再來運行,觀察日誌
看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的運行時間有沒有縮短

你別本末倒置,本地化級別倒是提升了,但是因為大量的等待時長,spark作業的運行時間反而增加了,那就還是不要調節了

怎麽調節?

spark.locality.wait,默認是3s;6s,10s

默認情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

new SparkConf().set("spark.locality.wait", "10")

Spark日誌分析項目Demo(9)--常規性能調優