1. 程式人生 > >Spark原理框架和作業執行流程

Spark原理框架和作業執行流程

@Author  : Spinach | GHB
@Link    : http://blog.csdn.net/bocai8058

0 Hadoop與Spark的對比關係

Google 在 2003 年和 2004 年先後發表了 Google 檔案系統 GFS 和 MapReduce 程式設計模型兩篇文章,基於這兩篇開源文件,06 年 Nutch 專案子專案之一的 Hadoop 實現了兩個強有力的開源產品:HDFS 和 MapReduce。 Hadoop 成為了典型的大資料批量處理架構,由 HDFS 負責靜態資料的儲存,並通過 MapReduce 將計算邏輯分配到各資料節點進行資料計算和價值發現。之後以 HDFS 和 MapReduce 為基礎建立了很多專案,形成了Hadoop生態圈。

而Spark則是UC Berkeley AMP lab(加州大學伯克利分校AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架, 專門用於大資料量下的迭代式計算。是為了跟 Hadoop 配合而開發出來的,不是為了取代 Hadoop, ++Spark 運算比 Hadoop 的 MapReduce 框架快的原因是因為 Hadoop 在一次 MapReduce 運算之後,會將資料的運算結果從記憶體寫入到磁碟中,第二次 Mapreduce 運算時在從磁碟中讀取資料,所以其瓶頸在2次運算間的多餘 IO 消耗。 Spark 則是將資料一直快取在記憶體中,直到計算得到最後的結果,再將結果寫入到磁碟,所以多次運算的情況下, Spark 是比較快的。++ 其優化了迭代式工作負載。具體區別如下:

1 Spark原理框架

Spark是專為大規模資料處理而設計的快速通用的計算引擎。

特點

  • 快速:記憶體計算比hadoop快100倍,硬碟計算快10倍;
  • 易用:提供Java、R、Python、Scala介面;
  • 通用:整合:spark sql、spark streaming、MLlib、GraphX;
  • 移植性:可以有Mesos、Yarn、Kubernetes、Standalone等部署模式;

1.1 框架

  • Spark SQL: 提供了類 SQL 的查詢,返回 Spark-DataFrame 的資料結構;
  • Spark Streaming: 流式計算,主要用於處理線上實時時序資料;
  • MLlib: 提供機器學習的各種模型和調優;
  • GraphX: 提供基於圖的演算法,如 PageRank;

Spark 的主要特點還包括:

  1. 提供 Cache 機制來支援需要反覆迭代計算或者多次資料共享,減少資料讀取的 IO 開銷;
  2. 提供了一套支援 DAG 圖的分散式平行計算的程式設計框架,減少多次計算之間中間結果寫到 Hdfs 的開銷;
  3. 使用多執行緒池模型減少 Task 啟動開稍,shuffle 過程中避免不必要的 sort 操作並減少磁碟 IO 操作。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)

1.2 相關術語解釋

  • 叢集管理程式(Cluster Manager):在叢集上獲取資源的外部服務(例如:Local、Standalone、Mesos或Yarn等叢集管理系統);
  • Master:Master節點上常駐Master守護程序和Driver程序,Master負責將序列任務變成可並行執行的任務集Tasks, 同時還負責出錯問題處理等,Master負載管理全部的Worker節點,而Worker節點負責執行任務;
  • Worker: 叢集中任何可以執行Application程式碼的節點,在Standalone模式中指的是通過slave檔案配置的Worker節點,在Spark on Yarn模式下就是NodeManager節點;
  • 應用程式(Application): 基於Spark的使用者程式,包含了一個Driver Program 和叢集中多個的Executor;
  • 驅動(Driver): 執行Application的main()函式並且建立SparkContext;
  • Executor: 某個Application執行在worker節點上的一個程序, 該程序負責執行某些Task,並且負責將資料存到記憶體或磁碟上,每個Application都有各自獨立的一批Executor。在Spark on Yarn模式下,其程序名稱為CoarseGrainedExecutor Backend。一個CoarseGrainedExecutor Backend有且僅有一個Executor物件, 負責將Task包裝成taskRunner,並從執行緒池中抽取一個空閒執行緒執行Task, 這個每一個oarseGrainedExecutor Backend能並行執行Task的數量取決與分配給它的cpu個數;
  • Stage: 每個Job會被拆分成多組Task, 作為一個TaskSet, 其名稱為Stage,Stage的劃分和排程是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生shuffle的地方;
  • Task: 被送到某個Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是執行Application的基本單位,多個Task組成一個Stage,而Task的排程和管理等是由TaskScheduler負責;
  • 操作(Operation): 作用於RDD的各種操作分為Transformation和Action;
graph LR
A[RDD操作]-->B[Transformation]
A-->C[Action]

1.3 RDD

RDD(Resilient Distributed Datasets) ,彈性分散式資料集,是Spark底層的分散式儲存的資料結構,可以說是Spark的核心, Spark API的所有操作都是基於RDD的。 是分散式記憶體的一個抽象概念,RDD提供了一種高度受限的共享記憶體模型,即RDD是隻讀的記錄分割槽的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。

Spark的基本計算單元,可以通過一系列運算元進行操作(主要有Transformation和Action操作)

1.3.1 窄依賴與寬依賴

窄依賴:父RDD每一個分割槽最多被一個子RDD的分割槽所用;表現為一個父RDD的分割槽對應於一個子RDD的分割槽,或兩個父RDD的分割槽對應於一個子RDD 的分割槽。

寬依賴:父RDD的每個分割槽都可能被多個子RDD分割槽所使用,子RDD分割槽通常對應所有的父RDD分割槽。

常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle),而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴)。

常見的寬依賴有:groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是寬依賴)。

1.3.2 DAG(有向無環圖)與DAGScheduler(有向無環圖排程器)

Directed Acycle graph,反應RDD之間的依賴關係。

基於DAG劃分Stage 並以TaskSet的形勢提交Stage給TaskScheduler;負責將作業拆分成不同階段的具有依賴關係的多批任務;最重要的任務之一就是:計算作業和任務的依賴關係,制定排程邏輯。在SparkContext初始化的過程中被例項化,一個SparkContext對應建立一個DAGScheduler。

1.3.3 Transformation函式和Action函式

Transformation函式

Transformation函式 描述 型別
map 將原來 RDD 的每個資料項通過 map 中的使用者自定義函式f對映轉變為一個新的元素。原始碼中map運算元相當於初始化一個RDD, 新RDD叫做MappedRDD(this, sc.clean(f))。 Value資料型別,輸入分割槽與輸出分割槽一對一型
flatMap 將原來 RDD 中的每個元素通過函式f轉換為新的元素,並將生成的RDD的每個集合中的元素合併為一個集合,內部建立FlatMappedRDD(this,sc.clean(f))。 Value資料型別,輸入分割槽與輸出分割槽一對一型
mapPartitions 獲取到每個分割槽的迭代器,在函式中通過這個分割槽整體的迭代器對整個分割槽的元素進行操作。 內部實現是生成MapPartitionsRDD。 Value資料型別,輸入分割槽與輸出分割槽一對一型
glom glom函式將每個分割槽形成一個數組,內部實現是返回的GlommedRDD。 Value資料型別,輸入分割槽與輸出分割槽一對一型
union 使用 union 函式時需要保證兩個RDD元素的資料型別相同,返回的RDD資料型別和被合併的RDD元素資料型別相同,並不進行去重操作,儲存所有元素。 Value資料型別,輸入分割槽與輸出分割槽多對一型
cartesian 對兩個RDD內的所有元素進行笛卡爾積操作。操作後,內部實現返回CartesianRDD。 Value資料型別,輸入分割槽與輸出分割槽多對一型
grouBy 將元素通過函式生成相應的 Key,資料就轉化為Key-Value格式,之後將Key相同的元素分為一組。 Value資料型別,輸入分割槽與輸出分割槽多對多型
filter 對元素進行過濾,對每個元素應用f函 數,返回值為true的元素在RDD中保留,返回值為false的元素將被過濾掉。內部實現相當於生成FilteredRDD(this,sc.clean(f))。 Value資料型別,輸出分割槽為輸入分割槽子集型
distinct 將RDD中的元素進行去重操作。圖9中的每個方框代表一個RDD分割槽,通過distinct函式,將資料去重。 Value資料型別,輸出分割槽為輸入分割槽子集型
subtract subtract相當於進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。 Value資料型別,輸出分割槽為輸入分割槽子集型
sample sample 將 RDD 這個集合內的元素進行取樣,獲取所有元素的子集。使用者可以設定是否有放回的抽樣、百分比、隨機種子,進而決定取樣方式。內部實現是生成SampledRDD(withReplacement, fraction, seed)。 Value資料型別,輸出分割槽為輸入分割槽子集型
takeSample takeSample()函式和上面的sample函式是一個原理,但是不使用相對比例取樣,而是按設定的取樣個數進行取樣,同時返回結果不再是RDD,而是相當於對取樣後的資料進行Collect(),返回結果的集合為單機的陣列。 Value資料型別,輸出分割槽為輸入分割槽子集型
cache cache 將 RDD 元素從磁碟快取到記憶體。 相當於 persist(MEMORY_ONLY) 函式的功能。 Value資料型別,Cache型
persist persist 函式對 RDD 進行快取操作。資料快取在哪裡依據 StorageLevel 這個列舉型別進行確定。 Value資料型別,Cache型
mapValues 針對(Key, Value)型資料中的 Value 進行 Map 操作,而不對 Key 進行處理。 Key-Value資料型別,輸入分割槽與輸出分割槽一對一
combineByKey 只是兩個值合併成一個值。 Key-Value資料型別,單個RDD聚集
reduceByKey reduceByKey 是比 combineByKey 更簡單的一種情況,只是兩個值合併成一個值,( Int, Int V)to (Int, Int C),比如疊加。 Key-Value資料型別,單個RDD聚集
partitionBy 對RDD進行分割槽操作。 Key-Value資料型別,單個RDD聚集
Cogroup 將兩個RDD進行協同劃分。 Key-Value資料型別,兩個RDD聚集
join 對兩個需要連線的 RDD 進行 cogroup函式操作,將相同 key 的資料能夠放到一個分割槽,在 cogroup 操作之後形成的新 RDD 對每個key 下的元素進行笛卡爾積的操作,返回的結果再展平,對應 key 下的所有元組形成一個集合。最後返回 RDD[(K, (V, W))]。 Key-Value資料型別,連線
leftOutJoin和rightOutJoin LeftOutJoin(左外連線)和RightOutJoin(右外連線)相當於在join的基礎上先判斷一側的RDD元素是否為空,如果為空,則填充為空。 如果不為空,則將資料進行連線運算,並返回結果。 Key-Value資料型別,連線

Action函式

Action函式 描述 型別
foreach 對 RDD 中的每個元素都應用 f 函式操作,不返回 RDD 和 Array, 而是返回Uint。 無輸出
saveAsTextFile 將資料輸出,儲存到 HDFS 的指定目錄。 HDFS
saveAsObjectFile 將分割槽中的每10個元素組成一個Array,然後將這個Array序列化,對映為(Null,BytesWritable(Y))的元素,寫入HDFS為SequenceFile的格式。 HDFS
collect collect 相當於 toArray, toArray 已經過時不推薦使用, collect 將分散式的 RDD 返回為一個單機的 scala Array 陣列。在這個陣列上運用 scala 的函式式操作。 Scala集合和資料型別
collectAsMap 對(K,V)型的RDD資料返回一個單機HashMap。 對於重複K的RDD元素,後面的元素覆蓋前面的元素。 Scala集合和資料型別
reduceByKeyLocally 先reduce再collectAsMap的功能,先對RDD的整體進行reduce操作,然後再收集所有結果返回為一個HashMap。 Scala集合和資料型別
lookup 對(Key,Value)型的RDD操作,返回指定Key對應的元素形成的Seq。 Scala集合和資料型別
count 返回整個 RDD 的元素個數。 Scala集合和資料型別
top 返回最大的k個元素。 Scala集合和資料型別
reduce reduce函式相當於對RDD中的元素進行reduceLeft函式的操作。 Scala集合和資料型別
fold fold和reduce的原理相同,但是與reduce不同,相當於每個reduce時,迭代器取的第一個元素是zeroValue。 Scala集合和資料型別
aggregate 先對每個分割槽的所有元素進行aggregate操作,再對分割槽的結果進行fold操作。 Scala集合和資料型別

1.4 工作執行原理

1.4.1 Spark執行基本流程
1.4.2 計算流程
1.4.3 從程式碼構建DAG圖
// Spark program
Val lines1 = sc.textFile(inputPath1). map(···)). map(···)
Val lines2 = sc.textFile(inputPath2) . map(···)
Val lines3 = sc.textFile(inputPath3)
Val dtinone1 = lines2.union(lines3)
Val dtinone = lines1.join(dtinone1)
dtinone.saveAsTextFile(···)
dtinone.filter(···).foreach(···)

Spark的計算髮生在RDD的Action操作,而對Action之前的所有Transformation,Spark只是記錄下RDD生成的計劃中,而不會觸發真正的計算。Spark核心會在需要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是DAG。

1.4.4 將DAG劃分為Stage核心演算法

Application多個job多個Stage:Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,後面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢後,後面的Stage才會執行。

  • 劃分依據:Stage劃分的依據就是寬依賴,何時產生寬依賴,reduceByKey, groupByKey等運算元,會導致寬依賴的產生。

  • 核心演算法:從後往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。Spark核心會從觸發Action操作的那個RDD開始從後往前推,首先會為最後一個RDD建立一個stage,然後繼續倒推,如果發現對某個RDD是寬依賴,那麼就會將寬依賴的那個RDD建立一個新的stage,那個RDD就是新的stage的最後一個RDD。然後依次類推,繼續繼續倒推,根據窄依賴或者寬依賴進行stage的劃分,直到所有的RDD全部遍歷完成為止。

1.4.5 將DAG劃分為Stage剖析

從HDFS中讀入資料生成3個不同的RDD,通過一系列transformation操作後再將計算結果儲存回HDFS。可以看到這個DAG中只有join操作是一個寬依賴,Spark核心會以此為邊界將其前後劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。

1.4.6 提交Stages

排程階段的提交,最終會被轉換成一個任務集的提交,DAGScheduler通過TaskScheduler介面提交任務集,這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的例項來管理這個任務集的生命週期,對於DAGScheduler來說,提交排程階段的工作到此就完成了。而TaskScheduler的具體實現則會在得到計算資源的時候,進一步通過TaskSetManager排程具體的任務到對應的Executor節點上進行運算。

1.4.7 監控Job、Task、Executor

DAGScheduler監控Job與Task:要保證相互依賴的作業排程階段能夠得到順利的排程執行,DAGScheduler需要監控當前作業排程階段乃至任務的完成情況。這通過對外暴露一系列的回撥函式來實現的,對於TaskScheduler來說,這些回撥函式主要包括任務的開始結束失敗、任務集的失敗,DAGScheduler根據這些任務的生命週期資訊進一步維護作業和排程階段的狀態資訊。

DAGScheduler監控Executor的生命狀態:TaskScheduler通過回撥函式通知DAGScheduler具體的Executor的生命狀態,如果某一個Executor崩潰了,則對應的排程階段任務集的ShuffleMapTask的輸出結果也將標誌為不可用,這將導致對應任務集狀態的變更,進而重新執行相關計算任務,以獲取丟失的相關資料。

1.4.8 獲取任務執行結果

結果DAGScheduler:一個具體的任務在Executor中執行完畢後,其結果需要以某種形式返回給DAGScheduler,根據任務型別的不同,任務結果的返回方式也不同。

兩種結果,中間結果與最終結果:對於FinalStage所對應的任務,返回給DAGScheduler的是運算結果本身,而對於中間排程階段對應的任務ShuffleMapTask,返回給DAGScheduler的是一個MapStatus裡的相關儲存資訊,而非結果本身,這些儲存位置資訊將作為下一個排程階段的任務獲取輸入資料的依據。

兩種型別,DirectTaskResult與IndirectTaskResult:根據任務結果大小的不同,ResultTask返回的結果又分為兩類,如果結果足夠小,則直接放在DirectTaskResult物件內中,如果超過特定尺寸則在Executor端會將DirectTaskResult先序列化,再把序列化的結果作為一個數據塊存放在BlockManager中,然後將BlockManager返回的BlockID放在IndirectTaskResult物件中返回給TaskScheduler,TaskScheduler進而呼叫TaskResultGetter將IndirectTaskResult中的BlockID取出並通過BlockManager最終取得對應的DirectTaskResult。

1.4.9 任務排程總體