【Spark系列】三、Spark工作機制
Spark工作機制
Client |
Driver程式 |
Spark Context |
RDD DAG |
DAGSchedular |
TaskSchedular |
SparkEnv |
Worker Node |
Executor |
Task |
Task |
Cache |
Worker Node |
Executor |
Task |
Task |
Cache |
Cluster Manager |
圖 Spark架構圖
4.1應用程式執行流程
應用程式的執行流程為:
1)寫好的應用程式,打包成jar檔案。然後通過客戶端上傳到叢集。根據Driver的配置模式,要麼執行在客戶端,要麼由master指定worker啟動driver程序,並對整個應用程式進行監控和管理。接著,配置一些上下文環境。然後順序執行程式碼。
2)RDD的運算元包括兩大類:一是轉換運算元,二是行動運算元。只有Action運算元才會觸發Job的提交,也就是說,Spark採用的是惰性機制,在碰到行動運算元的時候,才提交作業。接著生成RDD有向無環圖DAG,由DAG排程器DAGScheduler轉化為階段Stage DAG,每個階段Stage中產生相應的任務Task集合,任務排程器TaskScheduler將任務分發到worker上的Executor執行。每個任務對應一個數據塊,使用使用者定義的函式處理資料塊。如圖:
4.1.1 應用提交與執行方式
Driver配置(deploy-mode)模式包含以下兩種方式。
·Driver程序執行在客戶端,對應用進行管理監控。(為預設項)
·主節點指定某個Worker節點啟動Driver,對應用進行監控管理。
圖4-4 Spark Driver位於Client 圖4-5 Spark Driver位於Worker節點的應用提交與執行
4.2 Spark任務排程模式
Spark有多種執行模式,如單機(Local)模式、Standalone模式、YARN模式、Mesos模式。
4.2.1 Spark應用程式之間的排程
一個Executor在一個時間段內只能給一個應用使用。
4.2.2 作業排程
不同執行緒提交的作業Job可以並行執行。一個作業分為多個Stage。整個RDD DAG為一個Job。action運算元中的本質是呼叫Spark上下文(SparkContext)中的runJob提交了Job。
作業的排程主要有FIFO和FAIR兩種模式。
FIFO模式
FIFO(先進先出)。
fair模式
在fair共享模式排程下,多個作業以輪詢(round robin)方式為分配資源。考慮到長任務和短任務問題,這樣長任務在前,短任務在後,短任務也可以獲得不錯的響應時間。
4.2.3 階段(Stage)排程
Action運算元觸發作業的提交,並形成RDD DAG。DAG Scheduler(排程器)負責將RDD DAG轉化為Stage(階段)DAG。Stage的DAG通過最後執行的Stage為根進行廣度優先遍歷,遍歷到最開始執行的Stage並執行,如果提交的Stage仍有未完成的父母Stage,則Stage需要等待其父Stage執行完才能執行。
waitingStages中記錄仍有未執行的父母Stage,防止過早執行。runningStages中儲存正在執行的Stage,防止重複執行。failedStages中儲存執行失敗的Stage,需要重新執行,這裡的設計是出於容錯的考慮。
4.2.4 任務(Task)排程
一個應用只有一個任務排程器(TaskScheduler)。所有TaskSetManager都是由這個TaskScheduler排程。一個Stage對也只有一個TaskSetManager。TaskSetManager通過一定次序放入排程池pool中。在排程池中,這些TaskSetMananger又會根據Job ID排序,先提交的Job的TaskSetManager優先排程,然後一個Job內的TaskSetManager ID小的先排程。
在執行任務時,任務分配規則:
按照“儘量將任務分配到資料塊所儲存的位置”原則分配任務。資料塊的儲存位置請見4.3.3節。
執行地點的選取:
1) 如果是呼叫過cache()方法的RDD,則讀取記憶體快取中分割槽的資料。
2) 如果在磁碟中,通常最開始的RDD會有相應資訊,例如,從HDFS上讀取的資料,HDFS分割槽就是最好的執行地點。
3) 如果不是上面兩種情況,將遍歷RDD DAG獲取第一個窄依賴的父親RDD對應分割槽的執行地點。
4.3 Spark I/O機制
4.3.1 序列化
序列化是將物件轉換為位元組流,本質上可以理解為將連結串列儲存的非連續空間的資料儲存轉化為連續空間儲存的陣列中。這樣就可以將資料進行流式傳輸或者塊儲存。
4.3.2 壓縮
當大片連續區域進行資料儲存並且儲存區域中資料重複性高的狀況下,資料適合進行壓縮。陣列或者物件序列化後的資料塊可以考慮壓縮。所以序列化後的資料可以壓縮,使資料緊縮,減少空間開銷。
Snappy提供了更高的壓縮速度,LZF提供了更高的壓縮比,使用者可以根據具體需求選擇壓縮方式。
4.3.3 Spark儲存系統
可以從以下幾個維度理解整個儲存系統:類介面、資料讀寫流程和資料通訊。
(1) 類介面。
所有外部類都通過塊管理器介面(BlockManager)對儲存模組(storage)進行操作。
(2) 資料讀寫流程
資料儲存分為3個層次:記憶體、本地磁碟和遠端磁碟。在diskManager中,儲存塊ID(blockId)和檔案路徑對映。
·資料讀取流程
在RDD類中,通過compute方法呼叫迭代器(iterator)讀取某個分割槽(Partition)的資料。分割槽是邏輯概念。一個分割槽對應物理上的一個塊(block)。一個Executor負責若干個分割槽。檢視資料儲存位置的優先順序是:
1) 記憶體;
2) Tachyon;
3) 本地磁碟;
4) 遠端磁碟
在獲取遠端資料時,先得到遠端資料路徑,然後通過塊管理器工作機建立通訊管理器,並從遠端讀取資料。
·資料寫入流程
資料寫入流程主要分為以下幾個步驟。
1)在RDD類中,通過呼叫compute方法計算要寫到哪個分割槽。
2)然後通過快取管理器(CacheManager)呼叫塊管理器(BlockManager),判斷資料是否已經寫入,如果未寫則寫入。
3)塊管理器(BlockManager)根據指定的儲存層次向相應塊寫入資料。並向主節點彙報儲存狀態。
·MemoryStore:提供Block在記憶體中的Block讀寫功能。
·DiskStore:提供Block在磁碟上以檔案形式讀寫的功能。
·BlockManagerWorker:對遠端資料的非同步傳輸進行管理。
·ConnectionManager:提供本地機器和遠端節點進行網路傳輸Block的功能。
(3) 資料通訊
主節點和從節點之間通過Actor傳送資訊。
·BlockManagerMasterActor:在主節點建立,所有從節點都用於這個Actor引用,並通過這個Actor的引用向主節點傳遞資訊。
·BlockManagerSlaveActor:在從節點建立,主節點擁有所有從節點的這個Actor引用,通過這個Actor引用向從節點傳遞控制資訊(命令)。
塊管理器(BlockManager)在內部封裝塊管理器Master(BlockManagerMaster),並通過BlockManagerMaster對Actor通訊進行管理。各從節點的塊管理器(BlockManager)物件在Spark上下文環境中(SparkEnv)中建立。在SparkEnv中也會建立其他管理元件,例如connectionManager、broadcastManager、cacheManager等。
4.3.3.2 Spark的資料儲存
圖3-2 RDD資料管理模型
在物理上,RDD物件實質上是一個元資料結構,儲存著塊、節點(Block、Node)等的對映關係,以及其他的元資料資訊。資料塊Block對應一個分割槽,若干個分割槽組成一個RDD。
分割槽是邏輯概念,變換前後的分割槽在物理上可能處在同一塊記憶體。這是很重要的優化,以防止函式式資料不變性(immutable)導致的記憶體需求無限擴張。如果要重複使用資料(機器學習中多次迭代),可以呼叫cache()方法快取資料。圖3-2為RDD的資料儲存模型。
4.4 Spark通訊模組
下面介紹分散式通訊的幾種方式。
(1) RPC(Remote Produce Call)
RPC是遠端過程呼叫協議,基於C/S模型呼叫。過程大致可以理解為本地分散式物件向本機發請求,不用自己編寫底層通訊本機。通過網路向伺服器傳送請求,伺服器物件接收引數後,進行處理,再把處理後的結果傳送回客戶端。
(2) RMI(Remote MethodInvocation)
RMI(遠端方法呼叫)和RPC一樣都是呼叫遠端的方法,可以把RMI看做是用Java語言實現了RPC協議。RPC不支援物件通訊,支援物件傳輸,這也是RMI相比於RPC的優越之處。
(3) JMS(Java Message Service)
JMS, java訊息服務是Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。其支援P2P和pub/stub兩種訊息模型,即點對點和釋出訂閱兩種模型。其優點在於:支援非同步通訊、訊息生產者和消費者耦合度低。應用程式通過讀寫佇列訊息(針對應用程式的資料)來通訊,而無須專用連線來連線它們。
4.4.1 通訊框架AKKA Spark
Spark模組間通訊使用AKKA框架。AKKA是用Scala開發的一個庫,用於編寫Actor模型應用。Actor是一些包含狀態和行為的物件。每一個actor擁有自己的屬性和方法,從而使得Actor模型容易併發執行。
Actor通過訊息郵件佇列通訊。傳送端通過“!”符號傳送訊息,接收端通過receive方法中的case模式匹配接收訊息,並進行相應處理。這些通訊是非同步的。
通常一個Actor系統是一個重量級結構。它會分配多個執行緒。所以對於每一個應用,一般只要一個Actor系統。
AKKAActor樹形結構
一個Actor會建立多個可子Actor,並負責監督這些子Actor,讓這些子Actor完成小的任務。同時,子Actor又可下分為多個子Actor。
4.4.2 Client、Master和Worker間的通訊
在Standalone模式下,存在三個角色: client、master、worker。
·Client:提交作業。
·Master:負責接收作業,並啟動Driver,管理Worker和Executor。
·Worker:期性地通過beatheart向Master傳送狀態資訊。當master向它傳來啟動executor命令的時候,它就啟動Executor進行計算。
4.5 容錯機制
一般來說,分散式資料集的容錯性有兩種方式:資料檢查點和記錄資料的更新。資料檢查點操作成本很高,因此,Spark選擇記錄更新的方式。RDD只支援粗粒度轉換,是對全域性資料做同樣的重做進而恢復資料。
4.6 Shuffle機制
Shuffle的本義是洗牌、混洗,即把一組有一定規則的資料打散重新組合轉換成一組無規則隨機資料分割槽。Spark中的Shuffle更像是洗牌的逆過程,把一組無規則的資料儘量轉換成一組具有一定規則的資料,Spark中的Shuffle和MapReduce中的Shuffle思想相同。
4.7 union, aggregate, join, concatenation區別
·union合併,例:
1,2
3,4
union
4,2
1,3 =》 1,2
3,4
4,2
1,2
·aggregate 聚集
1,2
3,4
union
4,2
1,3 =》 1,2
1,2
3,4
4,2
與union合併相比多了一個排序
·join 聯接,類似資料庫的聯接操作(通過關鍵詞聯接)
資料1:
1,23,23
2,23,12
3,333,112
…
資料2:
1, we,asd
2, sd,asd
3, llksd,asd
資料1 join 資料2:
1,23,23,we,asd
2,23,12,sd,asd
3,333,112,llksd,asd
·concatenation結合,連線(英語,共同迎合)