1. 程式人生 > >一分鐘了解spark的調優

一分鐘了解spark的調優

shu 序列 而是 所有 鏈接 情況 類型 調優 註意事項

Tuning Spark

  • 數據序列化
  • 內存調優
    • 內存管理概述
    • 確定內存消耗
    • 調整數據結構
    • 序列化 RDD 存儲
    • 垃圾收集調整
  • 其他註意事項
    • 並行度水平
    • 減少任務的內存使用
    • 廣播大的變量
    • 數據本地化
  • 概要

由於大多數 Spark 計算的內存性質, Spark 程序可能由集群中的任何資源( CPU ,網絡帶寬或內存)導致瓶頸。 通常情況下,如果數據有合適的內存,瓶頸就是網絡帶寬,但有時您還需要進行一些調整,例如 以序列化形式存儲 RDD 來減少內存的使用。 本指南將涵蓋兩個主要的主題:數據序列化,這對於良好的網絡性能至關重要,並且還可以減少內存使用和內存優化。 我們選幾個較小的主題進行展開。

數據序列化

序列化在任何分布式應用程序的性能中起著重要的作用。 很慢的將對象序列化或消費大量字節的格式將會大大減慢計算速度。 通常,這可能是您優化 Spark 應用程序的第一件事。 Spark 宗旨在於方便和性能之間取得一個平衡(允許您使用操作中的任何 Java 類型)。 它提供了兩種序列化庫:

  • Java serialization: 默認情況下,使用 Java ObjectOutputStream 框架的 Spark 序列化對象,並且可以與您創建的任何實現 java.io.Serializable 的類一起使用。 您還可以通過擴展 java.io.Externalizable 來更緊密地控制序列化的性能。 Java 序列化是靈活的,但通常相當緩慢,並導致許多類的大型序列化格式。
  • Kryo serialization: Spark 也可以使用 Kryo 庫(版本2)來更快地對對象進行序列化。 Kryo 比 Java 序列化(通常高達10x)要快得多,而且更緊湊,但並不支持所有的 Serializable 類型,並且需要先註冊您將在程序中使用的類以獲得最佳性能。

您可以通過使用 SparkConf 初始化作業 並進行調用來切換到使用 Kryo conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")。此設置配置用於不僅在工作節點之間進行洗牌數據的串行器,而且還將 RDD 序列化到磁盤。 Kryo 不是默認的唯一原因是因為自定義註冊要求,但我們建議您嘗試在任何網絡密集型應用程序。自從 Spark 2.0.0 以來,我們在使用簡單類型,簡單類型的數組或字符串類型對RDD進行混洗時,內部使用 Kryo serializer 。

Spark 自動包含 Kryo 序列化器,用於 Twitter chill 中 AllScalaRegistrar 涵蓋的許多常用的核心 Scala 類。

要使用 Kryo 註冊自己的自定義類,請使用該 registerKryoClasses 方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

所述 Kryo 文檔 描述了更先進的註冊選項,如添加自定義序列的代碼。

如果您的對象很大,您可能還需要增加 spark.kryoserializer.buffer 配置。該值需要足夠大才能容納您將序列化的最大對象。

最後,如果您沒有註冊自定義類, Kryo 仍然可以工作,但它必須存儲每個對象的完整類名稱,這是浪費的。

內存調優

有三個方面的考慮在調整內存使用:該的存儲你的對象所使用的(你可能希望你的整個數據集,以適應在內存中),則成本訪問這些對象,並且開銷垃圾收集(如果你有高成交物品條款)。

默認情況下, Java 對象可以快速訪問,但可以輕松地消耗比其字段中的 “raw” 數據多2-5倍的空間。這是由於以下幾個原因:

  • 每個不同的 Java 對象都有一個 “object header” ,它大約是16個字節,包含一個指向它的類的指針。對於一個數據很少的對象(比如說一個Int字段),這可以比數據大。
  • Java String 在原始字符串數據上具有大約40字節的開銷(因為它們存儲在 Char 數組中並保留額外的數據,例如長度),並且由於 UTF-16 的內部使用而將每個字符存儲為兩個字節 String 編碼。因此,一個10個字符的字符串可以容易地消耗60個字節。
  • 公共收集類,例如 HashMapLinkedList ,使用鏈接的數據結構,其中每個條目(例如: Map.Entry )存在”包裝器”對象。該對象不僅具有 header ,還包括指針(通常為8個字節)到列表中的下一個對象。
  • 原始類型的集合通常將它們存儲為”盒裝”對象,例如: java.lang.Integer

本節將從 Spark 的內存管理概述開始,然後討論用戶可以采取的具體策略,以便在他/她的應用程序中更有效地使用內存。具體來說,我們將描述如何確定對象的內存使用情況,以及如何改進數據結構,或通過以串行格式存儲數據。然後我們將介紹調整 Spark 的緩存大小和 Java 垃圾回收器。

內存管理概述

Spark 中的內存使用大部分屬於兩類:執行和存儲。執行存儲器是指用於以混洗,連接,排序和聚合計算的存儲器,而存儲內存是指用於在集群中緩存和傳播內部數據的內存。在 Spark 中,執行和存儲共享一個統一的區域(M)。當沒有使用執行存儲器時,存儲器可以獲取所有可用的存儲器,反之亦然。如果需要,執行可以驅逐存儲,但只有在總存儲內存使用量低於某個閾值(R)之前。換句話說, R 描述 M 緩存塊永遠不會被驅逐的區域。由於實施的復雜性,存儲不得驅逐執行。

該設計確保了幾個理想的性能。首先,不使用緩存的應用程序可以將整個空間用於執行,從而避免不必要的磁盤泄漏。第二,使用緩存的應用程序可以保留最小的存儲空間(R),其中數據塊不受驅逐。最後,這種方法為各種工作負載提供了合理的開箱即用性能,而不需要用戶內部如何分配內存的專業知識。

雖然有兩種相關配置,但典型用戶不需要調整它們,因為默認值適用於大多數工作負載:

  • spark.memory.fraction 表示大小 M(JVM堆空間 - 300MB)(默認為0.6)的一小部分。剩余的空間(40%)保留用於用戶數據結構,Spark中的內部元數據,並且在稀疏和異常大的記錄的情況下保護OOM錯誤。
  • spark.memory.storageFraction 表示大小 RM (默認為0.5)的一小部分。 RM 緩存塊中的緩存被執行驅逐的存儲空間。

spark.memory.fraction 應該設置值,以便在 JVM 的舊版或”終身”版本中舒適地適應這一堆堆空間。有關詳細信息,請參閱下面高級 GC 調優的討論。

確定內存消耗

大小數據集所需的內存消耗量的最佳方式是創建 RDD ,將其放入緩存中,並查看 Web UI 中的“存儲”頁面。該頁面將告訴您 RDD 占用多少內存。

為了估計特定對象的內存消耗,使用 SizeEstimatorestimate 方法這是用於與不同的數據布局試驗修剪內存使用情況,以及確定的空間的廣播變量將占據每個執行器堆的量是有用的。

調整數據結構

減少內存消耗的第一種方法是避免添加開銷的 Java 功能,例如基於指針的數據結構和包裝對象。有幾種方法可以做到這一點:

  1. 將數據結構設計為偏好對象數組和原始類型,而不是標準的 Java 或 Scala 集合類(例如: HashMap )。該 fastutil 庫提供方便的集合類基本類型是與 Java 標準庫兼容。
  2. 盡可能避免使用很多小對象和指針的嵌套結構。
  3. 考慮使用數字 ID 或枚舉對象而不是鍵的字符串。
  4. 如果您的 RAM 小於32 GB,請設置 JVM 標誌 -XX:+UseCompressedOops ,使指針為4個字節而不是8個字節。您可以添加這些選項 spark-env.sh

序列化 RDD 存儲

當您的對象仍然太大而無法有效存儲,盡管這種調整,減少內存使用的一個更簡單的方法是以序列化形式存儲它們,使用 RDD 持久性 API 中的序列化 StorageLevel ,例如: MEMORY_ONLY_SER 。 Spark 將會將每個 RDD 分區存儲為一個大字節數組。以序列化形式存儲數據的唯一缺點是訪問時間較短,因為必須對每個對象進行反序列化。如果您想以序列化形式緩存數據,我們強烈建議使用 Kryo ,因為它導致比 Java 序列化更小的尺寸(而且肯定比原 Java 對象)更小。

垃圾收集調整

當您的程序存儲的 RDD 有很大的”流失”時, JVM 垃圾收集可能是一個問題。(程序中通常沒有問題,只讀一次 RDD ,然後在其上運行許多操作)。 當 Java 需要驅逐舊對象為新的對象騰出空間時,需要跟蹤所有 Java 對象並找到未使用的。要記住的要點是,垃圾收集的成本與 Java 對象的數量成正比,因此使用較少對象的數據結構(例如: Ints數組,而不是 LinkedList )大大降低了此成本。 一個更好的方法是如上所述以序列化形式持久化對象:現在每個 RDD 分區只有一個對象(一個字節數組)。 在嘗試其他技術之前,如果 GC 是一個問題,首先要使用序列化緩存。

由於任務的工作記憶(運行任務所需的空間)和緩存在節點上的 RDD 之間的幹擾, GC 也可能是一個問題。我們將討論如何控制分配給RDD緩存的空間來減輕這一點。

測量 GC 的影響

GC 調整的第一步是收集關於垃圾收集發生頻率和GC花費的時間的統計信息。這可以通過添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 到 Java 選項來完成。(有關將 Java 選項傳遞給 Spark 作業的信息,請參閱配置指南)下次運行 Spark 作業時,每當發生垃圾回收時,都會看到在工作日誌中打印的消息。請註意,這些日誌將在您的群集的工作節點上( stdout 在其工作目錄中的文件中),而不是您的驅動程序。

高級 GC 優化

為了進一步調整垃圾收集,我們首先需要了解一些關於 JVM 內存管理的基本信息:

  • Java堆空間分為兩個區域 Young 和 Old 。 Young 一代的目的是持有短命的物體,而 Old 一代的目標是使用壽命更長的物體。

  • Young 一代進一步分為三個區域[ Eden , Survivor1 , Survivor2 ]。

  • 垃圾收集過程的簡化說明:當 Eden 已滿時, Eden 上運行了一個小型 GC ,並將 Eden 和 Survivor1 中存在的對象復制到 Survivor2 。幸存者地區被交換。如果一個對象足夠老,或者 Survivor2 已滿,則會移動到 Old 。最後,當 Old 接近滿時,一個完整的 GC 被調用。

Spark 中 GC 調優的目的是確保只有長壽命的 RDD 存儲在 Old 版本中,並且 Young 版本的大小足夠存儲短命期的對象。這將有助於避免使用完整的 GC 來收集任務執行期間創建的臨時對象。可能有用的一些步驟是:

  • 通過收集 GC 統計信息來檢查垃圾收集是否太多。如果在任務完成之前多次調用完整的 GC ,這意味著沒有足夠的可用於執行任務的內存。

  • 如果太小的集合太多,而不是很多主要的 GC ,為 Eden 分配更多的內存將會有所幫助。您可以將 Eden 的大小設置為對每個任務需要多少內存的估計。如果確定 Eden 的大小 E ,那麽您可以使用該選項設置年輕一代的大小 -Xmn=4/3*E 。(按比例增加4/3是考慮幸存者地區使用的空間。)

  • 在打印的 GC 統計信息中,如果 OldGen 接近於滿,則通過降低減少用於緩存的內存量 spark.memory.fraction; 緩存較少的對象比減慢任務執行更好。或者,考慮減少年輕一代的大小。這意味著 -Xmn 如果您將其設置為如上所述降低。如果沒有,請嘗試更改 JVM NewRatio 參數的值。許多 JVM 默認為2,這意味著 Old 版本占據堆棧的2/3。它應該足夠大,使得該分數超過 spark.memory.fraction

  • 嘗試使用 G1GC 垃圾回收器 -XX:+UseG1GC。在垃圾收集是瓶頸的一些情況下,它可以提高性能. 請註意,對於大型 excutor 的堆大小,通過設置 -XX:G1HeapRegionSize 參數來增加 G1 區域的大小 是非常重要的

  • 例如,如果您的任務是從 HDFS 讀取數據,則可以使用從 HDFS 讀取的數據塊的大小來估計任務使用的內存量。請註意,解壓縮塊的大小通常是塊大小的2或3倍。所以如果我們希望有3或4個任務的工作空間,而 HDFS 塊的大小是128MB,我們可以估計 Eden 的大小4*3*128MB

  • 監控垃圾收集的頻率和時間如何隨著新設置的變化而變化。

我們的經驗表明, GC 調整的效果取決於您的應用程序和可用的內存量。有更多的優化選項 在線描述,但在較高的水平,管理完整的 GC 如何經常發生可以減少開銷幫助。

可以通過spark.executor.extraJavaOptions在作業的配置中設置來指定執行器的 GC 調整標誌。

其他註意事項

並行度水平

集群不會被充分利用,除非您將每個操作的並行級別設置得足夠高。自動星火設置的 “地圖” 任務的數量根據其大小對每個文件運行(盡管你可以通過可選的參數來控制它 SparkContext.textFile ,等等),以及用於分布式”減少”操作,如: groupByKeyreduceByKey ,它采用了最大父 RDD 的分區數。您可以將並行級別作為第二個參數傳遞(請參閱 spark.PairRDDFunctions 文檔),或者將 config 屬性設置 spark.default.parallelism 為更改默認值。一般來說,我們建議您的群集中每個 CPU 內核有2-3個任務。

減少任務的內存使用

有時,您將得到一個 OutOfMemoryError ,因為您的 RDD 不適合內存,而是因為您的其中一個任務的工作集(如其中一個 reduce 任務groupByKey)太大。 Spark 的 shuffle 操作(sortByKeygroupByKeyreduceByKeyjoin,等)建立每個任務中的哈希表來進行分組,而這往往是很大的。這裏最簡單的解決方案是增加並行級別,以便每個任務的輸入集都更小。 Spark 可以有效地支持短達200 ms 的任務,因為它可以將多個任務中的一個執行者JVM重用,並且任務啟動成本低,因此您可以將並行級別安全地提高到集群中的核心數量。

廣播大的變量

使用 可用的廣播功能 SparkContext 可以大大減少每個序列化任務的大小,以及在群集上啟動作業的成本。如果您的任務使用其中的驅動程序中的任何大對象(例如:靜態查找表),請考慮將其變為廣播變量。 Spark 打印主機上每個任務的序列化大小,因此您可以查看該任務以決定您的任務是否過大; 一般任務大於20 KB大概值得優化。

數據本地化

數據本地化可能會對 Spark job 的性能產生重大影響。如果數據和在其上操作的代碼在一起,則計算往往是快速的。但如果代碼和數據分開,則必須移動到另一個。通常,代碼大小遠小於數據,因此將數據代碼從一個地方寄送到另一個地方比一大塊數據更快。 Spark 圍繞數據局部性的一般原則構建其調度。

數據本地化是指數據和代碼處理有多近。根據數據的當前位置有幾個地方級別。從最近到最遠的順序:

  • PROCESS_LOCAL 數據與運行代碼在同一個 JVM 中。這是可能的最好的地方
  • NODE_LOCAL 數據在同一個節點上。示例可能在同一節點上的 HDFS 或同一節點上的另一個執行程序中。這比 PROCESS_LOCAL 因為數據必須在進程之間移動慢一些
  • NO_PREF 數據從任何地方同樣快速訪問,並且沒有本地偏好
  • RACK_LOCAL 數據位於同一機架上的服務器上。數據位於同一機架上的不同服務器上,因此需要通過網絡發送,通常通過單個交換機發送
  • ANY 數據在網絡上的其他地方,而不在同一個機架中

Spark 喜歡將所有 task 安排在最佳的本地級別,但這並不總是可能的。在任何空閑 executor 中沒有未處理數據的情況下, Spark 將切換到較低的本地級別。有兩個選項: a )等待一個繁忙的 CPU 釋放在相同服務器上的數據上啟動任務,或者 b )立即在更遠的地方啟動一個新的任務,需要在那裏移動數據。

Spark 通常做的是等待一個繁忙的 CPU 釋放的希望。一旦超時,它將開始將數據從遠處移動到可用的 CPU 。每個級別之間的回退等待超時可以在一個參數中單獨配置或全部配置; 有關詳細信息,請參閱配置頁面 spark.locality 上的 參數。如果您的 task 很長,並且本地化差,您應該增加這些設置,但默認值通常會很好。

概要

這是一個簡短的指南,指出調整 Spark 應用程序時應該了解的主要問題 - 最重要的是數據序列化和內存調整。對於大多數程序,以序列化形式切換到 Kryo 序列化和持久化數據將會解決大多數常見的性能問題。隨時在 Spark 郵件列表中詢問有關其他調優最佳做法的信息。

一分鐘了解spark的調優