1. 程式人生 > >Spark生態和RDD

Spark生態和RDD

1 Spark的生態

Spark生態

Spark Core中的基本概念

  • DAG(Directed Acyclic Graph), 有向無環圖。Spark Core提供了有向無環圖的分散式計算框架,並提供記憶體機制來支援多次迭代計算或者資料共享,大大減少了迭代計算之間讀取資料的開銷。
  • RDD(Resilient Distributed Dataset),它是一個分佈在一組節點中的只讀物件集合,這些集合是彈性的,如果資料集一部分丟失,則可以根據“血統”對他們進行重建,保證了資料的高容錯性。
  • RDD Partition,RDD分割槽。RDD Partition可以就近讀取分散式檔案系統中的資料塊到各個節點的記憶體。(是否類似Hadoop的Block,讀取資料時也有就近讀取的特性?

Spark Streaming

Spark Streaming是一個對實時資料流進行高吞吐、高容錯的流式處理系統,可以對多種資料來源(如:訊息佇列Kafka、Flume、ZeroMQ等)進行類似Map、Reduce和Join等複雜操作,並將結果儲存到外部檔案系統、資料庫或者實時儀表盤。Spark Streaming最大的優勢就是提供的處理引擎和RDD程式設計模型可以同時進行批處理和流處理

Spark Streaming

對於傳統流處理一次處理一條記錄的方式而言,Spark Streaming使用的是將流資料離散化處理(Discretized Streams)。流資料離散化的好處有三:① 實現了動態的負載均衡,②有利於快速的故障恢復,③將批處理、流處理和互動式工作有效的組合了起來。

spark streaming

Spark SQL

Spark SQL的前生是Shark,即Hive on Spark。Shark本質是通過Hive的HQL進行解析,將HQL翻譯成Spark上對應的RDD操作,然後通過Hive的Metadata獲取資料資料庫裡的表資訊,實際上為HDFS上的資料和檔案,最後由Shark獲取並放到Spark上運算。 2014年7月,Databricks宣佈終止對Shark的開發,將重點放到Spark SQL上。Spark SQL允許開發人員可以直接操作RDD,同時也可查詢在Hive上存放的外部資料,因此Spark SQL在使用SQL進行外部查詢的同時,也能進行更復雜的資料分析。 引入了SchemaRDD。SchemaRDD既可以從RDD轉換過來,也可以從Parquet檔案讀入,還可以使用HiveQL從Hive中獲取。

BlinkDB

BlinkDB是一個用於在海量資料上執行互動式SQL查詢的大規模並行查詢引擎,它允許使用者通過權衡資料精度提升查詢響應時間,其資料的精度被控制在允許的誤差範圍內。BlinkDB架構如下圖所示。

BlinkDB架構

MLBase/MLlib

MLBase是Spark生態系統中專注於機器學習的元件。MLBase主要有ML optimizer、MLI、MLlib、Spark/MLRuntime組成,其結構如下圖所示。 ML Base結構

GraphX

GraphX是Spark中用於圖和圖平行計算的API。

SparkR

R是遵循GNU協議的一款開源、免費的軟體,廣泛應用於統計計算和統計製圖,但是它只能單機執行。為了能夠使用R語言分析大規模分散式的資料,伯克利分銷AMP實驗室開發了SparkR,並在Spark1.4版本中加入了該元件。

Alluxio

Alluxio以記憶體為中心分散式儲存系統,從下圖可以看出, Alluxio主要有兩大功能,第一提供一個檔案系統層的抽象,統一檔案系統介面,橋接儲存系統和計算框架;第二通過記憶體實現對遠端資料的加速訪問。

Alluxio

2 Spark的相關術語和概念

RDD

① RDD的建立 RDD的建立有三種方法:① 來自一個記憶體中的物件集合;② 使用外部儲存器中的資料集;③ 對現有的RDD進行轉換。

② 轉換和動作 Spark為RDD提供兩大類操作:轉換(Transformation)和動作(Action)。轉換是從現有的RDD生成新的RDD,而動作則觸發對RDD的計算並對計算結果執行某種操作,要麼返回給使用者,要麼儲存到外部儲存器中。 **動作的效果是立杆見影的,但轉換是惰性的。**轉換過程不會觸發任何行動,而只有動作才會觸發之前的轉換和執行動作本身。 **區分動作和轉換的方法:**判斷操作的返回型別,若返回型別是RDD,則該操作是轉換,否則就是一個動作。

聚合轉換

  • reduceByKey(),它為鍵值對中的值重複應用一個二進位制函式,直至產生一個結果值,不能改變集合之後值的型別。
  • foldByKey(),和reduceByKey()的功能相似,但必須要提供一個預設的值;
  • aggregateByKey(),可以改變值的型別,如從Integer轉變成set。

③ 持久化

  • 將RDD快取到記憶體,且被快取的RDD只能由同一個應用的作業來讀取,如果要在應用之間共享資料,則能將資料持久化到外部儲存上了。
  • 持久化級別:MEMORY_ONLY,MEMORY_ONLY_SER(預設使用Java的內部序列化機制,Kryo序列化方法通常更好序列化方法),MEMORY_AND_DISK(如果資料操作一定的大小,將溢位到磁碟),MEMORY_AND_DISK_SER等。

④ 序列化 使用Spark時,要從兩個方面考慮序列化,分別是資料序列化(資料序列化一般使用)和函式序列化(使用java序列化機制);

⑤ RDD應用

迭代計算:如圖形處理、數值優化和機器學習等; 互動式SQL查詢:如SparkSQL; MapReduceRDD:通過提供MapReduce的超級,實現高效的執行MapReduce程式; 流式資料處理:Spark中提出的離散資料流D-Stream,將流式計算的執行,當作一些列短而確定的一系列批量計算的序列,並將狀態儲存在RDD中。

RDD之間的依賴關係

RDD中的依賴關係分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency),二者的主要區別在於是否包含Shuffle操作

Shuffle操作是指對Map輸出結果進行分割槽、排序和歸併等處理後並較給Reduce處理的過程。儘管Spark是基於記憶體的分散式計算框架,但是Spark的Shuffle操作也會把資料寫入到磁碟中。

窄依賴表現為一個父RDD的分割槽對應一個子RDD的分割槽,或多個父RDD分割槽對應一個子RDD分割槽。寬依賴表現為存在一個父RDD的分割槽對應一個子RDD的多個分割槽。

總結:如果父RDD的一個分割槽只被一個子RDD的分割槽所使用,該依賴就是窄依賴,否則就是寬依賴。

窄依賴典型的操作:map、filter、union、協同jion; 寬依賴典型的操作:groupByKey、sortByKey、非協同jion;

Spark通過分析各個RDD之間的依賴關係生成了DAG,DAG提交給DAGScheduler,DAGScheduler再通過分析各個RDD中分割槽之間的依賴關係來決定如何劃分階段,每個階段包括一個或者多個任務,這些任務形成任務集,最後將該任務集合提交給TaskScheduler執行。Phase的具體劃分方法是:在DAG中進行反向解析,遇到寬依賴就斷開(因為只有窄依賴可以實現流水線優化),遇到窄依賴就把當前RDD加入到當前的階段中。參考論文:A Fault-Tolerant Abstraction for In-Memory Cluster Computing

DAGScheduler記錄哪些RDD被存入磁碟等物化動作,同時要尋求任務的最優排程、監控執行排程階段過程等。如果某個排程階段執行失敗,則需要重新提交該排程階段。

其他

  • Application/Job/Stage/Task Spark的Job是由任意的多階段(Stages)有向無環圖(DAG)構成,每個Stage相當於MapReduce的Map階段或者Reduce階段。這些階段又被Spark執行環境分解為多個任務(Task),任務並行執行在分佈於叢集中的RDD分割槽上,就像MapReduce的任務一樣。 Spark的Job始終執行在應用(application)上下文(SparkContext)中,它提供了RDD分組以及共享變數。一個應用可以序列或者並行的執行多個作業,併為這些作業提供訪問由先前作業所快取的RDD。
  • SparkContext,Spark執行的上下文;
  • SparkConf

參考

  • 《圖解Spark-核心技術與案例實戰》
  • 《Hadoop權威指南》
  • 《Spark程式設計基礎 scala版》