1. 程式人生 > >Spark基礎知識整理&入門

Spark基礎知識整理&入門

 

       Apache Spark是一種快速通用的叢集計算系統。 它提供Java,Scala,Python和R中的高階API,以及支援通用執行圖的優化引擎。 它還支援一組豐富的高階工具,包括用於SQL和結構化資料處理的Spark SQL,用於機器學習的MLlib,用於圖形處理的GraphX和Spark Streaming。

Spark優點:

  • 減少磁碟I/O:隨著實時大資料應用越來越多,Hadoop作為離線的高吞吐、低響應框架已不能滿足這類需求。HadoopMapReduce的map端將中間輸出和結果儲存在磁碟中,reduce端又需要從磁碟讀寫中間結果,勢必造成磁碟IO成為瓶頸。Spark允許將map端的中間輸出和結果儲存在記憶體中,reduce端在拉取中間結果時避免了大量的磁碟I/O。Hadoop Yarn中的ApplicationMaster申請到Container後,具體的任務需要利用NodeManager從HDFS的不同節點下載任務所需的資源(如Jar包),這也增加了磁碟I/O。Spark將應用程式上傳的資原始檔緩衝到Driver本地檔案服務的記憶體中,當Executor執行任務時直接從Driver的記憶體中讀取,也節省了大量的磁碟I/O。
  • 增加並行度:由於將中間結果寫到磁碟與從磁碟讀取中間結果屬於不同的環節,Hadoop將它們簡單的通過序列執行銜接起來。Spark把不同的環節抽象為Stage,允許多個Stage既可以序列執行,又可以並行執行。
  • 避免重新計算:當Stage中某個分割槽的Task執行失敗後,會重新對此Stage排程,但在重新排程的時候會過濾已經執行成功的分割槽任務,所以不會造成重複計算和資源浪費。
  • 可選的Shuffle排序:HadoopMapReduce在Shuffle之前有著固定的排序操作,而Spark則可以根據不同場景選擇在map端排序或者reduce端排序。
  • 靈活的記憶體管理策略:Spark將記憶體分為堆上的儲存記憶體、堆外的儲存記憶體、堆上的執行記憶體、堆外的執行記憶體4個部分。Spark既提供了執行記憶體和儲存記憶體之間是固定邊界的實現,又提供了執行記憶體和儲存記憶體之間是“軟”邊界的實現。Spark預設使用“軟”邊界的實現,執行記憶體或儲存記憶體中的任意一方在資源不足時都可以借用另一方的記憶體,最大限度的提高資源的利用率,減少對資源的浪費。Spark由於對記憶體使用的偏好,記憶體資源的多寡和使用率就顯得尤為重要,為此Spark的記憶體管理器提供的Tungsten實現了一種與作業系統的記憶體Page非常相似的資料結構,用於直接操作作業系統記憶體,節省了建立的Java物件在堆中佔用的記憶體,使得Spark對記憶體的使用效率更加接近硬體。Spark會給每個Task分配一個配套的任務記憶體管理器,對Task粒度的記憶體進行管理。Task的記憶體可以被多個內部的消費者消費,任務記憶體管理器對每個消費者進行Task記憶體的分配與管理,因此Spark對記憶體有著更細粒度的管理。

基於以上所列舉的優化,Spark官網聲稱效能比Hadoop快100倍,如圖3所示。即便是記憶體不足需要磁碟I/O時,其速度也是Hadoop的10倍以上。

 Hadoop與Spark執行邏輯迴歸時間比較

Spark還有其他一些特點。

  • 檢查點支援:Spark的RDD之間維護了血緣關係(lineage),一旦某個RDD失敗了,則可以由父RDD重建。雖然lineage可用於錯誤後RDD的恢復,但對於很長的lineage來說,恢復過程非常耗時。如果應用啟用了檢查點,那麼在Stage中的Task都執行成功後,SparkContext將把RDD計算的結果儲存到檢查點,這樣當某個RDD執行失敗後,在由父RDD重建時就不需要重新計算,而直接從檢查點恢復資料。
  • 易於使用。Spark現在支援Java、Scala、Python和R等語言編寫應用程式,大大降低了使用者的門檻。自帶了80多個高等級操作符,允許在Scala,Python,R的shell中進行互動式查詢。
  • 支援互動式:Spark使用Scala開發,並藉助於Scala類庫中的Iloop實現互動式shell,提供對REPL(Read-eval-print-loop)的實現。
  • 支援SQL查詢。在資料查詢方面,Spark支援SQL及Hive SQL,這極大的方便了傳統SQL開發和資料倉庫的使用者。
  • 支援流式計算:與MapReduce只能處理離線資料相比,Spark還支援實時的流計算。Spark依賴SparkStreaming對資料進行實時的處理,其流式處理能力還要強於Storm。
  • 可用性高。Spark自身實現了Standalone部署模式,此模式下的Master可以有多個,解決了單點故障問題。Spark也完全支援使用外部的部署模式,比如YARN、Mesos、EC2等。
  • 豐富的資料來源支援:Spark除了可以訪問作業系統自身的檔案系統和HDFS,還可以訪問Kafka、Socket、Cassandra、HBase、Hive、Alluxio(Tachyon)以及任何Hadoop的資料來源。這極大地方便了已經使用HDFS、HBase的使用者順利遷移到Spark。
  • 豐富的檔案格式支援:Spark支援文字檔案格式、Csv檔案格式、Json檔案格式、Orc檔案格式、Parquet檔案格式、Libsvm檔案格式,也有利於Spark與其他資料處理平臺的對接。

 

基本概念

  要想對Spark有整體性的瞭解,推薦讀者閱讀Matei Zaharia的Spark論文。此處筆者先介紹Spark中的一些概念:

  • RDD(resillient distributed dataset):彈性分散式資料集。Spark應用程式通過使用Spark的轉換API可以將RDD封裝為一系列具有血緣關係的RDD,也就是DAG。只有通過Spark的動作API才會將RDD及其DAG提交到DAGScheduler。RDD的祖先一定是一個跟資料來源相關的RDD,負責從資料來源迭代讀取資料。
  • DAG(Directed Acycle graph):有向無環圖。在圖論中,如果一個有向圖無法從某個頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖(DAG圖)。Spark使用DAG來反映各RDD之間的依賴或血緣關係。
  • Partition:資料分割槽。即一個RDD的資料可以劃分為多少個分割槽。Spark根據Partition的數量來確定Task的數量。
  • NarrowDependency:窄依賴。即子RDD依賴於父RDD中固定的Partition。NarrowDependency分為OneToOneDependency和RangeDependency兩種。
  • ShuffleDependency:Shuffle依賴,也稱為寬依賴。即子RDD對父RDD中的所有Partition都可能產生依賴。子RDD對父RDD各個Partition的依賴將取決於分割槽計算器(Partitioner)的演算法。
  • Job:使用者提交的作業。當RDD及其DAG被提交給DAGScheduler排程後,DAGScheduler會將所有RDD中的轉換及動作視為一個Job。一個Job由一到多個Task組成。
  • Stage:Job的執行階段。DAGScheduler按照ShuffleDependency作為Stage的劃分節點對RDD的DAG進行Stage劃分(上游的Stage將為ShuffleMapStage)。因此一個Job可能被劃分為一到多個Stage。Stage分為ShuffleMapStage和ResultStage兩種。
  • Task:具體執行任務。一個Job在每個Stage內都會按照RDD的Partition 數量,建立多個Task。Task分為ShuffleMapTask和ResultTask兩種。ShuffleMapStage中的Task為ShuffleMapTask,而ResultStage中的Task為ResultTask。ShuffleMapTask和ResultTask類似於Hadoop中的 Map任務和Reduce任務。

Scala與Java的比較

  目前越來越多的語言可以執行在Java虛擬機器上,Java平臺上的多語言混合程式設計正成為一種潮流。在混合程式設計模式下可以充分利用每種語言的特點和優勢,以便更好地完成功能。Spark同時選擇了Scala和Java作為開發語言,也是為了充分利用二者各自的優勢。表1對這兩種語言進行比較。

表1   Scala與Java的比較

 

Scala

Java

語言型別

面向函式為主,兼有面向物件

面向物件(Java8也增加了lambda函式程式設計)

簡潔性

非常簡潔

不簡潔

型別推斷

豐富的型別推斷,例如深度和鏈式的型別推斷、 duck type 、隱式型別轉換等,但也因此增加了編譯時長

少量的型別推斷

可讀性

一般,豐富的語法糖導致的各種奇幻用法,例如方法簽名、隱式轉換

學習成本

較高

一般

語言特性

非常豐富的語法糖和更現代的語言特性,例如 Option 、模式匹配、使用空格的方法呼叫

豐富

併發程式設計

使用Actor的訊息模型

使用阻塞、鎖、阻塞佇列等

注意:雖然Actor是Scala語言最初進行推廣時,最吸引人的特性之一,但是隨著Akka更加強大的Actor類庫的出現,Scala已經在官方網站宣佈廢棄Scala自身的Actor程式設計模型,轉而全面擁抱Akka提供的Actor程式設計模型。與此同時,從Spark2.0.0版本開始,Spark卻放棄了使用Akka,轉而使用Netty實現了自己的Rpc框架。遙想當年Scala“鼓吹”Actor程式設計模型優於Java的同步程式設計模型時,又有誰會想到如今這種場面呢?

  Scala作為函數語言程式設計的代表,天生適合並行執行,如果用Java語言實現相同的功能會顯得非常臃腫。很多介紹Spark的新聞或文章經常以Spark核心程式碼行數少或API精煉等內容作為宣傳的“法器”,這應該也是選擇Scala的原因之一。另一方面,由於函數語言程式設計更接近計算機思維,因此便於通過演算法從大資料中建模,這也更符合Spark作為大資料框架的理念吧!

  由於Java適合伺服器、中介軟體開發,所以Spark使用Java更多的是開發底層的基礎設施或中介軟體。

模組設計

整個Spark主要由以下模組組成:

  • Spark Core:Spark的核心功能實現,包括:基礎設施、SparkContext(Application通過SparkContext提交)、Spark執行環境(SparkEnv)、儲存體系、排程系統、計算引擎、部署模式、任務提交與執行等。
  • Spark SQL:提供SQL處理能力,便於熟悉關係型資料庫操作的工程師進行互動查詢。此外,還為熟悉Hive開發的使用者提供了對Hive SQL的支援。
  • Spark Streaming:提供流式計算處理能力,目前支援ApacheKafka、Apache Flume、Amazon Kinesis和簡單的TCP套接字等資料來源。在早期的Spark版本中還自帶對Twitter、MQTT、ZeroMQ等的支援,現在使用者想要支援這些工具必須自己開發實現。此外,Spark Streaming還提供視窗操作用於對一定週期內的流資料進行處理。
  • GraphX:基於圖論,實現的支援分散式的圖計算處理框架。GraphX的基礎是點、邊等圖論的理論。GraphX 基於圖計算的Pregel模型提供了多種多樣的Pregel API,這些Pregel API可以解決圖計算中的常見問題。
  • MLlib:Spark提供的機器學習庫。MLlib提供了機器學習相關的統計、分類、迴歸等領域的多種演算法實現。其一致的API介面大大降低了使用者的學習成本。

Spark SQL、Spark Streaming、GraphX、MLlib的能力都是建立在核心引擎之上,如圖4。

圖4   Spark各模組依賴關係

Spark核心功能

  Spark Core中提供了Spark最基礎與最核心的功能,主要包括:

  • 基礎設施:在Spark中有很多基礎設施,被Spark中的各種元件廣泛使用。這些基礎設施包括Spark配置(SparkConf)、Spark內建的Rpc框架(在早期Spark版本中Spark使用的是Akka)、事件匯流排(ListenerBus)、度量系統。SparkConf用於管理Spark應用程式的各種配置資訊。Spark內建的Rpc框架使用Netty實現,有同步和非同步的多種實現,Spark各個元件間的通訊都依賴於此Rpc框架。如果說Rpc框架是跨機器節點不同元件間的通訊設施,那麼事件匯流排就是SparkContext內部各個元件間使用事件——監聽器模式非同步呼叫的實現。度量系統由Spark中的多種度量源(Source)和多種度量輸出(Sink)構成,完成對整個Spark叢集中各個元件執行期狀態的監控。
  • SparkContext:通常而言,使用者開發的Spark應用程式(Application)的提交與執行都離不開SparkContext的支援。在正式提交Application之前,首先需要初始化SparkContext。SparkContext隱藏了網路通訊、分散式部署、訊息通訊、儲存體系、計算引擎、度量系統、檔案服務、Web UI等內容,應用程式開發者只需要使用SparkContext提供的API完成功能開發。
  • SparkEnv:Spark執行環境(SparkEnv)是Spark中的Task執行所必須的元件。SparkEnv內部封裝了Rpc環境(RpcEnv)、序列化管理器、廣播管理器(BroadcastManager)、map任務輸出跟蹤器(MapOutputTracker)、儲存體系、度量系統(MetricsSystem)、輸出提交協調器(OutputCommitCoordinator)等Task執行所需的各種元件。
  • 儲存體系:Spark優先考慮使用各節點的記憶體作為儲存,當記憶體不足時才會考慮使用磁碟,這極大地減少了磁碟I/O,提升了任務執行的效率,使得Spark適用於實時計算、迭代計算、流式計算等場景。在實際場景中,有些Task是儲存密集型的,有些則是計算密集型的,所以有時候會造成儲存空間很空閒,而計算空間的資源又很緊張。Spark的記憶體儲存空間與執行儲存空間之間的邊界可以是“軟”邊界,因此資源緊張的一方可以借用另一方的空間,這既可以有效利用資源,又可以提高Task的執行效率。此外,Spark的記憶體空間還提供了Tungsten的實現,直接操作作業系統的記憶體。由於Tungsten省去了在堆內分配Java物件,因此能更加有效的利用系統的記憶體資源,並且因為直接作業系統記憶體,空間的分配和釋放也更迅速。在Spark早期版本還使用了以記憶體為中心的高容錯的分散式檔案系統Alluxio(Tachyon)供使用者進行選擇。Alluxio能夠為Spark提供可靠的記憶體級的檔案共享服務。
  • 排程系統:排程系統主要由DAGScheduler和TaskScheduler組成,它們都內建在SparkContext中。DAGScheduler負責建立Job、將DAG中的RDD劃分到不同的Stage、給Stage建立對應的Task、批量提交Task等功能。TaskScheduler負責按照FIFO或者FAIR等排程演算法對批量Task進行排程;為Task分配資源;將Task傳送到叢集管理器分配給當前應用的Executor上由Executor負責執行等工作。現如今,Spark增加了SparkSession和DataFrame等新的API,SparkSession底層實際依然依賴於SparkContext。
  • 計算引擎:計算引擎由記憶體管理器(MemoryManager)、Tungsten、任務記憶體管理器(TaskMemoryManager)、Task、外部排序器(ExternalSorter)、Shuffle管理器(ShuffleManager)等組成。MemoryManager除了對儲存體系中的儲存記憶體提供支援和管理,還外計算引擎中的執行記憶體提供支援和管理。Tungsten除用於儲存外,也可以用於計算或執行。TaskMemoryManager對分配給單個Task的記憶體資源進行更細粒度的管理和控制。ExternalSorter用於在map端或reduce端對ShuffleMapTask計算得到的中間結果進行排序、聚合等操作。ShuffleManager用於將各個分割槽對應的ShuffleMapTask產生的中間結果持久化到磁碟,並在reduce端按照分割槽遠端拉取ShuffleMapTask產生的中間結果。

Spark擴充套件功能

  為了擴大應用範圍,Spark陸續增加了一些擴充套件功能,主要包括:

  • Spark SQL:由於SQL具有普及率高、學習成本低等特點,為了擴大Spark的應用面,因此增加了對SQL及Hive的支援。Spark SQL的過程可以總結為:首先使用SQL語句解析器(SqlParser)將SQL轉換為語法樹(Tree),並且使用規則執行器(RuleExecutor)將一系列規則(Rule)應用到語法樹,最終生成物理執行計劃並執行的過程。其中,規則包括語法分析器(Analyzer)和優化器(Optimizer)。Hive的執行過程與SQL類似。
  • Spark Streaming:Spark Streaming與Apache Storm類似,也用於流式計算。SparkStreaming支援Kafka、Flume、Kinesis和簡單的TCP套接字等多種資料輸入源。輸入流接收器(Receiver)負責接入資料,是接入資料流的介面規範。Dstream是Spark Streaming中所有資料流的抽象,Dstream可以被組織為DStreamGraph。Dstream本質上由一系列連續的RDD組成。
  • GraphX:Spark提供的分散式圖計算框架。GraphX主要遵循整體同步平行計算模式(Bulk Synchronous Parallell,簡稱BSP)下的Pregel模型實現。GraphX提供了對圖的抽象Graph,Graph由頂點(Vertex)、邊(Edge)及繼承了Edge的EdgeTriplet(添加了srcAttr和dstAttr用來儲存源頂點和目的頂點的屬性)三種結構組成。GraphX目前已經封裝了最短路徑、網頁排名、連線元件、三角關係統計等演算法的實現,使用者可以選擇使用。
  • MLlib:Spark提供的機器學習框架。機器學習是一門涉及概率論、統計學、逼近論、凸分析、演算法複雜度理論等多領域的交叉學科。MLlib目前已經提供了基礎統計、分類、迴歸、決策樹、隨機森林、樸素貝葉斯、保序迴歸、協同過濾、聚類、維數縮減、特徵提取與轉型、頻繁模式挖掘、預言模型標記語言、管道等多種數理統計、概率論、資料探勘方面的數學演算法。

 

Spark模型設計

1. Spark程式設計模型

正如Hadoop在介紹MapReduce程式設計模型時選擇word count的例子,並且使用圖形來說明一樣,筆者對於Spark程式設計模型也選擇用圖形展現。

Spark 應用程式從編寫到提交、執行、輸出的整個過程如圖5所示。

 

圖5   程式碼執行過程

圖5中描述了Spark程式設計模型的關鍵環節的步驟如下。

1)使用者使用SparkContext提供的API(常用的有textFile、sequenceFile、runJob、stop等)編寫Driver application程式。此外,SparkSession、DataFrame、SQLContext、HiveContext及StreamingContext都對SparkContext進行了封裝,並提供了DataFrame、SQL、Hive及流式計算相關的API。

2)使用SparkContext提交的使用者應用程式,首先會通過RpcEnv向叢集管理器(Cluster Manager)註冊應用(Application)並且告知叢集管理器需要的資源數量。叢集管理器根據Application的需求,給Application分配Executor資源,並在Worker上啟動CoarseGrainedExecutorBackend程序(CoarseGrainedExecutorBackend程序內部將建立Executor)。Executor所在的CoarseGrainedExecutorBackend程序在啟動的過程中將通過RpcEnv直接向Driver註冊Executor的資源資訊,TaskScheduler將儲存已經分配給應用的Executor資源的地址、大小等相關資訊。然後,SparkContext根據各種轉換API,構建RDD之間的血緣關係(lineage)和DAG,RDD構成的DAG將最終提交給DAGScheduler。DAGScheduler給提交的DAG建立Job並根據RDD的依賴性質將DAG劃分為不同的Stage。DAGScheduler根據Stage內RDD的Partition數量建立多個Task並批量提交給TaskScheduler。TaskScheduler對批量的Task按照FIFO或FAIR排程演算法進行排程,然後給Task分配Executor資源,最後將Task傳送給Executor由Executor執行。此外,SparkContext還會在RDD轉換開始之前使用BlockManager和BroadcastManager將任務的Hadoop配置進行廣播。

3)叢集管理器(Cluster Manager)會根據應用的需求,給應用分配資源,即將具體任務分配到不同Worker節點上的多個Executor來處理任務的執行。Standalone、YARN、Mesos、EC2等都可以作為Spark的叢集管理器。

4)Task在執行的過程中需要對一些資料(例如中間結果、檢查點等)進行持久化,Spark支援選擇HDFS 、Amazon S3、Alluxio(原名叫Tachyon)等作為儲存。

2.RDD計算模型

RDD可以看做是對各種資料計算模型的統一抽象,Spark的計算過程主要是RDD的迭代計算過程,如圖6所示。RDD的迭代計算過程非常類似於管道。分割槽數量取決於Partition數量的設定,每個分割槽的資料只會在一個Task中計算。所有分割槽可以在多個機器節點的Executor上並行執行。

 

圖6   RDD計算模型

圖6只是簡單的從分割槽的角度將RDD的計算看作是管道,如果從RDD的血緣關係、Stage劃分的角度來看,由RDD構成的DAG經過DAGScheduler排程後,將變成圖7所示的樣子。

圖7  DAGScheduler對由RDD構成的DAG進行排程

圖7中共展示了A、B、C、D、E、F、G一共7個RDD。每個RDD中的小方塊代表一個分割槽,將會有一個Task處理此分割槽的資料。RDD A經過groupByKey轉換後得到RDD B。RDD C經過map轉換後得到RDD D。RDD D和RDD E經過union轉換後得到RDD F。RDD B和RDD F經過join轉換後得到RDD G。從圖中可以看到map和union生成的RDD與其上游RDD之間的依賴是NarrowDependency,而groupByKey和join生成的RDD與其上游的RDD之間的依賴是ShuffleDependency。由於DAGScheduler按照ShuffleDependency作為Stage的劃分的依據,因此A被劃入了ShuffleMapStage 1;C、D、E、F被劃入了ShuffleMapStage 2;B和G被劃入了ResultStage 3。

Spark基本架構

從叢集部署的角度來看,Spark叢集由叢集管理器(Cluster Manager)、工作節點(Worker)、執行器(Executor)、驅動器(Driver)、應用程式(Application)等部分組成,它們之間的整體關係如圖8所示。

 

圖8   Spark基本架構圖

下面結合圖8對這些組成部分以及它們之間的關係進行介紹。

(1)Cluster Manager

Spark的叢集管理器,主要負責對整個叢集資源的分配與管理。Cluster Manager在Yarn部署模式下為ResourceManager;在Mesos部署模式下為Mesos master;在Standalone部署模式下為Master。Cluster Manager分配的資源屬於一級分配,它將各個Worker上的記憶體、CPU等資源分配給Application,但是並不負責對Executor的資源分配。Standalone部署模式下的Master會直接給Application分配記憶體、CPU以及Executor等資源。目前,Standalone、YARN、Mesos、EC2等都可以作為Spark的叢集管理器。

注意:這裡提到了部署模式中的Standalone、Yarn、Mesos等模式,讀者暫時知道這些內容即可,本書將在第9章對它們詳細介紹。

(2)Worker

Spark的工作節點。在Yarn部署模式下實際由NodeManager替代。Worker節點主要負責以下工作:將自己的記憶體、CPU等資源通過註冊機制告知Cluster Manager;建立Executor;將資源和任務進一步分配給Executor;同步資源資訊、Executor狀態資訊給Cluster Manager等。在Standalone部署模式下,Master將Worker上的記憶體、CPU以及Executor等資源分配給Application後,將命令Worker啟動CoarseGrainedExecutorBackend程序(此程序會建立Executor例項)。

(3)Executor

執行計算任務的一線元件。主要負責任務的執行以及與Worker、Driver的資訊同步。

(4)Driver

Application的驅動程式,Application通過Driver與Cluster Manager、Executor進行通訊。Driver可以執行在Application中,也可以由Application提交給Cluster Manager並由Cluster Manager安排Worker執行。

(4)Application

使用者使用Spark提供的API編寫的應用程式,Application通過Spark API將進行RDD的轉換和DAG的構建,並通過Driver將Application註冊到Cluster Manager。Cluster Manager將會根據Application的資源需求,通過一級分配將Executor、記憶體、CPU等資源分配給Application。Driver通過二級分配將Executor等資源分配給每一個任務,Application最後通過Driver告訴Executor執行任務。

 

來源: 《Spark核心設計的藝術 架構設計與實現》 --耿嘉安