1. 程式人生 > >Spark學習(二)——RDD的設計與運行原理

Spark學習(二)——RDD的設計與運行原理

center data 創建 組成 分享圖片 img medium 列操作 信息

Spark的核心是建立在統一的抽象RDD之上,使得Spark的各個組件可以無縫進行集成,在同一個應用程序中完成大數據計算任務。RDD的設計理念源自AMP實驗室發表的論文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》。

RDD設計背景

在實際應用中,存在許多叠代式算法(比如機器學習、圖算法等)和交互式數據挖掘工具,這些應用場景的共同之處是,不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入。但是,目前的MapReduce框架都是把中間結果寫入到HDFS中,帶來了大量的數據復制、磁盤IO和序列化開銷。雖然,類似Pregel等圖計算框架也是將結果保存在內存當中,但是,這些框架只能支持一些特定的計算模式,並沒有提供一種通用的數據抽象。RDD就是為了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分布式特性,只需將具體的應用邏輯表達為一系列轉換處理,不同RDD之間的轉換操作形成依賴關系,可以實現管道化,從而避免了中間結果的存儲,大大降低了數據復制、磁盤IO和序列化開銷。

RDD概念

一個RDD就是一個分布式對象集合,本質上是一個只讀的分區記錄集合,每個RDD可以分成多個分區,每個分區就是一個數據集片段,並且一個RDD的不同分區可以被保存到集群中不同的節點上,從而可以在集群中的不同節點上進行並行計算。RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,不能直接修改,只能基於穩定的物理存儲中的數據集來創建RDD,或者通過在其他RDD上執行確定的轉換操作(如map、join和groupBy)而創建得到新的RDD。RDD提供了一組豐富的操作以支持常見的數據運算,分為“行動”(Action)和“轉換”(Transformation)兩種類型,前者用於執行計算並指定輸出的形式,後者指定RDD之間的相互依賴關系。兩類操作的主要區別是,轉換操作(比如map、filter、groupBy、join等)接受RDD並返回RDD,而行動操作(比如count、collect等)接受RDD但是返回非RDD(即輸出一個值或結果)。RDD提供的轉換接口都非常簡單,都是類似map、filter、groupBy、join等粗粒度的數據轉換操作,而不是針對某個數據項的細粒度修改。因此,RDD比較適合對於數據集中元素執行相同操作的批處理式應用,而不適合用於需要異步、細粒度狀態的應用,比如Web應用系統、增量式的網頁爬蟲等。正因為這樣,這種粗粒度轉換接口設計,會使人直覺上認為RDD的功能很受限、不夠強大。但是,實際上RDD已經被實踐證明可以很好地應用於許多並行計算應用中,可以具備很多現有計算框架(比如MapReduce、SQL、Pregel等)的表達能力,並且可以應用於這些框架處理不了的交互式數據挖掘應用。
Spark用Scala語言實現了RDD的API,程序員可以通過調用API實現對RDD的各種操作。RDD典型的執行過程如下:
1. RDD讀入外部數據源(或者內存中的集合)進行創建;
2. RDD經過一系列的“轉換”操作,每一次都會產生不同的RDD,供給下一個“轉換”使用;
3. 最後一個RDD經“行動”操作進行處理,並輸出到外部數據源(或者變成Scala集合或標量)。
需要說明的是,RDD采用了惰性調用,即在RDD的執行過程中(如圖9-8所示),真正的計算發生在RDD的“行動”操作,對於“行動”之前的所有“轉換”操作,Spark只是記錄下“轉換”操作應用的一些基礎數據集以及RDD生成的軌跡,即相互之間的依賴關系,而不會觸發真正的計算。

技術分享圖片

技術分享圖片
圖9-8 Spark的轉換和行動操作

例如,在圖9-9中,從輸入中邏輯上生成A和C兩個RDD,經過一系列“轉換”操作,邏輯上生成了F(也是一個RDD),之所以說是邏輯上,是因為這時候計算並沒有發生,Spark只是記錄了RDD之間的生成和依賴關系。當F要進行輸出時,也就是當F進行“行動”操作的時候,Spark才會根據RDD的依賴關系生成DAG,並從起點開始真正的計算。

技術分享圖片
圖9-9 RDD執行過程的一個實例

上述這一系列處理稱為一個“血緣關系(Lineage)”,即DAG拓撲排序的結果。采用惰性調用,通過血緣關系連接起來的一系列RDD操作就可以實現管道化(pipeline),避免了多次轉換操作之間數據同步的等待,而且不用擔心有過多的中間數據,因為這些具有血緣關系的操作都管道化了,一個操作得到的結果不需要保存為中間數據,而是直接管道式地流入到下一個操作進行處理。同時,這種通過血緣關系把一系列操作進行管道化連接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;相反,在MapReduce的設計中,為了盡可能地減少MapReduce過程,在單個MapReduce中會寫入過多復雜的邏輯。

例1:一個Spark的“Hello World”程序

這裏以一個“Hello World”入門級Spark程序來解釋RDD執行過程,這個程序的功能是讀取一個HDFS文件,計算出包含字符串“Hello World”的行數。

val sc= new SparkContext(“spark://localhost:7077”,”Hello World”, “YOUR_SPARK_HOME”,”YOUR_APP_JAR”) 
val fileRDD = sc.textFile(“hdfs://192.168.0.103:9000/examplefile”) 
val filterRDD = fileRDD.filter(_.contains(“Hello World”)) 
filterRDD.cache() 
filterRDD.count() 

可以看出,一個Spark應用程序,基本是基於RDD的一系列計算操作。第1行代碼用於創建SparkContext對象;第2行代碼從HDFS文件中讀取數據創建一個RDD;第3行代碼對fileRDD進行轉換操作得到一個新的RDD,即filterRDD;第4行代碼表示對filterRDD進行持久化,把它保存在內存或磁盤中(這裏采用cache接口把數據集保存在內存中),方便後續重復使用,當數據被反復訪問時(比如查詢一些熱點數據,或者運行叠代算法),這是非常有用的,而且通過cache()可以緩存非常大的數據集,支持跨越幾十甚至上百個節點;第5行代碼中的count()是一個行動操作,用於計算一個RDD集合中包含的元素個數。這個程序的執行過程如下:

  • 創建這個Spark程序的執行上下文,即創建SparkContext對象;

  • 從外部數據源(即HDFS文件)中讀取數據創建fileRDD對象;

  • 構建起fileRDD和filterRDD之間的依賴關系,形成DAG圖,這時候並沒有發生真正的計算,只是記錄轉換的軌跡;

  • 執行到第5行代碼時,count()是一個行動類型的操作,觸發真正的計算,開始實際執行從fileRDD到filterRDD的轉換操作,並把結果持久化到內存中,最後計算出filterRDD中包含的元素個數。

RDD特性

總體而言,Spark采用RDD以後能夠實現高效計算的主要原因如下:
(1)高效的容錯性。現有的分布式共享內存、鍵值存儲、內存數據庫等,為了實現容錯,必須在集群節點之間進行數據復制或者記錄日誌,也就是在節點之間會發生大量的數據傳輸,這對於數據密集型應用而言會帶來很大的開銷。在RDD的設計中,數據只讀,不可修改,如果需要修改數據,必須從父RDD轉換到子RDD,由此在不同RDD之間建立了血緣關系。所以,RDD是一種天生具有容錯機制的特殊集合,不需要通過數據冗余的方式(比如檢查點)實現容錯,而只需通過RDD父子依賴(血緣)關系重新計算得到丟失的分區來實現容錯,無需回滾整個系統,這樣就避免了數據復制的高開銷,而且重算過程可以在不同節點之間並行進行,實現了高效的容錯。此外,RDD提供的轉換操作都是一些粗粒度的操作(比如map、filter和join),RDD依賴關系只需要記錄這種粗粒度的轉換操作,而不需要記錄具體的數據和各種細粒度操作的日誌(比如對哪個數據項進行了修改),這就大大降低了數據密集型應用中的容錯開銷;
(2)中間結果持久化到內存。數據在內存中的多個RDD操作之間進行傳遞,不需要“落地”到磁盤上,避免了不必要的讀寫磁盤開銷;
(3)存放的數據可以是Java對象,避免了不必要的對象序列化和反序列化開銷。

RDD之間的依賴關系

RDD中不同的操作會使得不同RDD中的分區會產生不同的依賴。RDD中的依賴關系分為窄依賴(Narrow Dependency)與寬依賴(Wide Dependency),圖9-10展示了兩種依賴之間的區別。
窄依賴表現為一個父RDD的分區對應於一個子RDD的分區,或多個父RDD的分區對應於一個子RDD的分區;比如圖9-10(a)中,RDD1是RDD2的父RDD,RDD2是子RDD,RDD1的分區1,對應於RDD2的一個分區(即分區4);再比如,RDD6和RDD7都是RDD8的父RDD,RDD6中的分區(分區15)和RDD7中的分區(分區18),兩者都對應於RDD8中的一個分區(分區21)。
寬依賴則表現為存在一個父RDD的一個分區對應一個子RDD的多個分區。比如圖9-10(b)中,RDD9是RDD12的父RDD,RDD9中的分區24對應了RDD12中的兩個分區(即分區27和分區28)。
總體而言,如果父RDD的一個分區只被一個子RDD的一個分區所使用就是窄依賴,否則就是寬依賴。窄依賴典型的操作包括map、filter、union等,寬依賴典型的操作包括groupByKey、sortByKey等。對於連接(join)操作,可以分為兩種情況。
(1)對輸入進行協同劃分,屬於窄依賴(如圖9-10(a)所示)。所謂協同劃分(co-partitioned)是指多個父RDD的某一分區的所有“鍵(key)”,落在子RDD的同一個分區內,不會產生同一個父RDD的某一分區,落在子RDD的兩個分區的情況。
(2)對輸入做非協同劃分,屬於寬依賴,如圖9-10(b)所示。
對於窄依賴的RDD,可以以流水線的方式計算所有父分區,不會造成網絡之間的數據混合。對於寬依賴的RDD,則通常伴隨著Shuffle操作,即首先需要計算好所有父分區數據,然後在節點之間進行Shuffle。

技術分享圖片
圖9-10 窄依賴與寬依賴的區別

Spark的這種依賴關系設計,使其具有了天生的容錯性,大大加快了Spark的執行速度。因為,RDD數據集通過“血緣關系”記住了它是如何從其它RDD中演變過來的,血緣關系記錄的是粗顆粒度的轉換操作行為,當這個RDD的部分分區數據丟失時,它可以通過血緣關系獲取足夠的信息來重新運算和恢復丟失的數據分區,由此帶來了性能的提升。相對而言,在兩種依賴關系中,窄依賴的失敗恢復更為高效,它只需要根據父RDD分區重新計算丟失的分區即可(不需要重新計算所有分區),而且可以並行地在不同節點進行重新計算。而對於寬依賴而言,單個節點失效通常意味著重新計算過程會涉及多個父RDD分區,開銷較大。此外,Spark還提供了數據檢查點和記錄日誌,用於持久化中間RDD,從而使得在進行失敗恢復時不需要追溯到最開始的階段。在進行故障恢復時,Spark會對數據檢查點開銷和重新計算RDD分區的開銷進行比較,從而自動選擇最優的恢復策略。

階段的劃分

Spark通過分析各個RDD的依賴關系生成了DAG,再通過分析各個RDD中的分區之間的依賴關系來決定如何劃分階段,具體劃分方法是:在DAG中進行反向解析,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到當前的階段中;將窄依賴盡量劃分在同一個階段中,可以實現流水線計算(具體的階段劃分算法請參見AMP實驗室發表的論文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》)。例如,如圖9-11所示,假設從HDFS中讀入數據生成3個不同的RDD(即A、C和E),通過一系列轉換操作後再將計算結果保存回HDFS。對DAG進行解析時,在依賴圖中進行反向解析,由於從RDD A到RDD B的轉換以及從RDD B和F到RDD G的轉換,都屬於寬依賴,因此,在寬依賴處斷開後可以得到三個階段,即階段1、階段2和階段3。可以看出,在階段2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,比如,分區7通過map操作生成的分區9,可以不用等待分區8到分區9這個轉換操作的計算結束,而是繼續進行union操作,轉換得到分區13,這樣流水線執行大大提高了計算的效率。

技術分享圖片
圖9-11根據RDD分區的依賴關系劃分階段

由上述論述可知,把一個DAG圖劃分成多個“階段”以後,每個階段都代表了一組關聯的、相互之間沒有Shuffle依賴關系的任務組成的任務集合。每個任務集合會被提交給任務調度器(TaskScheduler)進行處理,由任務調度器將任務分發給Executor運行。

RDD運行過程

通過上述對RDD概念、依賴關系和階段劃分的介紹,結合之前介紹的Spark運行基本流程,這裏再總結一下RDD在Spark架構中的運行過程(如圖9-12所示):
(1)創建RDD對象;
(2)SparkContext負責計算RDD之間的依賴關系,構建DAG;
(3)DAGScheduler負責把DAG圖分解成多個階段,每個階段中包含了多個任務,每個任務會被任務調度器分發給各個工作節點(Worker Node)上的Executor去執行。

技術分享圖片
圖9-12 RDD在Spark中的運行過程

Spark學習(二)——RDD的設計與運行原理