1. 程式人生 > >每次進步一點點——spark中cache和persist的區別

每次進步一點點——spark中cache和persist的區別

昨天面試被問到了cache和persist區別,當時只記得是其中一個呼叫了另一個,但沒有回答出二者的不同,所以回來後重新看了原始碼,算是弄清楚它們的區別了。

cache和persist都是用於將一個RDD進行快取的,這樣在之後使用的過程中就不需要重新計算了,可以大大節省程式執行時間。

cache和persist的區別

基於Spark 1.4.1 的原始碼,可以看到

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

說明是cache()呼叫了persist(), 想要知道二者的不同還需要看一下persist函式:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

可以看到persist()內部呼叫了persist(StorageLevel.MEMORY_ONLY),繼續深入:

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet..
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

可以看出來persist有一個 StorageLevel 型別的引數,這個表示的是RDD的快取級別。

至此便可得出cache和persist的區別了:cache只有一個預設的快取級別MEMORY_ONLY ,而persist可以根據情況設定其它的快取級別。

RDD的快取級別

順便看一下RDD都有哪些快取級別,檢視 StorageLevel 類的原始碼:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

可以看到這裡列出了12種快取級別,但這些有什麼區別呢?可以看到每個快取級別後面都跟了一個StorageLevel的建構函式,裡面包含了4個或5個引數,如下

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

檢視其建構函式

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel類的主構造器包含了5個引數:

  • useDisk:使用硬碟(外存)
  • useMemory:使用記憶體
  • useOffHeap:使用堆外記憶體,這是Java虛擬機器裡面的概念,堆外記憶體意味著把記憶體物件分配在Java虛擬機器的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛擬機器)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
  • deserialized:反序列化,其逆過程式列化(Serialization)是java提供的一種機制,將物件表示成一連串的位元組;而反序列化就表示將位元組恢復為物件的過程。序列化是物件永久化的一種機制,可以將物件及其屬性儲存起來,並能在反序列化後直接恢復這個物件
  • replication:備份數(在多個節點上備份)

理解了這5個引數,StorageLevel 的12種快取級別就不難理解了。

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種快取級別的RDD將儲存在硬碟以及記憶體中,使用序列化(在硬碟中),並且在多個節點上備份2份(正常的RDD只有一份)

另外還注意到有一種特殊的快取級別

val OFF_HEAP = new StorageLevel(false, false, true, false)

使用了堆外記憶體,StorageLevel 類的原始碼中有一段程式碼可以看出這個的特殊性,它不能和其它幾個引數共存。

if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}