1. 程式人生 > >spark 原始碼分析之二十二-- Task的記憶體管理

spark 原始碼分析之二十二-- Task的記憶體管理

問題的提出

本篇文章將回答如下問題:

1.  spark任務在執行的時候,其記憶體是如何管理的?

2. 堆內記憶體的定址是如何設計的?是如何避免由於JVM的GC的存在引起的記憶體地址變化的?其內部的記憶體快取池回收機制是如何設計的?

3. 堆外和堆內記憶體分別是通過什麼來分配的?其資料的偏移量是如何計算的?

4. 消費者MemoryConsumer是什麼?

5. 資料在記憶體頁中是如何定址的?

 

單個任務的記憶體管理是由 org.apache.spark.memory.TaskMemoryManager 來管理的。

TaskMemoryManager

它主要是負責管理單個任務的記憶體。

首先記憶體分為堆外記憶體和堆內記憶體。

對於堆外記憶體,可以記憶體地址直接使用64位長整型地址定址。

對於堆內記憶體,記憶體地址由一個base物件和一個offset物件組合起來表示。

類在設計的過程中遇到的問題:

對於其他結構內部的結構的地址的儲存是存在問題的,比如在hashmap或者是 sorting buffer 中的記錄的指標,儘管我們決定使用128位來定址,我們不能只存base物件的地址,因為由於gc的存在,這個地址不能保證是穩定不變的。(由於分代回收機制的存在,記憶體中的物件會不斷移動,每次移動,物件記憶體地址都會改變,但這對於不關注物件地址的開發者來說,是透明的)

最終的方案:

對於堆外記憶體,只儲存其原始地址,因為堆外記憶體不受gc影響;對於堆內記憶體,我們使用64位的高13位來儲存記憶體頁數,低51位來儲存這個頁中的offset,使用page表來儲存base物件,其在page表中的索引就是該記憶體的記憶體頁數。頁數最多有8192頁,理論上允許索引 8192 * (2^31 -1)* 8 bytes,相當於140TB的資料。其中 2^31 -1 是整數的最大值,因為page表中記錄索引的是一個long型陣列,這個陣列的最大長度是2^31 -1。實際上沒有那麼大。因為64位中除了用來設計頁數和頁內偏移量外還用於存放資料的分割槽資訊。

MemoryLocation

其中這個base物件和offset物件被封裝進了 MemoryLocation物件中,也就是說,這個類就是用來記憶體定址的,如下:

其唯一實現類為 org.apache.spark.unsafe.memory.MemoryBlock。

MemoryBlock

它表示一段連續的記憶體塊,包括一個起始位置和一個固定大小。起始位置有MemoryLocation來表示。

也就是說它有四個屬性:

這段連續記憶體塊的起始地址:從父類繼承而來的base物件和offset。

固定大小 length以及對這個記憶體塊的唯一標識 - 記憶體頁碼(page number)

 

主要方法如下,其中Platform是跟作業系統有關的一個類,不做過多說明。

MemoryAllocator

其主要負責記憶體的申請工作。這個介面的實現類是真正分配記憶體的。後面介紹的TaskMemoryManager只是負責管理記憶體,但是不負責具體的記憶體分配事宜。

其繼承關係如下,有兩個子類:

其定義的主要的常量和方法如下:

主要方法主要用來分配和釋放記憶體塊。下面主要來看一下它兩個子類的實現。

HeapMemoryAllocator

全稱:org.apache.spark.unsafe.memory.HeapMemoryAllocator

主要負責分配堆內記憶體,其主要分配long型陣列,最大分配記憶體為16GB。

成員變數

bufferPoolBySize是一個HashMap,其內部的value裡面存放的資料都是弱引用型別的資料,在JVM 發生GC時,資料可能會被回收。它裡面存放的資料都是已經不用的廢棄掉的記憶體塊。

是否使用記憶體快取池

申請的記憶體塊的大小大於閥值才使用記憶體快取池。

分配記憶體

思路:首先根據bytes大小計算處words的大小,然後位元組對齊計算出對齊需要的位元組,斷言對齊後的位元組大小大於等於之前未對齊的位元組大小。為什麼要對齊呢?因為長整型陣列的記憶體大小是對齊的。

如果對齊後的位元組大小滿足使用快取池的條件,則先從快取池中彈出對應的pool,並且如果彈出的pool不為空,則逐一取出之前釋放的陣列,並將其封裝進MmeoryBlock物件,並且使用標誌位清空之前的歷史資料返回之。

否則,則初始化指定的words長度的長整型陣列,並將其封裝進MmeoryBlock物件,並且使用標誌位清空之前的歷史資料返回之。總之快取的是長整型陣列,存放資料的也是長整型陣列。

釋放記憶體

 

首先把要釋放的記憶體資料使用free標誌位覆蓋,pageNumber置為佔位的page number。

然後取出其內部的長整型陣列賦值給臨時變數,並且把base物件置為null,offset置為0。

取出的長整型陣列計算其對齊大小,記憶體頁的大小不一定等於陣列的長度 * 8,此時的size是記憶體頁的大小,需要進行對齊操作。

對齊之後的記憶體頁大小如果滿足快取池條件,則將其暫存快取池,等待下次回收再用或者JVM的GC回收。

這個方法結束之後,這個長整型陣列被LinkedList物件(即pool)引用,但這是一個若引用,所以說,現在這個陣列是一個遊離物件,當JVM回收時,會回收它。

對堆內記憶體的總結

對於堆內記憶體上的資料真實受JVM的GC影響,其真實資料的記憶體地址會發生改變,巧妙使用陣列這種容器以及偏移量巧妙地將這個問題規避了,資料回收也可以使用快取池機制來減少陣列頻繁初始化帶來的開銷。其內部使用虛引用來引用釋放的陣列,也不會導致無法回收導致記憶體洩漏。

UnsafeMemoryAllocator

全稱:org.apache.spark.unsafe.memory.UnsafeMemoryAllocator

負責分配堆外記憶體。

分配記憶體

思路:底層使用unsafe這個類來分配堆外記憶體。這裡的offset就是作業系統的記憶體地址,base物件為null。

釋放記憶體

堆外記憶體的釋放不能使用快取池,因為堆外記憶體不受JVM的管理,將會導致遺留的不用的記憶體無法回收從而引發更嚴重的記憶體洩漏,更甚者堆外記憶體使用的是系統記憶體,嚴重的話還會導致出現系統級問題。

堆堆外記憶體的總結

簡言之,對於堆外記憶體的分配和回收,都是通過java內建的Unsafe類來實現的,其統一規範中的base物件為null,其offset就是該記憶體頁在作業系統中的真實地址。

 

下面剖析一下TaskMemoryManager的成員變數和核心方法。

進一步剖析TaskMemoryManager

成員變數

下面,先來看一下其成員變數,截圖如下:

對主要的成員變數做如下解釋:

OFFSET_BITS:是指的page number 佔用的bit個數

MAXIMUM_PAGE_SIZE_BYTES:約17GB,每頁最大可存記憶體大小

pageTable:主要用來存放記憶體頁的

allocatedPages:主要用來追蹤記憶體頁是否為空的

memoryManager:主要負責Spark記憶體管理,具體細節可以參照 spark 原始碼分析之十五 -- Spark記憶體管理剖析 做進一步瞭解。

taskAttemptId:任務id

tungstenMemoryMode:tungsten記憶體模式,是堆外記憶體還是堆內記憶體

consumers:記錄了任務記憶體的所有消費者

核心方法

所有方法如下:

下面,我們來逐一對其進行原始碼剖析。

1. 獲取執行記憶體

思路:首先先去MemoryManager中去申請執行記憶體,如果記憶體不夠,則獲取所有的MemoryConsumer,呼叫其spill方法將記憶體資料溢位到磁碟,直到釋放記憶體空間滿足申請的記憶體空間則停止spill操作。

2. 釋放執行記憶體

這其實不是真正意義上的記憶體釋放,只是管賬的把這筆記憶體佔用劃掉了,真正的記憶體釋放還是需要呼叫MemoryConsumer的spill方法將記憶體資料溢位到磁碟來釋放記憶體。

3. 獲取記憶體頁大小

 

4. 分配記憶體頁

思路:首先獲取執行記憶體。執行記憶體獲取成功後,找到一個空的記憶體頁。

如果記憶體頁碼大於指定的最大頁碼,則釋放剛申請的記憶體,返回;否則使用MemoryAllocator分配記憶體頁、初始化記憶體頁碼並將其放入page表的管理,最後返回page。關於MemoryAllocator分配記憶體的細節,請參照上文關於其堆內記憶體或堆外記憶體的記憶體分配的詳細剖析。

 

5. 釋放記憶體頁

思路:首先呼叫EMmoryAllocator的free 方法來釋放記憶體,並且呼叫 方法2 來劃掉記憶體的佔用情況。

 

6. 記憶體地址加密

思路:高13位儲存的是page number,低51位儲存的是地址的offset

 

7.記憶體地址解密

思路: 跟 方法6 的編碼思路相反

 

8.根據記憶體地址獲取記憶體的base物件,前提是必須是堆內記憶體頁,否則沒有base物件。

 

9.獲取記憶體地址在記憶體頁的偏移量offset

如果是堆內記憶體,則直接返回其解碼之後的offset即可。

如果是堆外記憶體,分配記憶體時的offset + 頁內的偏移量就是真正的偏移量,是針對作業系統的,也是絕對的偏移量。

 

10.清空所有記憶體頁

思路:使用MemoryAllocator釋放記憶體,並且請求管賬的MemoryManager釋放執行記憶體和task的所有記憶體。

 

11.獲取單個任務的執行記憶體使用情況

思路:從MemoryManager處獲取指定任務的執行記憶體使用情況。

 

下面看一下跟TaskMemoryManager互動的消費者物件 -- MemoryConsumer。

MemoryConsumer

類說明

全稱:org.apache.spark.memory.MemoryConsumer

它是任務記憶體的消費者。

其類結構如下:

成員變數

taskMemoryManager:是負責任務記憶體管理。

used:表示使用的記憶體。

mode:表示記憶體的模式是堆內記憶體還是堆外記憶體。

pageSize:表示頁大小。

主要方法

1. 記憶體資料溢位到磁碟,抽象方法,等待子類實現。

 

2. 申請釋放記憶體部分,不再做詳細的分析,都是依賴於 TaskMemoryManager 做的操作。

關於更多MemoryConsumer的以及其子類的相關內容,將在下一篇文章Shuffle的寫操作中詳細剖析。

 

總結

本篇文章主要剖析了Task在任務執行時記憶體的管理相關的內容,現在可能還看不出其重要性,後面在含有sort的shuffle過程中,會頻繁的使用基於記憶體的sorter,此時的sorter包含大量的資料,是需要記憶體管理