1. 程式人生 > >Spark Streaming 流計算優化記錄(5)-分割槽與記憶體的優化

Spark Streaming 流計算優化記錄(5)-分割槽與記憶體的優化

8. 不一定非得每秒處理一次
由於Spark Streaming的原理是micro batch, 因此當batch積累到一定數量時再發放到叢集中計算, 這樣的資料吞吐量會更大些. 這需要在StreamingContext中設定Duration引數. 我們試著把Duration調成兩秒, 這樣Spark就會在接收Kafka的模組中積累了2秒的資料後, 在排程作業到叢集中計算.
結合上述做過的優化, 跑了一下, 結果如下:
 
從統計看到, 在Kafka每2秒傳送90000條記錄與HDFS上的700萬條進行處理並inner join的計算耗時一般能平穩在2秒, 偶爾會有3秒, 那其實是發生了跨節點跨機房的資料傳輸所造成的.



9. 使用RangePartition或RangeOnHDFSSizePartitioner

但在跑上一輪壓力測試時, 發現瞭如下現象:
 
一些節點(executor)上的task set的處理時間明顯比其它節點的處理時間要長, 導致其它節點的executor在空等, 也就是所並沒有完全利用所有資源, 沒有發揮叢集所應有的效能.
造成這種情況的原因一般是資料傾斜, 大量的包含了常用key的資料分佈在了少數節點上.
這是一個優化點, 而優化目標就是儘量讓每個task的處理時間差不多, 把task減小, 讓task能均勻分佈在各個executor, 並且讓task們充滿整個executor的生命週期.

一般會用RangePartition或新引入的RangeOnHDFSSizePartitioner來根據資料的key出現的密度對資料進行重新分佈. 前者只是普通的按key密度劃分, 但可以輸入分割槽數目作為引數; 後者可以根據檔案的大小以及key的密度進行劃分, 並接收一個分割槽因子作為引數, “實際分割槽數 = 根據檔案大小計算出來的分割槽數 * 分割槽因子 ”. 我這裡用的是RangePartition.
在談一下分割槽數, 一般來說分割槽數越多, task會越小, 就越能填滿整個executor生命週期, 但task太小太多也會在排程和序列化上耗費大量時間, 輸入RangePartition的分割槽數要適當, 不能太小也不能太大. 這裡取的是99.

優化後task的耗時圖如下:
 
可看到各個task是相對均勻地塞滿了executor的生命週期. 
該優化後, 我們在Kafka每2秒傳送90000條記錄的基礎上增大從HDFS載入的記錄數, 增加到2300萬條, 以進行處理並inner join, 此時, 也就是90000 * 2300萬條資料, 計算耗時依然能平穩在2-3秒. 

10. Spark記憶體分配的優化
在壓力測試的過程中, 我們是把載入的HDFS資料快取到了記憶體中, 以加快處理速度的, 雖然資料已經解決3G, 但我們看了一下RDD快取區的記憶體使用量, 其實還有大量區域沒被使用.
 
因此我們可以對Spark內部的記憶體分配進行一下調整, 調高用於shuffle的記憶體, 調低用於快取RDD的記憶體.
大家知道, Spark應用在container上的記憶體是這樣分佈的:
 

我們可以減少配置項” spark.storage.memoryFraction”的比例數,增大配置項” spark.shuffle.memoryFraction”的比例數, 從RDD快取區中拿一部分記憶體出來用於shuffle的計算.

本例中的配置是” spark.storage.memoryFraction=0.45”以及” spark.shuffle.memoryFraction=0.4”. 但調整後GC明顯增加, 一些executor甚至出現了使用記憶體過多而被YARN的NodeManager給kill掉的情況, 在一再確保” spark.storage.memoryFraction=0.45”是足夠給RDD快取使用並還可以保留0.5以上的空餘的情況下, 看來下一步就不得不調優Spark應用的GC了.