1. 程式人生 > >Spark 性能調優零散知識

Spark 性能調優零散知識

ges ermaster 傾斜 entry 鏈接方式 nbsp spec manage 基礎

1. 如果 Spark 中 CPU 的使用率不夠高,可以考慮為當前的程序分配更多的 Executor, 或者增加更多的 Worker 實例來充分的使用多核的潛能

2. 適當設置 Partition 分片數是非常重要的,過少的 Partition 分片數可能會因為每個 Partition 數據量太大而導致 OOM 以及頻繁的 GC,而過多的 Parition 分片數據可能會因為每個 Partition 數據量太小而導致執行效率低下。

3. 提升 Spark 硬件尤其是 CPU 使用率的一個方式 就是增加 Executor 的並行度,但是如果 Executor 過多的話,直接分配 在每個 Executor的內存就大大減少,在內存的操作就減少,基於磁盤的操作就越來越多,導致性能越來越差。

4. 處理 Spark Job 的時候如果發現比較容易內存溢出,一個比較有效的辦法是減少並行的 Executor 的數量,這樣每個 Executor 就可以分配到更多的內存,進而增加每個 Task 使用的內存數量,降低 OOM 的風險。

5. 處理Spark Job 的時候如果發現比較容易內存溢出,一個比較有效的辦法就是增加 Task 的並行度,這樣每個 Task 處理的 Partition 的數量就變少了,減少了 OOM的可能性。

6. 處理Spark Job 的時候如果發現某些 Task 運行得特別慢,一個處理辦法是增加並行的 Executor 的個數,這樣每個 Executor 分配 的計算資源就變少了,可以提升硬件的整體使用效率。另一個辦法是增加 Task 的並行度,減少每個 Partition 的數據量來提高執行效率。

7. 處理Spark Job 的時候如果出現特別多的小文件,這時候就可以通過 coalesce 來減少 Partition 的數量,進而減少並行運算的 Task 的數量來減少過多任務的開辟,從而提升硬件的使用效率。

8. 默認情況下 Spark 的 Executor 會盡可能占用當前機器上盡量多的 Core,這樣帶來一個好處就是可以最大化的提高計算的並行度,減少一個 Job 中任務 運行的批次,但帶來一個風險就是如果每個 Task 占用內存比較大,就需要頻繁的 spill over 或者有更多的 OOM 的風險。

9. Spark 集群在默認情況每臺 host 上只有一個 Worker, 而每個 Worker 默認只會為當前應用程序分配一個 Executor來執行 Task,但實際上通過配置 Spark-env.sh 可以讓每臺 host 上有若幹的 Worker, 而每個 Worker 下面又可以有若幹個 Executor。

10。 Spark Stage 內部是一組計算邏輯完全相同但處理數據不同的分布式並行運行的 Task 構成, Stage 內部的計算都以 Pipeline 的方式進行,不同的 Stage之間是產生 Shuffler 的唯一方式。

11. 在Spark 中可以考慮在 Worker 節點上使用固態硬盤以及把 Worker 的 Shuffle 結構保存到 RAMDisk 的方式來極大的提高性能。

12. 當經常發現機器頻繁的 OOM 的時候,可以考慮的一種方式就是減少並行度,這樣同樣的內存空間並行運算的任務 少了,那麽對內存的占用就更少了,也就減少了 OOM 的可能性。

Spark 性能優化核心基石:

1. Spark 采用的是 Master-Slaves 的模式進行資源管理和任務 執行的管理:

a) 資源管理: Master-Workers, 在一臺機器上可以有多個 Workers

b) 任務執行: Driver-Executors,當在一臺機器上分配多個 Workers 的時候那麽默認情況下每個 Worker 都會為當前運行的應用程序分配一個 Executor,但是我們可以修改配置來讓每個 Worker 為我們當前的應用 程序分配若幹個 Executors; 程序運行的時候會被劃分成為若幹個 Stages(Stages內部沒有 Shuffle,遇到 Shuffle 的時候會劃分 Stage),每個 Stage裏面包含若幹個處理邏輯完全一樣只是處理數據不一樣的 Task, 這些 Task 會被分配到 Executor 上去並行執行。

Spark 性能優化招式:

1. Broadcast。如果 Task 在運行的過程中使用超過 20KB 大小的靜態大對象,這個時候一般都要考慮使用 Broadcast。例如一個大表 Join 一個小表,此時如果使用 Broadcast 把小表廣播出去,這時候大表就只需在自己的節點等待小表數據的到來。

Task 性能優化:

1. 慢任務的性能優化:可以考慮減少每個 Partition 處理的數據量。同時建議開啟 spark.speculation。

2. 盡量減少 Shuffle, 例如我們要盡量減少 groupByKey 操作,因為 groupByKey 會要求通過網絡拷貝(shuffle) 所有的數據,優先考慮使用 reduceByKey。因為 reduceByKey 會首先 reduce locally,然後再拷貝。

數據傾斜

1. 定義更加合理的 Key(或者說自定義 Partitioner)

2. 可以考慮使用 ByteBuffer 來存儲 Block

網絡

1. 可以考慮 Shuffle 的數據放在 alluxio (前身 Tackyon) 中帶來更好的數據本地性,減少網絡的 Shuffler

2. 優先采用 Netty (Spark 2.X 的默認方式)的方式進行網絡通信

3. mapPartitions 中的函數在一個 Partition 裏作用一次

數據結構:

1. Java的對象。對象頭是16個字節(例如指向對象的指針等元數據),如果對象中只有一個 int 的 property,則此時會占據 20 個字節,也就是說對象的元數據占用了大部分的空間,所有在封裝數據的時候盡量不要使用對象。例如說使用 Json 格式來狀封裝數據

2. Java 的 String 在實際占用內存方面要額外使用 40 個字節(內部使用 char 數組來保存字符),另外需要註意的是 String 中每個字符是2個字節(UTF-16),如果內部有5個字符的話,實際上會占用50個字節。

3. Java中的集合List、Map 等等,其內部一般使用鏈表來實現。具體的每個數據使用 Entry 等,這些也非常消耗內存

4. Java 中的基本數據類型會自動封箱操作,這會額外增加對象頭的空間占用。

5. 優先使用原生數據,盡可能不要直接使用 ArrayList、HashMap、LinkedList 等數據結構

6. 優先使用 String (推薦使用 JSON),而不是采用 HashMap、List 等來封裝數據

內存消耗診斷

1. JVM 自帶的診斷工具。例如: JMap、JConsole等

2. 在開發、測試、生產環境下用的最多的是日誌。 Driver 端產生的日誌,最簡單也是最有效的方式就是調用 RDD.cache,當進行 cache 操作的時候, Driver 上的 BlockManangerMaster 會記錄該信息並寫進日誌中。

persist 和 checkpoint

1. 當反復使用某個(些)RDD的時候,建議使用 persist 來對數據進行緩存

2. 如果某個步驟的 RDD 計算特別耗時或者經歷了很多步的計算,數據丟失的話則重新計算的代價比較大,此時考慮使用 checkpoint,因為 checkpoint 是把數據寫入 HDFS 的,天然具有高可靠性

序列化和反序列化

發送磁盤IO 和網絡通信的時候會序列化和反序列化,更為重要的考慮序列化和反序列化的時候有另外兩種情況:a) Persist 的時候 b)編程的時候。使用算子的函數操作如果傳入了外部數據就必須序列化和反序列化

1. Spark 的序列化機制默認使用 Java 自帶的序列化機制(其實現類是 ObjectInputStream 和 ObjectOutputStream)。效率較低,強烈建議使用 Kryo 序列化機制 ,它比 Java 的序列化節省近 10 倍的空間。

2. Spark 中如果我們自定義了 RDD 中的數據元素的類型,則必須實現 Serializable 接口,也可以實現自己的序列化接口(Externalizable)來實現更高效的 Java序列化算法。如果使用 Kryo,則需要把自定義的類註冊給 Kryo。

3. Spark 中 Scala 常用的類型自動的能過 AllScalaRegistry 註冊給了 Kryo 進行序列化管理。

4. Kryo 在序列化的時候緩存空間默認大小是 2MB,可以根據具體的業務模型調整該大小,通過設置 spark.kryoserializer.buffer

5. 在使用 Kryo 的時候,強烈建議註冊時寫完整的包名和類名。

數據本地性

1. 如果數據是 PROCESS_LOCAL, 但是此時並沒有空閑的 Core 來運行我們的 Task,此時 Task 就要等待。例如等待3000ms, 3000ms 內如果 Task 不能運行,則退而求其次采用 NODE_LOCAL。同樣的道理 NODE_LOCAL也會有等待時間。

2. 如何配置 Locality呢? 可以統一采用 spark.locality.wait 來設置。也可以分別設置如: spark.locality.wait.node、spark.locality.wait.process 。

RDD 的自定義(以 Spark on HBase 為例)

1. 第一步是定義 RDD.getParitions 的實現

a) createRelation 具體確定 HBase 的鏈接方式和具體訪問的表

b) 然後通過 HBase 的API 來獲取 Region 的 List

c) 可以過濾出有效的數據

d) 最後返回 Region 的 Array[Partition],也就是說一個 Partition處理一個 Region 的數據,為更佳的數據本地性打下基礎

2. 第二步是 RDD.getPreferredLocations

根據 Split 包含的 Region 信息來確定 Region 具體在什麽節點上。這樣 Task 在調度的時候就可以優先被分配到 Region 所在的機器上,最大化地提高數據本地性

3. 第三步是 RDD.compute

根據 Split 中的 Region 等信息調用 HBase 的 API 來進行操作(主要是查詢)

Shuffle 性能調優

1. 問題: Shuffle output file lost? 真正的原因一般由 GC 導致的。GC 尤其是Full GC 時通常會導致線程停止工作,這個時候下一個 Stage 的 Task 在默認情況下就會嘗試重試來獲取數據,一般重試3 次,每次重試時間間隔為5S,也就是說默認情況下 15S 內如果還是無法抓到數據的話,就會出現 Shuffle output file lost 等 情況 ,進而導致 Task重試,甚至會導致 Stage 重試,最嚴重的是會導致 App 失敗。在這個時候首先就要采用高效的內存數據結構和序列化機制,JVM 的調優來減少 Full GC 的產生。

2. 在 Shuffle 的時候, Reducer 端獲取數據會有一個指定大小的緩存空間,如果內存不夠,可以適當的增大該緩存空間(通過調整 spark.reducer.maxSizeInFlight),否則會 Spill 到磁盤上,影響效率

3. 在 Shuffle MapTask 端通常也會增大Map 任務的寫磁盤的緩存。默認值是32K.

4. 調整獲取 Shuffle 數據的重試次數,默認是3次,通常建議增大重試次數

5. 調整獲取 Shuffle 數據的重試時間間隔,默認是5秒。強烈建議提高該時間。

(個人覺得以上兩點可以看出,默認情況下會有 15 秒的時間,如果GC需要這麽長的時間的話,應該是GC的問題,首先應該是優化GC)

6. 在 Reducer 端做 Aggregation 的時候,默認是 20% 的內存用來做 Aggegation。如果超出了這個大小就會溢出到磁盤上,建議調在百分比來提高性能。

鎢絲計劃

1. Tungsten 的內存管理 機制獨立於 JVM, 所以Spark 操作數據的進候具體操作的是 Binary Data,而不是 JVM Object。而且還免去了序列化和反序列化的過程。

2. 內存管理方面: Spark 使用了 sum.misc.Unsafe 來進行 Off-heap 級別的內存分配、指針使用及內存釋放。Spark 為了統一管理 Off-heap 和 On-heap 而提出了 Page

3. 如果想讓程序使用 Tungsten 功能,可以配置 spark.shuffle.manager=tungsten-sort

4. DataFrame 中自動開啟了 Tungsen 功能

5. 寫數據在內存足夠大的情況下是寫到 Page 裏面,在 Page 中有一條條的 Record,如果內存不夠的話會 Spill 到磁盤上。

如何看內存是否足夠?兩方面:

a) 系統默認情況下給 ShuffleMapTask 最大準備了多少內存空間。默認情況下是 ExecutorHeapMemory * 0.8 * 0.2

(spark.shuffle.memoryFraction=0.2, spark.shuffle.safetyFraction=0.8)

b) 和 Task 處理的 Partition 大小緊密相關

技術分享圖片

Spark 性能調優零散知識