1. 程式人生 > >Spark Streaming 流計算優化記錄(6)-GC優化與shuffle service

Spark Streaming 流計算優化記錄(6)-GC優化與shuffle service

11. Spark應用的GC調優
說到GC, 可能很多人都傾向於使用新潮的G1垃圾收集器, 特別是intel的那幾個兄弟在databrick發表了篇用G1調優Spark應用的博文後, 就更多人熱衷於嘗試G1了.
但其實我們再去年就對G1和老牌的CMS+NewPar進行過對比測試, 發現G1根本沒有比CMS好, 有時候還會導致更多的FullGC, 而實際上連Oracle官方都覺得G1還沒有production ready. 當然了, 我們是在JDK7上測試的, 而intel的哥們是在那個JDK版本上測試就不得而知了, 可能JDK8對G1的改進會好些吧, 但由於我們幾乎所有應用都在用JDK7, 因此也不可能在JDK8上進行測試. 另外, intel的兄弟使用優化過的G1與沒優化的CMS進行對比測試, 結果能否採用也是需要謹慎考慮的.

我們還是比較傾向於CMS+NewPar的GC配置, 因此在LinkedIn團隊所提供的CMS GC引數的基礎上進行了一下修改: ” -server -XX:PermSize=256m -XX:MaxPermSize=256m -XX:NewSize=1536m -XX:MaxNewSize=1536m -XX:SurvivorRatio=8 -XX:+UseParNewGC -XX:MaxTenuringThreshold=31 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSClassUnloadingEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSFullGCsBeforeCompaction=3 -XX:ParallelGCThreads=16 -XX:ConcGCThreads=12 -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses -XX:+AlwaysPreTouch -XX:-OmitStackTraceInFastThrow -XX:+UseCompressedStrings -XX:+UseStringCache -XX:+OptimizeStringConcat -XX:+UseCompressedOops -XX:+CMSScavengeBeforeRemark   -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintHeapAtGC”, 嘗試一下後, 發現效果不好, 依然有一些executor容易被YARN NodeManager給kill掉.

猜測原因可能是由於我們各個節點的機子配置過於多樣化, 一些高配, 一些低配, 而且節點也會時不時增加和減少. 但是LinkedIn團隊的引數是基於他們普遍高配置的節點而調試出來的, JVM HEAP的各個區都可以人為固定, 使用多少執行緒進行GC和Remark也是可以算出來的, 而同樣的方法在我們這兒就不成立了, 這臺節點上Eden區可以用2個G, 另一臺節點可能剩餘記憶體都沒有2個G, 因此, 我們需要去掉一些硬性對JVM HEAP進行手工配置的選項, 也就是變成了這樣: "spark.executor.extraJavaOptions=  -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -XX:+HeapDumpOnOutOfMemoryError -verbose:gc ".

同時為了減少進入Old Gen的物件數, 擴大Eden的使用率, 加上了” -XX:MaxTenuringThreshold=31 -XX:SurvivorRatio=8”.
優化後除了executor被kill的情況不在發生外, 連Full GC的影子都看不著.
加大Kafka流入的資料量到每3秒162000條訊息, 也就是每秒近6萬的訊息流入, 與從HDFS載入的2300萬條資料進行inner join.
 
162000條訊息都能在規定的3秒內處理完, 也就是每秒處理近 6萬 * 2千3百萬條記錄的inner join.
在加大Kafka流入的資料量到每3秒180000條, 結果如下:
 
180000條記錄大部分能在3s內處理完, 但有一些job需要4秒. 結合上一次測試結果, 也就是該Spark Streaming應用的處理極限大概是在17萬到18萬條Kafka輸入訊息之間, 也就是解決 18萬 * 2千3百萬條記錄, 共3G+左右資料的inner join.
下面我們稍微用G1垃圾收集器進行一下對比, 引數是在intel給的G1優化引數的基礎上修改的: "spark.executor.extraJavaOptions=  -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+AlwaysPreTouch -XX:InitiatingHeapOccupancyPercent=30 -XX:ParallelGCThreads=20 -XX:-OmitStackTraceInFastThrow -XX:+UseCompressedStrings -XX:+UseStringCache -XX:+OptimizeStringConcat -XX:+UseCompressedOops -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+HeapDumpOnOutOfMemoryError -verbose:gc ", 結果如下:
 

即使對於較少資料的162000條Kafka輸入訊息, 也出現了幾十秒的延遲, 這是無法接收的, 因此在production環境, 還是建議用CMS+NewPar的設定, 並且如果不能保證每臺節點的硬體配置一致的話, 最好就不要去人為規定JVM HEAP各個空間的大小以及GC執行緒的數量.

12.  使用shuffle service和i-CMS GC
其實大家可以看到, 上一節在每3秒處理18萬條訊息, 每秒處理6萬條訊息的場景中, 偶爾會出現處理時間超過3秒的情況, 因此我們決定進一步優化, 把shuffle service給用上 進一步加快shuffle階段的效率和穩定性; 並根據伺服器的配置差異巨大的現實情況, 使用i-CMS GC, 此GC能減少Full GC的次數, 適用於無法硬性規定JVM Heap各區大小以及GC發生閾值的情況, 當然, 它會耗費一些CPU時間.
Shuffle Service的安裝, 詳情請看逛網. 大概的說就是要把spark的shuffle service jar包copy到yarn的lib目錄, 然後在yarn的aux service配置中加入該shuffler service. 同時要在Spark應用中配置” spark.shuffle.service.enabled=true”.
而開啟i-CMS GC, 則需要在JVM配置中加入”  -XX:+CMSIncrementalMode -XX:+CMSIncrementalPacing -XX:CMSIncrementalDutyCycleMin=0 -XX:CMSIncrementalDutyCycle=10”.
本該說這次的測試用了黃金配置, 應該比上一節的測試效果要好, 但任何事情都不能想當然, 結果讓我們大跌眼睛, 這次的測試效果與上一節差不多, 但悲慘的是在十幾分鍾後出現了GC時間超5秒的情況, 發生了executor被kill事件. 通過檢視GC日誌可知, 在spark應用執行是會出現頻繁的同步mark, 這是為了減少Full GC而產生的, 但問題是GC時間卻會漸漸的加大, 估計是沒有Full GC會導致一些沒被Young GC回收的物件造成的積壓吧, 這一點上我沒有深究, 就先把i-CMS GC撤掉試試吧, 畢竟practice speaks lounder than words.
撤掉i-CMS GC後, 果然效率與穩定性大增, 對18萬Kafka訊息, 18萬 * 2千3百萬條記錄, 共3G+左右資料的inner join的場景的處理穩定在3秒的處理速度. (木有截圖,不好意思啊)