1. 程式人生 > >第13課Spark核心架構解密

第13課Spark核心架構解密

第一階段:Spark streamingspark sqlkafkaspark核心原理(必須有一個大型專案經驗);

第二階段:spark執行的各種環境,各種故障的解決,效能優化(精通spark核心、執行原理);

第三階段:流處理、機器學習為鰲頭,需要首先掌握前兩個階段的內容;

跟隨王家林老師的零基礎講解,注重動手實戰,成為spark高數,笑傲大資料之林!

本期內容:

通過手動繪圖的方式解密Spark核心架構

通過案例來驗證Spark核心架構

3 Spark架構思考

一、詳細剖析Spark執行機制

1Driver端架構

Driver部分程式碼包含了SparkConf+SparkContext

,基本一切應用程式程式碼由Driver端的程式碼和分佈在叢集其他機器上的Executor程式碼組成(textFile flatMap map),Executorexecutor是執行在worker上的程序裡的物件)是由執行緒池併發執行和執行緒的覆用,執行緒處理task任務,taskdiskmem上讀取資料。

SparkApplication的執行不依賴於ClusterManager,也就是說執行時不需要ClusterManager的參與(粗粒度分配資源即一次性分配完成)。

Driver執行程式的時候建立了SparkContext並且有main方法,SparkContext本身是程式排程器(分高低度排程器

DAGSchedulerTaskScheduler)。Driver端是用來提交Spark程式的機器,這臺機器一般和Spark cluster叢集在相同的網路環境下,因為要保證DriverExecutor進行頻繁的通訊,並且Driver的機器配置基本和Worker相同,Driver的機器安裝了Spark,但不屬於Spark叢集的範圍。Application提交的時候使用spark-submit(可配置執行時的引數MEMCPU等),而生產環境下使用自動化shell指令碼提交。

首先Driver端的應用程式包含了ExecutorDriver程式碼部分。應用程式本身Driver程式碼(SparkConf

SparkContext),SparkConf中包含了設定程式名稱setAppNamesetMaster等,SparkContext包含了DAGSchedulerTaskSchedulerSchedulerBackend以及SparkEnvExecutor程式碼包含了對業務邏輯的具體實現的程式碼(mapflatmapReduceByKey等)。

可以看出Driver端由SparkConf中將程式執行的配置資訊傳給SparkContext的上下文,由SparkContext建立DAGScheduler(高層排程器)、TaskScheduler(底層排程器:負責一個作業內部執行)、SchedulerBackend(負責握住計算資源),例項化的過程中註冊當前程式給Master。然後DAGScheduler根據Actor觸發jobSparkContext通過DAGSchedulerjobRDD形成的DAG有向無環圖劃分為不同的StageTask具體運行於哪臺機器上?就是在劃分Stage的時候確定,這裡就是根據資料本地行定位傳送的Executor

置),TaskScheduler會將不同Stage中的一系列的Task傳送到對應的Executor去執行,具體劃分的的過程在前面WordCount背後的資料流中詳細進行了分析描述,即不同的(mapflatmapReduceByKey)函式RDD api產生的RDD形成的DAG進行劃分,得到不同的Stage,而每個Stage都由相同業務邏輯,不同處理資料的Task組成,Task又一一對應RDD中的每個Partition。將劃分Stages得到的TaskSet提交到TaskScheduler,進而提交給executor執行。

Driver端劃分好Stage後,提交到Spark叢集,即提交到MasterMaster接收後分配資源的AppId併發送指令給Worker,然後Worker分配Executor,最後由Executor(併發處理資料分片)中的執行緒池中的執行緒併發執行。

(2)Spark叢集執行架構

Driver端提交程式Tasks後,由Master檢測沒有問題便進行資源的分配和AppId的分配,然後傳送指令給Worker節點,Worker節點預設會分配一個Executor,然後在Executor的執行緒池中進行併發執行。

Master收到提交的程式,Master根據以下三點為程式分配資源:

1,spark-env.sh和spark-defaults.sh

2,spark-submit提供的引數

3,程式中SparkConf配置的引數

Worker管理當前Node的資源,並接受Master的指令來分配具體的計算資源(Executor)。Worker程序通過一個Proxy(代理控制代碼)為ExecutorRunner的物件例項遠端啟動ExecutorBackend程序,實際工作的時候會通過TaskRunner(一個Runner的介面,執行緒一般會用Runner的介面封裝業務邏輯,有run方法所以可以回撥)來封裝Executor接收到的Task,然後從ThreadPool中獲取一個執行緒執行,執行完成後釋放並覆用。

在最後一個Stage前的其他Stage都進行shuffleMapTask,此時是對資料進行shuffleshuffle的結果儲存在Executor所在節點的本地檔案系統中,最後一個Stage中的Task就是ResultTask,負責結果資料生成。Driver會不斷髮送TaskExecutor進行執行,所以Task都正確執行,到程式執行結束;若超過執行次數的限制,或者沒有執行時會停止,待正確執行後會進入下一個stage,若沒有正確執行成功,高層排程器DAGSchedular會進行一定次數的重試,若還是執行不成功就意味著整個作業的失敗。

每個Task具體執行RDD中的一個Partition,(預設情況下為128M,但最後一個記錄跨兩個Block)基於該Partition具體執行我們定義的一系列內部函式,直到程式執行完成。

二、Spark執行架構概要

首先使用者建立SparkContext,新建立的SparkContext會根據程式設計時設定的引數或系統預設的配置連線到ClusterManager上,ClusterManager會根據使用者提交時的設定(如:佔用CPUMEN等資源情況),來為使用者程式分配計算資源,啟動相應的Executor;而Driver根據使用者程式排程的Stage的劃分,即高層排程器(RDD的依賴關係),若有寬依賴,會劃分成不同的Stage,每個Stage由一組完全相同的任務組成(業務邏輯相同,處理資料不同的Tasks組成),該Stage分別作用於待處理的分割槽,待Stage劃分完成和TaskSet建立完成後,Driver端會向Executor傳送具體的task,當Executor收到task後,會自動下載執行需要的庫、包等,準備好執行環境後由執行緒池中的執行緒開始執行,因此Spark執行是執行緒級別的。

Hadoop執行比Spark代價大很多,因Hadoop中的MapReduce執行的JVM虛擬機器不可以複用,而Spark執行的線程池中的執行緒可以進行復用。

執行Task的過程中,Executor會將執行的Task彙報給DriverDriver收到Task的執行狀態情況後,會根據具體狀況進行更新等。

三、Spark內部元件簡介

1Task劃分:Task根據不同的Stage的劃分,會被劃分為兩種型別

1shuffleMapTask,在最後一個Stage前的其他Stage都進行shuffleMapTask,此時是對資料進行shuffleshuffle的結果儲存在Executor所在節點的本地檔案系統中;

2)第二種TaskResultTask,負責生成結果資料;

Driver會不斷髮送TaskExecutor進行執行,所以Task都正確執行或者超過執行次數的限制,或者沒有執行時會停止,待正確執行後會進入下一個stage,若沒有正確執行成功,高層排程器DAGSchedular會進行一定次數的重試,若還是執行不成功就意味著整個作業的失敗。

2DAGScheduler:負責將jobrdd構成的DAG劃分為不同的Stage

3Worker叢集執行節點,Worker上不會執行Application的程式,因為Worker管理當前Node的資源資源,並接受Master的指令來分配具體的計算資源Executor(在新的程序中分配),Worker不會向Master傳送Worker的資源佔用情況(MEN、CPU),Worker向Master傳送心跳,只有Worker ID,因為Master在程式註冊的時候已經分配好了資源,同時只有故障的時候才會傳送資源的情況。

4、Job:包含了一系列Task的平行計算。Job由action(如SaveAsTextFile)觸發的,其前面是若干RDD,而RDD是Transformation級別的,transformation是Lazy的,因為是Lazy的所以不計算往前推(如:WordCount中collect觸發了job,執行的時候由mapPartitionRDD往前推到hadoopRDD,之後開始一步步執行),若兩個RDD之間回數的時候是窄依賴的話就在記憶體中進行迭代,也是Spark之所以快的原因,不是因為基於記憶體,因為其排程和容錯及其他內容。

5、寬依賴和窄依賴

在RDD中將依賴劃分成了兩種型別:窄依賴(narrow dependencies)和寬依賴(wide dependencies)。

窄依賴是指父RDD的每個分割槽都只被子RDD的一個分割槽所使用(一對一);

寬依賴就是指父RDD的分割槽被多個子RDD的分割槽所依賴(一對多)。

例如,map就是一種窄依賴,而join則會導致寬依賴(除非父RDD是hash-partitioned

這種劃分有兩個用處。首先,窄依賴支援在一個結點上管道化執行。例如基於一對一的關係,可以在filter之後執行map。其次,窄依賴支援更高效的故障還原。因為對於窄依賴,只有丟失的父RDD的分割槽需要重新計算。而對於寬依賴,一個結點的故障可能導致來自所有父RDD的分割槽丟失,因此就需要完全重新執行。因此對於寬依賴,Spark會在持有各個父分割槽的結點上,將中間資料持久化來簡化故障還原,就像MapReduce會持久化map的輸出一樣。

注:一個Application裡面可以有多個Jobs  一般一個action對應一個job。;Stage內部是RDD構成的,RDD內部是並行Task的集合 


(注:有任何有誤的地方請不吝指出,方便大家學習)