1. 程式人生 > >spark2原理分析-RDD的caching和persistence原理分析

spark2原理分析-RDD的caching和persistence原理分析

概述

本文分析RDD的caching和persistence的原理。並對其程式碼實現進行分析。

persist and cache

基本概念

Spark的一個重要特性是:能夠跨操作把資料儲存到記憶體中,這個過程稱為persist,或稱為caching。當persist一個RDD時,每個spark節點都會把它計算的任何分割槽儲存到記憶體中,當在基於這些資料進行其他操作時進行復用。這樣使得將來進行的操作更快(通常是數10倍)。Caching是演算法迭代(iterative algorithms)和快速互動式(fast interactive)使用的關鍵工具。

您可以使用persist()或cache()方法來標記要保留的RDD。在第一次操作計算完成後,它將保留在該計算節點的記憶體中。 Spark的快取具有容錯性:若任何RDD的分割槽丟失,它將使用最初建立它的轉換(transformations)自動重新計算。

此外,每次persist RDD可以使用不同的儲存級別進行儲存,例如,您可以將資料集儲存在磁碟上,將其保留在記憶體中,但作為序列化Java物件(以節省空間),跨節點複製它。

通過將StorageLevel物件(Scala,Java,Python)傳遞給persist()來設定這些儲存級別。cache()方法是使用預設儲存級別的簡寫,即StorageLevel.MEMORY_ONLY(在記憶體中儲存反序列化的物件)。完整的儲存級別是:

Storage Level 說明
MEMORY_ONLY 將RDD儲存為JVM中的反序列化Java物件。如果RDD不儲存在記憶體,則某些分割槽將不會被快取,並且每次需要時都會重新計算。這是預設級別。
MEMORY_AND_DISK 將RDD儲存為JVM中的反序列化Java物件。若RDD不適合儲存在記憶體中,則可以儲存在磁碟中,需要時可以從磁碟讀取。
MEMORY_ONLY_SER(java and Scala) 將RDD儲存為序列化Java物件(每個分割槽一個位元組陣列)。這通常比反序列化物件更節省空間,特別是在使用快速序列化器時,但讀取CPU密集程度更高。
MEMORY_AND_DISK_SER(java and Scala) 與MEMORY_ONLY_SER類似,但將不適合記憶體的分割槽溢位到磁碟,而不是每次需要時即時重新計算它們。
DISK_ONLY 僅將RDD分割槽儲存在磁碟上。
MEMORY_ONLY_2 與上面的級別相同,但複製兩個群集節點上的每個分割槽。
OFF_HEAP(測試中) 與MEMORY_ONLY_SER類似,但將資料儲存在堆外記憶體中。這需要啟用堆外記憶體。

Spark會在shuffe操作時自動快取(persist)一些資料,即使使用者不呼叫persist函式。這樣做是為了避免在shuffle期間節點出現故障時重新計算整個輸入。若使用者需要重用結果RDD,任然建議在得到RDD結果時呼叫persist函式。

如何選擇儲存級別(Storage Level)

Spark的儲存級別目的是:在提供記憶體使用和CPU效率之間的選擇的權衡。我們建議您通過以下流程選擇一個:

  • 若RDD適合使用預設儲存級別(MEMORY_ONLY),就儘量使用。這是使CPU效率最高的選項,允許RDD上的操作儘可能快地執行。
  • 若使用預設儲存級別不太合適,可以嘗試使用MEMORY_ONLY_SER並選擇一個快速序列化庫,以使物件更加節省空間,但仍然可以快速訪問。(Java和Scala)
  • 若計算資料集的函式不是很消耗資源,或則這些函式過濾了資料集中大量的資料,不要把資料儲存在磁碟。另外,重新計算分割槽可能與從磁碟讀取分割槽一樣快。
  • 如果要快速的進行故障恢復,請使用複製的儲存級別( replicated storage levels)(例如,如果使用Spark來處理來自Web應用程式的請求)。所有儲存級別都會通過重新計算丟失的資料提供完全容錯,但複製的儲存級別允許您繼續在RDD上執行任務,而無需等待重新計算丟失的分割槽。

Persist and Cache原理分析

Persisting一個RDD意味著物理化RDD(通常通過將其儲存在執行器(executor)的記憶體中),以便在當前作業期間重用。Spark會記住一個RDD的血緣(RDD Lineage)。這樣,若RDD的其中一個persisting分割槽丟失,就可以繼續spark任務來重新計算它。作業結束後,persist函式接受一個StorageLevel引數,該引數指定應如何儲存RDD。

Spark提供了許多不同的儲存級別作為常量,但每個儲存級別(storage level)都是基於如何儲存RDD的五個屬性建立的:useDisk,useMemory,useOfHeap,deserialized和replication。在儲存級別呼叫toString將顯示它包含的選項。在Spark的文件中有一個關於persist的現有儲存選項的列表,如上一節所示。

在StorageLevel類中對以上的各種Storagelevel進行了封裝。通過該物件,來判斷是否要將RDD寫入到外部儲存,是否需要進行序列化,是否需要再多個節點中複製RDD的副本。

useDisk

包含DISK的儲存級別標誌(例如MEMORY_AND_DISK)啟用此功能。

預設情況下,如果分割槽資料不適合在記憶體,它們將被寫出到磁碟,並且在使用persist RDD時需要重新計算。

持久化到磁碟可以確保避免重新計算那些額外的大分割槽。但是,從磁碟讀取資料是時間密集的操作,因此如果重新計算的成本特別高,則對磁碟的永續性才顯得更加重要。

如果您希望RDD不儲存在記憶體,則允許寫入磁碟可能會有所幫助。但是,如果重新計算分割槽的成本不高(它們是簡單的對映並且不減小資料的大小),重新計算某些分割槽而不是從磁碟讀取實際上可能會更快。

useMemory

若設定了該變數,RDD會直接寫入到記憶體中,否則會寫入磁碟。

若設定了DISK_ONLY則該選項被設定為false。快取的大多數速度優勢來自於將RDD保留在記憶體中,因此如果重用是為了重複計算的快速訪問,那麼選擇在記憶體中儲存分割槽的儲存選項可能是個好主意。 然而,在某些情況下,僅disk-only persistence是有意義的,例如,當計算比在本地磁碟中讀取更昂貴或者網路檔案系統特別慢時(例如對於某些物件儲存)。

useOfHeap

若設定了該選項,則RDD會儲存在執行器外部的儲存系統中。

可以通過儲存選項off_heap來啟用該屬性。如果記憶體是一個嚴重的問題,或者群集有噪聲並且有分割槽被寫入磁碟,則此選項可能很有用。

deserialized

若設定了該選項,RDD會被當成java的序列化物件進行儲存。

通過該選項可以讓儲存的RDD更節約空間。

如果您的RDD太大而無法在記憶體中保留,請首先嚐試使用MEMORY_ONLY_SER選項對其進行序列化。這將使RDD快速訪問,但會減少儲存它所需的記憶體。

replication

replication(副本數)變數是一個整數,用於控制要儲存在群集中的持久資料的副本數。

預設情況下,此值設定為1; 但是,以_2結尾的序列化選項(如DISK_ONLY_2)會跨兩個節點複製每個分割槽。 使用此選項可確保更快的容錯能力。 但是,請注意,複製永續性會導致無需複製的永續性空間和速度成本增加一倍。 複製通常僅在嘈雜群集或錯誤連線的例項中是必需的,其中異常可能發生故障。 如果您在發生故障時沒有時間重新計算,例如在提供實時Web應用程式時,它也可能很有用。

persist 和 cache 實現分析

persist()函式的實現

在類RDD中,對RDD進行快取的函式定義如下:

/**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

可以看到,若直接呼叫persist()或cache()函式,預設的StorageLevel是MEMORY_ONLY。也就是預設是儲存在記憶體中。

有的可以指定快取RDD的StorageLevel,函式定義如下:

  def persist(newLevel: StorageLevel): this.type 

最終各個函式都會呼叫下面的函式來實現其功能:

  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type 

該函式會把RDD的storageLevel變數設定為新的level,如下:

    storageLevel = newLevel

unpersist

該函式的實現如下:

  def unpersist(blocking: Boolean = true): this.type = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
    this
  }

該函式會呼叫SparkContext的unpersistRDD函式,從所有節點的記憶體和磁碟上刪除指定RDD的資料。並把該RDD的StorageLevel設定為NONE。

總結

本文介紹了Spark的persist/cache的原理,並對其實現進行了分析。