1. 程式人生 > >spark 原始碼分析之十七 -- Spark磁碟儲存剖析

spark 原始碼分析之十七 -- Spark磁碟儲存剖析

上篇文章 spark 原始碼分析之十六 -- Spark記憶體儲存剖析 主要剖析了Spark 的記憶體儲存。本篇文章主要剖析磁碟儲存。

總述

磁碟儲存相對比較簡單,相關的類關係圖如下:

 

 

我們先從依賴類 DiskBlockManager 剖析。

 

DiskBlockManager

文件說明如下:

Creates and maintains the logical mapping between logical blocks and physical on-disk locations. 
One block is mapped to one file with a name given by its BlockId.
Block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS, if it's set).

建立並維護邏輯block和block落地的物理檔案的對映關係。一個邏輯block通過它的BlockId的name屬性對映到具體的檔案。

 

類結構

其類結構如下:

可以看出,這個類主要用於建立並維護邏輯block和block落地檔案的對映關係。儲存對映關係,有兩個解決方案:一者是使用Map儲存每一條具體的對映鍵值對,二者是指定對映函式像分割槽函式等等,給定的key通過對映函式對映到具體的value。

成員變數

成員變數如下:

subDirsPerLocalDir:這個變量表示本地檔案下有幾個檔案,預設為64,根據引數 spark.diskStore.subDirectories 來調節。

subDirs:是一個二維陣列表示本地目錄和子目錄名稱的組合關係,即 ${本地目錄1 ... 本地目錄n}/${子目錄1 ... 子目錄64}

localDirs:表示block落地本地檔案根目錄,通過 createLocalDirs 方法獲取,方法如下:

思路:它先呼叫呼叫Utils的 getConfiguredLocalDirs 方法,獲取到配置的目錄集合,然後map每一個父目錄,呼叫Utils的createDirectory方法,在每一個子目錄下建立一個 以blockmgr 為字首的目錄。其依賴方法 createDirectory 如下:

這個方法允許重試次數為10,目的是為了防止建立的目錄跟已存在的目錄重名。

 

getConfiguredLocalDirs 方法如下:

大多數生產情況下,都是使用yarn,我們直接看一下spark on yarn 環境下,目錄到底在哪裡。直接來看getYarnLocalDirs方法:

LOCAL_DIRS的定義是什麼?

任務是跑在yarn 上的,下面就去定位一下hadoop yarn container的相關原始碼。

定位LOCAL_DIRS環境變數

在ContainerLaunch類的 sanitizeEnv 方法中,找到了如下語句:

 

addToMap 方法如下:

即,資料被新增到了envirment map變數和 nmVars set集合中了。

在ContainerLaunch 的 call 方法中呼叫了 sanitizeEnv 方法:

appDirs變數定義如下:

即每一個 appDir格式如下:${localDir}/usercache/${user}/appcache/${application-id}/

localDirs 定義如下:

dirHandler是一個 LocalDirsHandlerService 型別變數,這是一個服務,在其serviceInit方法中,例項化了 MonitoringTimerTask物件:

在 MonitoringTimerTask 構造方法中,發現了:

 NM_LOCAL_DIRS 常量定義如下:

 

即:yarn.nodemanager.local-dirs 引數,該引數定義在yarn-default.xml下。

即localDir如下:

${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/${application-id}/

再結合createDirectory方法,磁碟儲存的本地目錄是:

 ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/${application-id}/blockmgr-隨機的uuid/

核心方法

根據檔案內容建立File物件,如下:

思路:先根據filename即blockId的name欄位生成正的hashcode(abs(hashcode))

dirId 是指的第幾個父目錄(從0開始數),subDirId是指的父目錄下的第幾個子目錄(從0開始數)。最後拼接父子目錄為一個新的父目錄subDir。

然後以subDir為父目錄,建立File物件,並返回之。

跟getFile 方法相關的方法如下:

比較簡單,不做過多說明。

 

建立一個臨時Block,包括臨時本地block 或 shuffle block,如下:

 

還有一個方法,是停止 DiskBlockManager之後的回撥方法:

若deleteFilesOnStop 為 true,即DiskBlockManager停止時,是否需要清除本地儲存的block檔案。

在 BlockManager 中初始化DiskBlockManager時,deleteFilesOnStop 通過構造方法傳入

 

總結:DiskBlockManager 是用來建立並維護邏輯block和落地後的block檔案的對映關係的,它還負責建立用於shuffle或本地的臨時檔案。

 下面看一下在DiskStore中可能會用到的類以及其相關類的說明。

CountingWritableChannel 

它主要對sink做了包裝,在寫入sink的同時,還記錄向sink寫的資料的總量。原始碼如下:

程式碼比較簡單,不做過多說明。

ManagedBuffer

類說明如下:

This interface provides an immutable view for data in the form of bytes. 
The implementation should specify how the data is provided:
- FileSegmentManagedBuffer: data backed by part of a file
- NioManagedBuffer: data backed by a NIO ByteBuffer
- NettyManagedBuffer: data backed by a Netty ByteBuf
The concrete buffer implementation might be managed outside the JVM garbage collector.
For example, in the case of NettyManagedBuffer, the buffers are reference counted.
In that case, if the buffer is going to be passed around to a different thread, retain/release should be called.

 

類結構如下:

 

EncryptedManagedBuffer

它是一個介面卡,它將幾乎所以轉換的請求委託給了 blockData,下面來看一下這個類相關的剖析。

首先先看一下它的父類 -- BlockData

 

BlockData

介面說明如下:

它是一個介面,它定義了儲存方式以及如何提供不同的方式來讀去底層的block 資料。

定義方法如下:

方法說明如下:

toInputStream用於返回用於讀取該檔案的輸入流。

toNetty用於返回netty對block資料的包裝類,方便netty包來讀取資料。

toChunkedByteBuffer用於將block包裝成ChunkedByteBuffer。

toByteBuffer 用於將block資料轉換為記憶體中直接讀取的 ByteBuffer 物件。

當對該block的操作執行完畢後,需要呼叫dispose來做後續的收尾工作。

size表示block檔案的大小。

它有三個子類:DiskBlockData、EncryptedBlockData和ByteBufferBlockData。

即block的三種存在形式:磁碟、加密後的block和記憶體中的ByteBuffer

分別介紹如下:

 

DiskBlockData

該類主要用於將磁碟中的block檔案轉換為指定的流或物件。

先來看其簡單的方法實現:

構造方法:

相關欄位說明如下:

minMemoryMapBytes表示 磁碟block對映到記憶體塊中最小大小,預設為2MB,可以通過 spark.storage.memoryMapThreshold 進行調整。

maxMemoryMapBytes表示 磁碟block對映到記憶體塊中最大大小,預設為(Integer.MAX_VALUE - 15)B,可以通過 spark.storage.memoryMapLimitForTests 進行調整。

對應原始碼如下:

比較簡單的方法如下: 

size方法直接返回block檔案的大小。

dispose空實現。

open是一個私有方法,主要用於獲取讀取該block檔案的FileChannel物件。

 

toByteBuffer方法實現如下:

 

Utils的tryWithResource方法如下,它先執行createResource方法,然後執行Function物件的apply方法,最終釋放資源,思路就是 建立資源 --使用資源-- 釋放資源三步曲:

即先獲取讀取block檔案的FileChannel物件,若blockSize 小於 最小的記憶體對映位元組大小,則將channel的資料讀取到buffer中,返回的是HeapByteBuffer物件,即資料被寫入到了堆裡,即它是non-direct buffer,相當於資料被讀取到中間臨時記憶體中,否則使用FileChannelImpl的map方法返回 MappedByteBuffer 物件。

MappedByteBuffer文件說明如下:

A direct byte buffer whose content is a memory-mapped region of a file.
Mapped byte buffers are created via the FileChannel.map method. This class extends the ByteBuffer class with operations that are specific to memory-mapped file regions.
A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected.
The content of a mapped byte buffer can change at any time, for example if the content of the corresponding region of the mapped file is changed by this program or another. Whether or not such changes occur, and when they occur, is operating-system dependent and therefore unspecified. 
All or part of a mapped byte buffer may become inaccessible at any time, for example if the mapped file is truncated. An attempt to access an inaccessible region of a mapped byte buffer will not change the buffer's content and will cause an unspecified exception to be thrown either at the time of the access or at some later time. It is therefore strongly recommended that appropriate precautions be taken to avoid the manipulation of a mapped file by this program, or by a concurrently running program, except to read or write the file's content.
Mapped byte buffers otherwise behave no differently than ordinary direct byte buffers.

 

它是direct buffer,即直接從磁碟讀資料,不經過中間臨時記憶體,可以參照ByteBuffer的文件對Direct vs. non-direct buffers 的說明如下:

Direct vs. non-direct buffers
A byte buffer is either direct or non-direct. Given a direct byte buffer, the Java virtual machine will make a best effort to perform native I/O operations directly upon it. That is, it will attempt to avoid copying the buffer's content to (or from) an intermediate buffer before (or after) each invocation of one of the underlying operating system's native I/O operations.
A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
A direct byte buffer may also be created by mapping a region of a file directly into memory. An implementation of the Java platform may optionally support the creation of direct byte buffers from native code via JNI. If an instance of one of these kinds of buffers refers to an inaccessible region of memory then an attempt to access that region will not change the buffer's content and will cause an unspecified exception to be thrown either at the time of the access or at some later time.
Whether a byte buffer is direct or non-direct may be determined by invoking its isDirect method. This method is provided so that explicit buffer management can be done in performance-critical code. 

 

toChunkedByteBuffer 方法如下:

首先,ChunkedByteBuffer物件裡包含的是資料分成多個小的chunk,而不是連續的陣列。

先把檔案讀到記憶體中的 HeapByteBuffer 物件中即單個chunk,然後放入存放chunk的ListBuffer中,最終轉換為Array存入到ChunkedByteBuffer 物件中。

toNetty實現如下:

DefaultFileRegion說明請繼續向下看,先不做過多說明。

 

EncryptedBlockData

這個類主要是用於加密的block磁碟檔案轉換為特定的流或物件。

構造方法如下:

file指block檔案,blockSize指block檔案大小,key是用於加密的金鑰。

先來看三個比較簡單的方法:

 

open方法不再直接根據FileInputStream獲取其 FileChannelImpl 物件了,而是獲取 FileChannelImpl 之後,再呼叫了 CryptoStreamUtils 的 createReadableChannel 方法,如下:

進一步將channel 物件封裝為 CryptoInputStream 物件,對ErrorHandlingReadableChannel的讀操作,實際上是讀的 CryptoInputStream,這個流內部有一個根據key來初始化的加密器,這個加密器負責對資料的解密操作。

 

toByteBuffer方法如下:

思路:如果block資料大小在整數範圍內,則直接將加密的block解密之後存放在記憶體中。

toChunkedByteBuffer方法除了解密操作外,跟DiskBlockData 中toChunkedByteBuffer方法無異,不做過多說明,程式碼如下:

toNetty 方法,原始碼如下:

ReadableChannelFileRegion類在下文介紹,先不做過多說明。

 

toInputStream方法,原始碼如下:

思路:這個就不能直接open方法返回的獲取inputStream,因為 CryptoInputStream 是沒有獲取inputStream的介面的,Channels.newInputStream返回的是ChannelInputStream,ChannelInputStream對channel做了裝飾。

ByteBufferBlockData

整體比較簡單,主要來看一下dispose方法,ChunkedByteBuffer 方法的 dispose 如下:

即使用StorageUtils的dispose 方法去清理每一個chunk,StorageUtils的dispose 方法如下:

即獲取它的cleaner,然後呼叫cleaner的clean方法。我們以 DirectByteBufferR 為例,做進一步說明:

在其構造方法中初始化Cleaner,如下:

base是呼叫unsafe類的靜態方法allocateMemory分配指定大小記憶體後返回的記憶體地址,size是記憶體大小。

類宣告:

沒錯它是一個虛引用,隨時會被垃圾回收。

 

Cleaner的構造方法如下:

var1 是待清理的物件,var2 是執行清理任務的Runnable物件。

再看它的成員變數:

沒錯,它自己本身就是雙向連結串列上的一個節點,也是雙向連結串列。

 其create 方法如下:

思路:建立cleanr並把它加入到雙向連結串列中。

 

Cleaner的 clean方法如下:

它會先呼叫remove 方法,呼叫成功則執行記憶體清理任務,注意這裡沒有非同步任務同步呼叫Runnable的run方法。

remove 方法如下:

思路:從雙向連結串列中移除指定的cleaner。

Deallocator 類如下:

unsafe的allocateMemory方法使用了off-heap memory,這種方式的記憶體分配不是在堆裡,不受GC的管理,使用Unsafe.freeMemory()來釋放它。

先呼叫 unsafe釋放記憶體,然後呼叫Bits的 unreserveMemory 方法:

至此,dispose 方法結束。

 

 

下面看一下,ReadableChannelFileRegion的繼承關係:

我們按繼承關係來看類: ReferenceCounted --> FileRegion --> AbstractReferenceCounted --> AbstractFileRegion --> ReadableChannelFileRegion。

ReferenceCounted

類說明如下:

A reference-counted object that requires explicit deallocation.
When a new ReferenceCounted is instantiated, it starts with the reference count of 1. 
retain() increases the reference count, and release() decreases the reference count.
If the reference count is decreased to 0, the object will be deallocated explicitly,
and accessing the deallocated object will usually result in an access violation. If an object that implements ReferenceCounted is a container of other objects that implement ReferenceCounted,
the contained objects will also be released via release() when the container's reference count becomes 0.

這是netty包下的一個介面。

它是一個引用計數物件,需要顯示呼叫deallocation。

ReferenceCounted物件例項化時,引用計數設為1,呼叫retain方法增加引用計數,release方法則釋放引用計數。

如果引用計數減少至0,物件會被顯示deallocation,訪問已經deallocation的物件會造成訪問問題。

如果一個物件實現了ReferenceCounted介面的容器包含了其他實現了ReferenceCounted介面的物件,當容器的引用減少為0時,被包含的物件也需要通過 release 方法釋放之,即引用減1。

主要有三類核心方法:

retain:Increases the reference count by 1 or the specified increment.

touch:Records the current access location of this object for debugging purposes. If this object is determined to be leaked, the information recorded by this operation will be provided to you via ResourceLeakDetector. This method is a shortcut to touch(null).

release:Decreases the reference count by 1 and deallocates this object if the reference count reaches at 0. Returns true if and only if the reference count became 0 and this object has been deallocated

refCnt:Returns the reference count of this object. If 0, it means this object has been deallocated.

FileRegion

它也是netty下的一個包,FileRegion資料通過支援零拷貝的channel將資料傳輸到目標channel。

A region of a file that is sent via a Channel which supports zero-copy file transfer .

 

注意:檔案零拷貝傳輸對JDK版本和作業系統是有要求的:

FileChannel.transferTo(long, long, WritableByteChannel) has at least four known bugs in the old versions of Sun JDK and perhaps its derived ones. Please upgrade your JDK to 1.6.0_18 or later version if you are going to use zero-copy file transfer.
If your operating system (or JDK / JRE) does not support zero-copy file transfer, sending a file with FileRegion might fail or yield worse performance. For example, sending a large file doesn't work well in Windows.
Not all transports support it

 

介面結構如下:

下面對新增方法的解釋:

count:Returns the number of bytes to transfer.

position:Returns the offset in the file where the transfer began.

transferred:Returns the bytes which was transfered already.

transferTo:Transfers the content of this file region to the specified channel.

AbstractReferenceCounted

這個類是通過一個變數來記錄引用的增加或減少情況。

類結構如下:

先來看成員變數:

refCnt就是內部記錄引用數的一個volatile型別的變數,refCntUpdater是一個 AtomicIntegerFieldUpdater 型別常量,AtomicIntegerFieldUpdater 基於反射原子性更新某個類的 volatile 型別成員變數。

A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes. 
This class is designed for use in atomic data structures in which several fields of the same node are independently subject to atomic updates. Note that the guarantees of the compareAndSet method in this class are weaker than in other atomic classes.
Because this class cannot ensure that all uses of the field are appropriate for purposes of atomic access,
it can guarantee atomicity only with respect to other invocations of compareAndSet and set on the same updater.

 

方法如下:

1. 設定或獲取 refCnt 變數

2. 增加引用:

3. 減少引用:

 

AbstractFileRegion

AbstractFileRegion 繼承了AbstractReferenceCounted, 但他還是一個抽象類,只是實現了部分的功能,如下:

DefaultFileRegion

 文件說明如下:

Default FileRegion implementation which transfer data from a FileChannel or File. 
Be aware that the FileChannel will be automatically closed once refCnt() returns 0.

 

先來看一下它主要的成員變數:

f:是指要傳輸的原始檔。

file:是指要傳輸的源FileChannel

position:傳輸開始的位元組位置

count:總共需要傳輸的位元組數量

transferred:指已經傳輸的位元組數量

 

關鍵方法 transferTo 的原始碼如下:

思路:先計算出剩餘需要傳輸的位元組的總大小。然後從 position 的相對位置開始傳輸到指定的target sink。

注意:position是指相對於position最初開始位置的大小,絕對位置為 this.position + position。

其中,open 方法如下,它返回一個隨機讀取檔案的 FileChannel 物件。

其deallocate 方法如下:

思路:直接關閉,取消成員變數對於FileChannel的引用,便於垃圾回收時可以回收FileChannel,然後關閉FileChannel即可。

 

總結:它通過 RandomeAccessFile 獲取 可以支援隨機訪問 FileChannelImpl 的FileChannel,然後根據相對位置計算出絕對位置以及需要傳輸的位元組總大小,最後將資料傳輸到target。

其引用計數的處理呼叫其父類 AbstractReferenceCounted的對應方法。

ReadableChannelFileRegion

其原始碼如下:

其內部的buffer 的大小時 64KB,_traferred 變數記錄了已經傳輸的位元組數量。ReadableByteChannel 是按順序讀的,所以pos引數沒有用。

 

下面,重點對DiskStore做一下剖析。 

DiskStore

它就是用來儲存block 到磁碟的。

 構造方法如下:

它有三個成員變數:

blockSizes 記錄了每一個block 的blockId 和其大小的關係。可以通過get 方法獲取指定blockId 的block大小。如下:

 

putBytes方法如下:

putBytes將資料寫入到磁碟中;getBytes獲取的是BlockData資料,注意現在只是返回檔案的引用,檔案的內容並沒有返回,使得上文所講的多種多樣的BlockData轉換操作直接對接FileChannel,即本地檔案,可以充分發揮零拷貝等特性,資料傳輸效率會更高。

其中put 方法如下:

思路很簡單,先根據diskManager獲取到block在磁碟中的檔案的抽象 -- File物件,然後獲取到filechannel,呼叫回撥函式將資料寫入到本地block檔案中,最後記錄block和其block大小,最後關閉out channel。如果中途丟擲異常,則格式化已寫入的資料,確保資料的寫入是原子化操作(要麼全成功,要麼全失敗)。

put方法依賴的方法如下:

openForWrite方法,先獲取filechannel,然後如果資料有加密,在建立加密的channel用來處理加密的資料

總結:本篇文章介紹了維護blockId和block物理檔案的對映關係的DiskBlockManager;Hadoop yarn定位LOCAL_DIRS環境變數是如何定義的;定義了block的儲存方式以及轉換成流或channel或其他物件的BlockData介面以及它的三個具體的實現,順便介紹了directByteBuffer記憶體清理機制--Cleaner以及相關類的解釋;用作資料傳輸的DefaultFileRegion和ReadableChannelFileRegion類以及其相關類;最後介紹了磁碟儲存裡的重頭戲--DiskStore,並重點介紹了其用於儲存資料和刪除資料的方法。

不足之處:本篇文章對磁碟IO中的nio以及netty中的相關類介紹的不是很詳細,可以閱讀相關文件做進一步理解。畢竟如何高效地和磁碟打交道也是比較重要的技能。後面有機會可能會對java的集合io多執行緒jdk部分的原始碼做一次徹底剖析,但那是後話了。目前打算先把spark中認為自己比較重要的梳理一遍。

&n