1. 程式人生 > >spark:架構+執行機制的一些總結--50

spark:架構+執行機制的一些總結--50

Hadoop中包含計算框架MapReduce和分散式檔案系統HDFS,spark是一個計算框架

//

中間結果:spark儲存到記憶體、Hadoop儲存到磁碟,spark將執行模型抽象為通用的有向無環圖通用計劃(DAG)

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

資料格式和記憶體佈局

spark:分散式記憶體儲存結構彈性分散式資料集RDD,進行資料的儲存,RDD支援粗粒度寫操作,RDD讀取可精確到每條資料記錄,RDD可用來做分散式索引

spark特性,能夠控制資料在不同節點上的分割槽,使用者可以自定義分割槽策略,如:HASH分割槽等

//

執行策略:

MapReduce花費大量時間在資料shuffle排序,spark呼叫任務執行計劃圖(DAG),每一輪次輸出結果在記憶體快取

//

任務排程開銷:

spark採用事件驅動的類庫AKKA來啟動任務,通過執行緒池複用執行緒來避免程序或執行緒啟動和切換開銷

//

spark生態系統為“伯克利資料分析棧”(BDAS),Mesos是一個資源管理框架,spark自帶的資源管理框架Standalone,Mesos會對資源和任務進行隔離,並實現高效的資源任務排程,YARN與其類似

spark框架:Master/Slave模型

//

spark中Driver和Worker是兩個重要角色,Driver是程式應用的執行起點,負責作業排程,即Task任務的分發,Worker用來管理和建立Executer並行任務

Driver將Task和Task所依賴的file和jar序列化後傳遞給對應的worker機器

Executor:執行器,用於啟動執行緒池執行任務

//

SparkEnv:執行緒級別的上下文,儲存執行時的重要元件的引用,包括:

MapOutputTracker:負責shuffle元資訊的儲存

BroadcastManager:廣播變數控制與元資訊儲存

BlockManager:儲存管理、建立和查詢塊

//

SparkContext:整個應用的上下文,控制應用的分佈週期

//

整體流程:

client提交應用,Master找到一個Worker啟動Driver,Driver向Master資源管理器申請資源,之後將應用轉化為RDD Graph,再由DAGScheduler將RDD Graph轉化為Stage的有向無環圖,提交給TaskScheduler,由TaskScheduler提交任務給Executer執行

HDFS->textFile->Tranformation->SaveAsSequenceFile->HDFS

//

Spark on YARN:讓spark計算模型在雲梯YARN叢集上執行,直接讀取雲梯上的豐富計算資源

//

BlockManager:管理RDD物理分割槽,本質上一個RDD在程式碼中相當於資料的一個元資料結構

//

RDD:儲存著資料分割槽及其邏輯結構對映關係,儲存著RDD之前的依賴轉換關係,通過對RDD的依賴轉換關係形成spark的排程順序

RDD兩種建立方式:一是由父RDD的轉換,二是由HDFS之類檔案系統的建立

RDD兩種操作運算元:Transformation、Action

RDD內部屬性:1.分割槽列表,2.計算每個分片的函式,3.對父RDD的依賴列表,4.對Key-Value資料型別RDD的分割槽、控制分割槽策略和分割槽數,5.每個分割槽的地址列表

一個RDD可以儲存在幾個分割槽上,一個分割槽可以儲存幾個RDD的一部分

RDD邏輯上按Partition分塊,物理上以Block為儲存單位

//

幾個API

saveAsTextFile:函式將資料輸出,儲存到HDFS的指定目錄

saveAsObjectFile:將分割槽中每10個元素組成一個Array,將其序列化,寫入HDFS為SequenceFile格式

collect:相當於toArray【過時】,將分散式RDD返回為單機的scalaArray陣列

collectAsMap:對(k,v)型RDD返回一個單機HashMap

lookup:(k,v)返回指定key對應的元素形成Seq

count:RDD的元素個數

top:返回最大的元素

//

執行機制

RDD的Action運算元觸發Job的提交,提交到spark中的Job生成RDD DAG,由DAGScheduler轉化成Stage DAG,每個Stage中產生相應的Task集合,TaskScheduler將任務分發到Executor執行

其中TaskScheduler:將每個Stage中對應的任務進行提交和排程

TaskSetManager:通過Stage回溯到最源頭缺失的Stage提交到排程池pool中(根據JobID排序)

//

序列化是將物件轉換為位元組流

//

壓縮:1.Snappy:更高的壓縮速度,2.LZF:更高的壓縮比

在spark中使用壓縮:1.在spark-env.sh檔案中配置,2.在應用程式中···conf.set(----)

//

主、從節點通訊

從-->主:BlockManagerMasterActor(傳遞資訊和狀態)

主-->從:BlockManagerSlaveActor

BlockManagerMaster:對Actor通訊進行管理

資料讀寫:DiskStore、memoryStore、ConnectionManager、BlockManagerWorker

//

通訊框架AKKA

1.AKKA採用非同步通訊和分散式架構,2.可靠性:有監控和恢復機制,3.高效能,4.去中心,5.可擴充套件性

client(提交作業)--->Master--->Worker,      client<---Master<---Worker

(1)client to Master:

     RegisterApplication:註冊應用

(2)Master to client:

     RegisteredApplication:註冊後回覆給client

     ExecutorAdded:

     ExecutorUpdated:

(3)Master to Worker:

     LaunchExecutor:啟動Executor

     RegisteredWorker:worker註冊的回覆

     RegisterWorkerFailed:註冊worker失效的回覆

     killExecutor:停止Executor程序

(4)Worker to Master:

     RegisterWorker:註冊Worker

     Heartbeat:

     ExecuterStateChangel:

//

容錯機制

1.資料檢查點,2.記錄資料更新

Lineage(血統)類似重做日誌(Redolog)

Checkpoint機制

下面的情況需要使用資料檢查點:1.DAG的lineage過長,如果重算,開銷太大

                                                            2.在shuffle Dependency上做Checkpoint收益比較大

//

shuffle機制

洗牌(混洗),即把一組有一定規則的資料打散重新組合轉換成一組無規則隨機資料分割槽

shuffle將資料進行收集分配到指定的Reduce分割槽,Reduce階段根據函式對相應的分割槽做Reduce所需的函式處理

shuffle writer:通過shuffleMapTask中的runTask方法進入

抽象的特徵:Trait

shuffleBlockManager

最終在HashShuffleworker,將記憶體的Bucket寫到磁碟,儲存為檔案,並將shuffle的各個Bucket及對映資訊返回給主節點

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

******補充

spark兩個抽象:1.RDD,2.共享變數

1.RDD:(彈性分散式資料集)可並行資料容器,可以儲存任意型別資料,如有丟失,可根據粗粒度的日誌資料更新記錄的資訊(Lineage)重構

                  RDD在建立時可變指定切片個數,

                  RDD在轉換時,有個惰性計算(lazy evaluation)過程,期間會不斷的記錄到元資料(DAG),但並沒有發生真正計算

                  容錯:當Lineage很長時,可主動用Checkpoint動作把資料寫入儲存系統

2.共享變數:各個節點都可以共享的變數(並行化的時候,函式的所有變數在每個節點都做了一個拷貝)

                 spark提供兩種共享變數:1.廣播變數,2.累加器

spark工作原理:

(1)。客戶端啟動,進入初始化過程,通過與Mesos等資源管理系統互動,執行環境

              (粗粒度:一次性配置好所申請的所有資源,後面不在申請。細粒度:就是湊夠一個任務能夠執行的資源,就開始執行該任務)

(2)。轉換過程:增量的方式構建DAG圖(DAG:並行化執行,及故障恢復),執行時,spark利用貪心演算法將程式分成幾個Stage,每個Stage都有一定數量的任務做並行的處理(分寬依賴和窄依賴)

               spark通過patitionBy操作劃分,劃分寬依賴和窄依賴的好處:1.通過DAG,遇到窄依賴,將對應操作劃分到同一個Stage;遇到寬依賴,新建一個Stage、並把回溯到的操作改進新的Stage,實現流水線優化。2.窄依賴利用Lineage

(3)。執行過程:DAG排程器按依賴關係調度執行DAG圖,先執行不依賴任何階段的Stage,每個Stage會配備一定數量的Task並行的執行

(4)。釋放過程:釋放資源