1. 程式人生 > >Spark記錄-Spark性能優化解決方案

Spark記錄-Spark性能優化解決方案

let .text 並行 alloc lte 知識 enabled ida 並發執行

Spark性能優化的10大問題及其解決方案

問題1:reduce task數目不合適
解決方式:
需根據實際情況調節默認配置,調整方式是修改參數spark.default.parallelism。通常,reduce數目設置為core數目的2到3倍。數量太大,造成很多小任務,增加啟動任務的開銷;數目太少,任務運行緩慢。
問題2:shuffle磁盤IO時間長
解決方式:
設置spark.local.dir為多個磁盤,並設置磁盤為IO速度快的磁盤,通過增加IO來優化shuffle性能;
問題3:map|reduce數量大,造成shuffle小文件數目多
解決方式:
默認情況下shuffle文件數目為map tasks * reduce tasks
通過設置spark.shuffle.consolidateFiles為true,來合並shuffle中間文件,此時文件數為reduce tasks數目;
問題4:序列化時間長、結果大
解決方式:
Spark默認使.用JDK.自帶的ObjectOutputStream,這種方式產生的結果大、CPU處理時間長,可以通過設置spark.serializer為org.apache.spark.serializer.KryoSerializer。
另外如果結果已經很大,可以使用廣播變量;
問題5:單條記錄消耗大
解決方式:
使用mapPartition替換map,mapPartition是對每個Partition進行計算,而map是對partition中的每條記錄進行計算;
問題6 : collect輸出大量結果時速度慢
解決方式:
collect源碼中是把所有的結果以一個Array的方式放在內存中,可以直接輸出到分布式?文件系統,然後查看文件系統中的內容;
問題7: 任務執行速度傾斜
解決方案:
如果數據傾斜,一般是partition key取得不好,可以考慮其他的並行處理方式,並在中間加上aggregation操作;如果是Worker傾斜,例如在某些Worker上的executor執行緩慢,可以通過設置spark.speculation=true 把那些持續慢的節點去掉;
使用場景:
一個stage有10個task:task0~task9,分別分配到了worker0~worker9上去執行計算,其中task0~task8都只用了5s就運行成功返回了,而由於worker9本身可能由於CPU資源長期被別的線程占用、磁盤IO緩慢等緣故,造成了task9執行緩慢,遲遲不返回,於是這個stage只能慢慢等待task9的返回。也就是整個stage的運行時間被這個task9給拖後腿了。

而如果調度端如果引入了speculatable策略,那麽上述事件的實際情況被改善為:
step1:TaskSetManager在task0~task8成功返回後,過了一段時間檢測到task9遲遲沒有返回,於是認定task9:你他媽的是個speculatableTask;
step2:TaskSetManager此時沒有task需要調度,而且此時有speculatableTask,所以調度器決定再次調度一下task9,利用和普通task一樣的調度策略將task9分發到某臺機器上,不過這次不會讓task9在worker9上調度了。假設新的task9調度到了worker0。
step3:這時,計算集群上就有了兩個同時運行的task9。在worker0上的task9運行了5s成功返回了,這時候TaskSetManager接收到task9的成功狀態,由於10個task都運行完了taskSetManager自己標識為運行完成。
PS:而那個在worker9上依然慢慢運行的task9就沒什麽用了,worker上的Executor會用Failed的形式。
問題8: 通過多步驟的RDD操作後有很多空任務或者小任務產生
解決方案:
使用coalesce或者repartition去減少RDD中partition數量;
問題9: 通過多步驟的RDD操作後有很多空任務或者小任務產生
解決方式:
使用coalesce或repartition去減少RDD中partition數量;
問題10:Spark Streaming吞吐量不高
解決方式:
可以設置spark.streaming.concurrentJobs
近期優化了一個spark流量統計的程序,此程序跑5分鐘小數據量日誌不到5分鐘,但相同的程序跑一天大數據量日誌各種失敗。經優化,使用160 vcores + 480G memory,一天的日誌可在2.5小時內跑完,下面對一些優化的思路方法進行梳理。

優化的目標

  1. 保證大數據量下任務運行成功
  2. 降低資源消耗
  3. 提高計算性能

三個目標優先級依次遞減,首要解決的是程序能夠跑通大數據量,資源性能盡量進行優化。

基礎優化

這部分主要對程序進行優化,主要考慮stage、cache、partition等方面。

Stage

在進行shuffle操作時,如reduceByKey、groupByKey,會劃分新的stage。同一個stage內部使用pipe line進行執行,效率較高;stage之間進行shuffle,效率較低。故大數據量下,應進行代碼結構優化,盡量減少shuffle操作。

Cache

本例中,首先計算出一個baseRDD,然後對其進行cache,後續啟動三個子任務基於cache進行後續計算。

對於5分鐘小數據量,采用StorageLevel.MEMORY_ONLY,而對於大數據下我們直接采用了StorageLevel.DISK_ONLY。DISK_ONLY_2相較DISK_ONLY具有2備份,cache的穩定性更高,但同時開銷更大,cache除了在executor本地進行存儲外,還需走網絡傳輸至其他節點。後續我們的優化,會保證executor的穩定性,故沒有必要采用DISK_ONLY_2。實時上,如果優化的不好,我們發現executor也會大面積掛掉,這時候即便DISK_ONLY_2,也是然並卵,所以保證executor的穩定性才是保證cache穩定性的關鍵。

cache是lazy執行的,這點很容易犯錯,例如:

val raw = sc.textFile(file)
val baseRDD = raw.map(...).filter(...)
baseRDD.cache()
val threadList = new Array(
  new Thread(new SubTaskThead1(baseRDD)),
  new Thread(new SubTaskThead2(baseRDD)),
  new Thread(new SubTaskThead3(baseRDD))
)
threadList.map(_.start())
threadList.map(_.join())

這個例子在三個子線程開始並行執行的時候,baseRDD由於lazy執行,還沒被cache,這時候三個線程會同時進行baseRDD的計算,cache的功能形同虛設。可以在baseRDD.cache()後增加baseRDD.count(),顯式的觸發cache,當然count()是一個action,本身會觸發一個job。

再舉一個錯誤的例子:

val raw = sc.textFile(file)
val pvLog = raw.filter(isPV(_))
val clLog = raw.filter(isCL(_))
val baseRDD = pvLog.union(clLog)
val baseRDD.count()

由於textFile()也是lazy執行的,故本例會進行兩次相同的hdfs文件的讀取,效率較差。解決辦法,是對pvLog和clLog共同的父RDD進行cache。

Partition

一個stage由若幹partition並行執行,partition數是一個很重要的優化點。

本例中,一天的日誌由6000個小文件組成,加上後續復雜的統計操作,某個stage的parition數達到了100w。parition過多會有很多問題,比如所有task返回給driver的MapStatus都已經很大了,超過spark.driver.maxResultSize(默認1G),導致driver掛掉。雖然spark啟動task的速度很快,但是每個task執行的計算量太少,有一半多的時間都在進行task序列化,造成了浪費,另外shuffle過程的網絡消耗也會增加。

對於reduceByKey(),如果不加參數,生成的rdd與父rdd的parition數相同,否則與參數相同。還可以使用coalesce()和repartition()降低parition數。例如,本例中由於有6000個小文件,導致baseRDD有6000個parition,可以使用coalesce()降低parition數,這樣parition數會減少,每個task會讀取多個小文件。

val raw = sc.textFile(file).coalesce(300)
val baseRDD = raw.map(...).filter(...)
baseRDD.cache()

那麽對於每個stage設置多大的partition數合適那?當然不同的程度的復雜度不同,這個數值需要不斷進行調試,本例中經測試保證每個parition的輸入數據量在1G以內即可,如果parition數過少,每個parition讀入的數據量變大,會增加內存的壓力。例如,我們的某一個stage的ShuffleRead達到了3T,我設置parition數為6000,平均每個parition讀取500M數據。

val bigRDD = ...
bigRDD.coalesce(6000).reduceBy(...)

最後,一般我們的原始日誌很大,但是計算結果很小,在saveAsTextFile前,可以減少結果rdd的parition數目,這樣會計算hdfs上的結果文件數,降低小文件數會降低hdfs namenode的壓力,也會減少最後我們收集結果文件的時間。

val resultRDD = ...
resultRDD.repartition(1).saveAsTextFile(output)

這裏使用repartition()不使用coalesce(),是為了不降低resultRDD計算的並發量,通過再做一次shuffle將結果進行匯總。

資源優化

在搜狗我們的spark程序跑在yarn集群上,我們應保證我們的程序有一個穩定高效的集群環境。

設置合適的資源參數

一些常用的參數設置如下:

--queue:集群隊列
--num-executors:executor數量,默認2
--executor-memory:executor內存,默認512M
--executor-cores:每個executor的並發數,默認1

executor的數量可以根據任務的並發量進行估算,例如我有1000個任務,每個任務耗時1分鐘,若10個並發則耗時100分鐘,100個並發耗時10分鐘,根據自己對並發需求進行調整即可。默認每個executor內有一個並發執行任務,一般夠用,也可適當增加,當然內存的使用也會有所增加。

對於yarn-client模式,整個application所申請的資源為:

total vores = executor-cores * num-executors + spark.yarn.am.cores
total memory= (executor-memory + spark.yarn.executor.memoryOverhead) * num-executors + (spark.yarn.am.memory + spark.yarn.am.memoryOverhead)

當申請的資源超出所指定的隊列的min cores和min memory時,executor就有被yarn kill掉的風險。而spark的每個stage是有狀態的,如果被kill掉,對性能影響比較大。例如,本例中的baseRDD被cache,如果某個executor被kill掉,會導致其上的cache的parition失效,需要重新計算,對性能影響極大。

這裏還有一點需要註意,executor-memory設置的是executor jvm啟動的最大堆內存,java內存除了堆內存外,還有棧內存、堆外內存等,所以spark使用spark.yarn.executor.memoryOverhead對非堆內存進行限制,也就是說executor-memory + spark.yarn.executor.memoryOverhead是所能使用的內存的上線,如果超過此上線,就會被yarn kill掉。本次優化,堆外內存的優化起到了至關重要的作用,我們後續會看到。

spark.yarn.executor.memoryOverhead默認是executor-memory * 0.1,最小是384M。比如,我們的executor-memory設置為1G,spark.yarn.executor.memoryOverhead是默認的384M,則我們向yarn申請使用的最大內存為1408M,但由於yarn的限制為倍數(不知道是不是只是我們的集群是這樣),實際上yarn運行我們運行的最大內存為2G。這樣感覺浪費申請的內存,申請的堆內存為1G,實際上卻給我們分配了2G,如果對spark.yarn.executor.memoryOverhead要求不高的話,可以對executor-memory再精細化,比如申請executor-memory為640M,加上最小384M的spark.yarn.executor.memoryOverhead,正好一共是1G。

除了啟動executor外,spark還會啟動一個am,可以使用spark.yarn.am.memory設置am的內存大小,默認是512M,spark.yarn.am.memoryOverhead默認也是最小384M。有時am會出現OOM的情況,可以適當調大spark.yarn.am.memory。

executor默認的永久代內存是64K,可以看到永久代使用率長時間為99%,通過設置spark.executor.extraJavaOptions適當增大永久代內存,例如:–conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=64m”

driver端在yarn-client模式下運行在本地,也可以對相關參數進行配置,如–driver-memory等。

查看日誌

executor的stdout、stderr日誌在集群本地,當出問題時,可以到相應的節點查詢,當然從web ui上也可以直接看到。

executor除了stdout、stderr日誌,我們可以把gc日誌打印出來,便於我們對jvm的內存和gc進行調試。

--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"

除了executor的日誌,nodemanager的日誌也會給我們一些幫助,比如因為超出內存上限被kill、資源搶占被kill等原因都能看到。

除此之外,spark am的日誌也會給我們一些幫助,從yarn的application頁面可以直接看到am所在節點和log鏈接。

復雜的集群環境

我們的yarn集群節點上上跑著mapreduce、hive、pig、tez、spark等各類任務,除了內存有所限制外,CPU、帶寬、磁盤IO等都沒有限制(當然,這麽做也是為了提高集群的硬件利用率),加上集群整體業務較多負載較高,使得spark的執行環境十分惡劣。常見的一些由於集群環境,導致spark程序失敗或者性能下降的情況有:

  • 節點掛掉,導致此節點上的spark executor掛掉
  • 節點OOM,把節點上的spark executor kill掉
  • CPU使用過高,導致spark程序執行過慢
  • 磁盤目錄滿,導致spark寫本地磁盤失敗
  • 磁盤IO過高,導致spark寫本地磁盤失敗
  • HDFS掛掉,hdfs相關操作失敗

內存/GC優化

經過上述優化,我們的程序的穩定性有所提升,但是讓我們完全跑通的最後一根救命稻草是內存、GC相關的優化。

Direct Memory

我們使用的spark版本是1.5.2(更準確的說是1.5.3-shapshot),shuffle過程中block的傳輸使用netty(spark.shuffle.blockTransferService)。基於netty的shuffle,使用direct memory存進行buffer(spark.shuffle.io.preferDirectBufs),所以在大數據量shuffle時,堆外內存使用較多。當然,也可以使用傳統的nio方式處理shuffle,但是此方式在spark 1.5版本設置為deprecated,並將會在1.6版本徹底移除,所以我最終還是采用了netty的shuffle。

jvm關於堆外內存的配置相對較少,通過-XX:MaxDirectMemorySize可以指定最大的direct memory。默認如果不設置,則與最大堆內存相同。

Direct Memory是受GC控制的,例如ByteBuffer bb = ByteBuffer.allocateDirect(1024),這段代碼的執行會在堆外占用1k的內存,Java堆內只會占用一個對象的指針引用的大小,堆外的這1k的空間只有當bb對象被回收時,才會被回收,這裏會發現一個明顯的不對稱現象,就是堆外可能占用了很多,而堆內沒占用多少,導致還沒觸發GC。加上-XX:MaxDirectMemorySize這個大小限制後,那麽只要Direct Memory使用到達了這個大小,就會強制觸發GC,這個大小如果設置的不夠用,那麽在日誌中會看到java.lang.OutOfMemoryError: Direct buffer memory。

例如,在我們的例子中,發現堆外內存飆升的比較快,很容易被yarn kill掉,所以應適當調小-XX:MaxDirectMemorySize(也不能過小,否則會報Direct buffer memory異常)。當然你也可以調大spark.yarn.executor.memoryOverhead,加大yarn對我們使用內存的寬容度,但是這樣比較浪費資源了。

GC優化

GC優化前,最好是把gc日誌打出來,便於我們進行調試。

--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"

通過看gc日誌,我們發現一個case,特定時間段內,堆內存其實很閑,堆內存使用率也就5%左右,長時間不進行父gc,導致Direct Memory一直不進行回收,一直在飆升。所以,我們的目標是讓父gc更頻繁些,多觸發一些Direct Memory回收。

第一,可以減少整個堆內存的大小,當然也不能太小,否則堆內存也會報OOM。這裏,我配置了1G的最大堆內存。

第二,可以讓年輕代的對象盡快進入年老代,增加年老代的內存。這裏我使用了-Xmn100m,將年輕代大小設置為100M。另外,年輕代的對象默認會在young gc 15次後進入年老代,這會造成年輕代使用率比較大,young gc比較多,但是年老代使用率低,父gc比較少,通過配置-XX:MaxTenuringThreshold=1,年輕代的對象經過一次young gc後就進入年老代,加快年老代父gc的頻率。

第三,可以讓年老代更頻繁的進行父gc。一般年老代gc策略我們主要有-XX:+UseParallelOldGC和-XX:+UseConcMarkSweepGC這兩種,ParallelOldGC吞吐率較大,ConcMarkSweepGC延遲較低。我們希望父gc頻繁些,對吞吐率要求較低,而且ConcMarkSweepGC可以設置-XX:CMSInitiatingOccupancyFraction,即年老代內存使用率達到什麽比例時觸發CMS。我們決定使用CMS,並設置-XX:CMSInitiatingOccupancyFraction=10,即年老代使用率10%時觸發父gc。

通過對GC策略的配置,我們發現父gc進行的頻率加快了,帶來好處就是Direct Memory能夠盡快進行回收,當然也有壞處,就是gc時間增加了,cpu使用率也有所增加。

最終我們對executor的配置如下:

--executor-memory 1G --num-executors 160 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=64m -XX:+CMSClassUnloadingEnabled -XX:MaxDirectMemorySize=1536m -Xmn100m -XX:MaxTenuringThreshold=1 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10 -XX:+UseCompressedOops -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError"

總結

通過對Stage/Cache/Partition、資源、內存/GC的優化,我們的spark程序最終能夠在160 vcores + 480G memory資源下,使用2.5小時跑通一天的日誌。

對於程序優化,我認為應本著如下幾點進行:

  1. 通過監控CPU、內存、網絡、IO、GC、應用指標等數據,切實找到系統的瓶頸點。
  2. 統籌全局,制定相應的解決方案,解決問題的思路是否清晰準確很重要,另外切勿『頭疼醫頭,腳疼醫腳』,應總體考慮把握。
  3. 了解一些技術的背景知識,對於每次優化盡量做得徹底些,多進行總結。

Spark記錄-Spark性能優化解決方案