1. 程式人生 > >Spark 官網提到的幾點調優

Spark 官網提到的幾點調優

java 序列化 行處理 object 鏈式 註冊 nim mem 存儲 對象數組

1. 數據序列化

默認使用的是Java自帶的序列化機制。優點是可以處理所有實現了java.io.Serializable 的類。但是Java 序列化比較慢。

可以使用Kryo序列化機制,通常比Java 序列化機制性能高10倍。但是並不支持所有實現了java.io.Serializable 的類。使用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 開啟Kryo序列化。不使用Kryo做為默認值的原因是:需要註冊自定義的類。例如:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc 
= new SparkContext(conf)

註意:如果Object很大,需要在配置中增加 spark.kryoserializer.buffer 的值。如果沒有在Kryo中註冊自定義的類,Kryo也能正常工作,這些類會完全地保存下來(等於沒有序列化就進行傳輸或保存了),會造成資源浪費。

2. 內存調優

可以考慮3個方面:(1)對象需要的總內存 (2)指向這些對象的指針 (3)GC

通常情況下,指針占用的空間將是原始數據的2~5倍。有以下幾個原因:

(1)Java對象的“object header”(對象頭),包含了指向它的類的指針,占用16bytes。對於一些只有很少數據的object,16bytes要比對象本身占用的空間要多。

(2)Java String 中在原始String數據的基礎上有另外40bytes的開銷(String的保存形式是Char的數組,並且有length的額外數據)。因為String內部使用UTF-16編碼,每個char 占用2個byte。因此10個字符的String,將會很輕易地占用60個bytes

(3)諸如HashMap,LinkedList 的集合類,使用鏈式結構,每個entry(Map.Entry)都有一個包裝類。這些類不僅有“object header”,還有指向下一個對象的指針(通常是8個bytes)。

(4)基本類型的集合,通常會被包裝成對象類型。

3. 內存管理

Spark中的內存使用主要有兩類:執行內存和存儲內存。執行內存是指shuffles, joins, sorts and aggregations計算時用到的內存。存儲內存主要是指cache和集群間傳播的內部數據用到的內存。執行內存和存儲內存使用的是同一塊區域。當沒有計算執行時, 存儲將獲得所有這塊區域的可用內存,反之亦然。執行比存儲具有更高的獲取內存的優先級,也就是說,如果內存不夠時,存儲會釋放一部分內存給執行用,直到存儲需要的最低的閥值。

有兩個相關的配置,但是通常來說,用戶不需要改變其默認值。

(1) spark.memory.fraction 表示使用的Java 堆內存的比例。默認值0.6. 剩下的40%的內存用於:(a)存儲用戶數據、Spark內部元數據 (b)防止OOM

(2)spark.memory.storageFraction 表示上面所說的存儲內存最少占用的比例。默認值 是0.5

4. 確定內存消耗

最好的方式是生成一個RDD並cache,在web UI 中的 Storage 中查看占用了多少內存。

確定一個指定object 占用內存的大小,可以使用 SizeEstimator.estimate(obj) 方法。

5. 調整數據結構

減少內存消耗,首先應該避免使用基於指針的數據結構和包裝對象等諸如此類的Java特性。有以下幾種途徑:

(1)數據結構優先使用對象數組和基本類型,盡量不使用Java和scala裏的集合類(如:HashMap)。可以使用 fastutil (http://fastutil.di.unimi.it/) 提供的集合類和基本類型。

(2)盡量避免使用有很多小對象和指針的內嵌結構

(3)考慮使用數字ID 和枚舉類代替作為key的String

(4)如果內存小於32GB,在Spark-env.sh 裏設置 -XX:+UseCompressedOops,這樣指針使用4bytes 而不是8bytes

6. 序列化RDD 存儲

當你的 object 仍然很大,簡單的降低內存消耗的方法是使用序列化的存儲方法。強烈建議使用kyro序列化機制。

7. 垃圾回收調優

垃圾回收的時間主要是花費在尋找那些不再被引用的對象。因此它跟Java Object 的數量有關。我們應該使用具有較少object的數據結構(如:使用array代替linkedList)。一種較好的方法是用序列化的形式持久化Object,這樣每個RDD partition 只有一個字節數組。

測量GC的影響:在Java option 中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 後,可在worker 的 stdout 中找到GC的日誌。

(1) 在任務完成之前,如果有多次full GC,說明執行任務的內存不夠

(2) 如果有多次minor GC,但是 full GC 並不多,可以增大 Eden 區的大小

(3) 在GC的日誌中,如果老年代快滿了,減少 spark.memory.fraction 以降低cache所用的內存

(4) 嘗試使用 G1 垃圾回收器(-XX:+UseG1GC)。如果堆比較大,應該增加 G1 區的大小(通過 -XX:G1HeapRegionSize 設置

(5) 如果任務是從HDFS上讀數據,HDFS 塊的大小為 128M,塊解壓後的大小一般為原始大小的2~3倍,如果要運行4個task,可以估算Eden區需要 4*3*128M。

8. 其它

(1)並行度。除非你手動每步都設置較高的並行度,否則,集群不會被最大化地利用。Spark會自動根據每個文件的大小設置相應的task數量。對於諸如groupByKey,reduceByKey 等 reduce 操作,並行度為最大的父 RDD 的 partition 的數量。可以配置 spark.default.parallelism 設置默認的並行度。一般來講,建議一個CPU 運行 2~3個task。

(2)Reduce Task 的內存使用。有時候,發生OOM並不是因為內存中放不下RDD,而是因為某個或幾個task 分配的內存不夠。例如:某個groupByKey 操作處理很大的數據集(因為數據傾斜的緣故)。 簡單的解決方法是:設置較高的並行度。

(3)廣播大的變量。 使用廣播的功能能有效地減少序列化的 task 的大小和集群加載job的花消。如果你的task中需要使用一個來自driver的大的object(如:靜態查詢表),應該把它轉化成廣播變量。 Master端會打印序列化後的 task 的大小,通常如果大於20KB 的話,就值得去優化。

(4)數據本地性。數據本地性可分為以下幾類:

(a) PROCESS_LOCAL 數據在運行代碼的JVM中。

(b) NODE_LOCAL 數據和運行的代碼在同一臺機器上。如:當前節點上正好有HDFS的數據塊。

(c) NO_PREF 數據可以較快獲取,但是不在本地

(d) RACK_LOCAL 數據在同一 機架內,需要通過network獲取

(e) Any 除上述外的數據

最好的情況就是 task 都運行在最好的數據本地性的環境,但通常不太可能。很多時候,某個executor 上的任務都完成了,而其它忙碌的機器上尚有未處理的data。Spark通常會等一段時間,以等待忙碌的機器空閑下來去處理數據(因為具有較高的本地性)。當超過這個等待時間後,空間的executor會把這些數據拉過來進行處理。每個數據本地性級別對應的等待時間可以查看配置中的 spark.locality 部分。通常默認的配置工作得蠻好的。如果你的task運行時間較長,可以增加這些值。

Spark 官網提到的幾點調優