1. 程式人生 > >Spark學習(一)Spark介紹

Spark學習(一)Spark介紹

一、什麼是spark

spark是基於記憶體計算的大資料平行計算框架,也是hadoop中的mapreduce的替代方案,但和mapreduce又有許多不同。      

Spark包含了大資料領域常見的各種計算框架:比如Spark Core用於離線計算,Spark SQL用於互動式查詢,Spark Streaming用於實時流式計算,Spark MLlib用於機器學習,Spark GraphX用於圖計算。
      Spark主要用於大資料的計算,而Hadoop以後主要用於大資料的儲存(比如HDFS、Hive、HBase等),以及資源排程(Yarn)。
      Spark+Hadoop的組合,是未來大資料領域最熱門的組合,也是最有前景的組合!

二、對比hadoop

1、最明顯的一點,spark要比mapreduce快的多,官網的資料是,spark在硬碟上執行要比mapreduce快上十倍以上,而在記憶體中則是快到百倍,主要原因有以下幾點(個人總結,也許不夠全面)

  • spark是基於記憶體計算,而hadoop基於磁碟。很顯然spark速度更快且更適合迭代運算。
  • Spark對分散的資料集進行抽樣,創新地提出RDD(ResilientDistributedDataset)的概念,所有的統計分析任務被翻譯成對RDD的基本操作組成的有向無環圖(DAG)。RDD可以被駐留在RAM中,往後的任務可以直接讀取RAM中的資料;同時分析DAG中任務之間的依賴性可以把相鄰的任務合併,從而減少了大量不準確的結果輸出,極大減少了HarddiskI/O,使複雜資料分析任務更高效。從這個推算,如果任務夠複雜,Spark比Map/Reduce快一到兩倍。
  • mr基於程序,每個task的啟動和結束都會開啟或關閉程序,耗費了大量的時間。而spark基於執行緒,每次只有啟動excutor的時候會啟動程序,task都是線上程中進行。
  • spark採用DAG(有向無環圖,後面會介紹),避免了冗餘的mr操作。

2、hadoop的底層是java編寫的,而spark是用scale編寫的,雖然兩者都提供了豐富的api可以支援不同的語言,但想深入學習一下spark最好還是使用scale。

3、相比hadoop 的一個mr程式需要寫map,reduce,driver等一大堆類和方法,spark程式則要精簡的多,拿wordcount來說:

object WordCount {
  
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount");
    val wordCounts= new SparkContext(conf)
                    .textFile("hdfs://spark1:9000/spark.txt", 1)
                    .flatMap { line => line.split(" ") }   
                    .map { word => (word, 1) }   
                    .reduceByKey { _ + _ }
                    .foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))  
  }
  
}

只需兩行程式碼(雖然第二行略長了一些),就完成了一個wordCount,要知道寫一個mr的wordcount得多麻煩。而這也是因為spark的RDD對許多操作進行了抽象,使得我們更方便使用。

4、相比hadoop,spark較為不穩定,由於大量資料被快取在記憶體中,Java回收垃圾緩慢的情況嚴重,導致Spark效能不穩定,在複雜場景中SQL的效能甚至不如現有的Map/Reduce。也是因為資料存在記憶體中的原因,當資料量過大超出記憶體空間時會產生非常可怕的後果。所以在處理大量資料方面還是使用hadoop較為合理

三、生態系統

Spark生態圈也稱為BDAS(伯克利資料分析棧),是伯克利APMLab實驗室打造的,力圖在演算法(Algorithms)、機器(Machines)、人(People)之間通過大規模整合來展現大資料應用的一個平臺。伯克利AMPLab運用大資料、雲端計算、通訊等各種資源以及各種靈活的技術方案,對海量不透明的資料進行甄別並轉化為有用的資訊,以供人們更好的理解世界。該生態圈已經涉及到機器學習、資料探勘、資料庫、資訊檢索、自然語言處理和語音識別等多個領域。

Spark生態圈以Spark Core為核心,從HDFS、Amazon S3和HBase等持久層讀取資料,以MESS、YARN和自身攜帶的Standalone為資源管理器排程Job完成Spark應用程式的計算。 這些應用程式可以來自於不同的元件,如Spark Shell/Spark Submit的批處理、Spark Streaming的實時處理應用、Spark SQL的即時查詢、BlinkDB的權衡查詢、MLlib/MLbase的機器學習、GraphX的圖處理和SparkR的數學計算等等。

1 Spark Core

前面介紹了Spark Core的基本情況,以下總結一下Spark核心架構:

l  提供了有向無環圖(DAG)的分散式平行計算框架,並提供Cache機制來支援多次迭代計算或者資料共享,大大減少迭代計算之間讀取資料局的開銷,這對於需要進行多次迭代的資料探勘和分析效能有很大提升

l  在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分佈在一組節點中的只讀物件集合,這些集合是彈性的,如果資料集一部分丟失,則可以根據“血統”對它們進行重建,保證了資料的高容錯性;

l  移動計算而非移動資料,RDD Partition可以就近讀取分散式檔案系統中的資料塊到各個節點記憶體中進行計算

l  使用多執行緒池模型來減少task啟動開稍

l  採用容錯的、高可伸縮性的akka作為通訊框架

2.2 SparkStreaming

SparkStreaming是一個對實時資料流進行高通量、容錯處理的流式處理系統,可以對多種資料來源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等複雜操作,並將結果儲存到外部檔案系統、資料庫或應用到實時儀表盤。

Spark Streaming構架

l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark Core,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者儲存到外部裝置。下圖顯示了Spark Streaming的整個流程。

l容錯性:對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的資料集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入資料是可容錯的,那麼任意一個RDD的分割槽(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。  

對於Spark Streaming來說,其RDD的傳承關係如下圖所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連線的,由於Spark Streaming輸入資料可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的資料流(Spark Streaming會將網路輸入資料的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

l實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段資料的處理都會經過Spark DAG圖分解以及Spark的任務集的排程過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

l擴充套件性與吞吐量:Spark目前在EC2上已能夠線性擴充套件到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的資料量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

2.3 Spark SQL

Shark是SparkSQL的前身,它釋出於3年前,那個時候Hive可以說是SQL on Hadoop的唯一選擇,負責將SQL編譯成可擴充套件的MapReduce作業,鑑於Hive的效能以及與Spark的相容,Shark專案由此而生。

Shark即Hive on Spark,本質上是通過Hive的HQL解析,把HQL翻譯成Spark上的RDD操作,然後通過Hive的metadata獲取資料庫裡的表資訊,實際HDFS上的資料和檔案,會由Shark獲取並放到Spark上運算。Shark的最大特性就是快和與Hive的完全相容,且可以在shell模式下使用rdd2sql()這樣的API,把HQL得到的結果集,繼續在scala環境下運算,支援自己編寫簡單的機器學習或簡單分析處理函式,對HQL結果進一步分析計算。

在2014年7月1日的Spark Summit上,Databricks宣佈終止對Shark的開發,將重點放到Spark SQL上。Databricks表示,Spark SQL將涵蓋Shark的所有特性,使用者可以從Shark 0.9進行無縫的升級。在會議上,Databricks表示,Shark更多是對Hive的改造,替換了Hive的物理執行引擎,因此會有一個很快的速度。然而,不容忽視的是,Shark繼承了大量的Hive程式碼,因此給優化和維護帶來了大量的麻煩。隨著效能優化和先進分析整合的進一步加深,基於MapReduce設計的部分無疑成為了整個專案的瓶頸。因此,為了更好的發展,給使用者提供一個更好的體驗,Databricks宣佈終止Shark專案,從而將更多的精力放到Spark SQL上。

Spark SQL允許開發人員直接處理RDD,同時也可查詢例如在 Apache Hive上存在的外部資料。Spark SQL的一個重要特點是其能夠統一處理關係表和RDD,使得開發人員可以輕鬆地使用SQL命令進行外部查詢,同時進行更復雜的資料分析。除了Spark SQL外,Michael還談到Catalyst優化框架,它允許Spark SQL自動修改查詢方案,使SQL更有效地執行。

還有Shark的作者是來自中國的博士生辛湜(Reynold Xin),也是Spark的核心成員,具體資訊可以看他的專訪 http://www.csdn.net/article/2013-04-26/2815057-Spark-Reynold

Spark SQL的特點:

l引入了新的RDD型別SchemaRDD,可以象傳統資料庫定義表一樣來定義SchemaRDD,SchemaRDD由定義了列資料型別的行物件構成。SchemaRDD可以從RDD轉換過來,也可以從Parquet檔案讀入,也可以使用HiveQL從Hive中獲取。

l內嵌了Catalyst查詢優化框架,在把SQL解析成邏輯執行計劃之後,利用Catalyst包裡的一些類和介面,執行了一些簡單的執行計劃優化,最後變成RDD的計算

l在應用程式中可以混合使用不同來源的資料,如可以將來自HiveQL的資料和來自SQL的資料進行Join操作。

Shark的出現使得SQL-on-Hadoop的效能比Hive有了10-100倍的提高,  那麼,擺脫了Hive的限制,SparkSQL的效能又有怎麼樣的表現呢?雖然沒有Shark相對於Hive那樣矚目地效能提升,但也表現得非常優異,如下圖所示:

為什麼sparkSQL的效能會得到怎麼大的提升呢?主要sparkSQL在下面幾點做了優化:

1. 記憶體列儲存(In-Memory Columnar Storage) sparkSQL的表資料在記憶體中儲存不是採用原生態的JVM物件儲存方式,而是採用記憶體列儲存;

2. 位元組碼生成技術(Bytecode Generation) Spark1.1.0在Catalyst模組的expressions增加了codegen模組,使用動態位元組碼生成技術,對匹配的表示式採用特定的程式碼動態編譯。另外對SQL表示式都作了CG優化, CG優化的實現主要還是依靠Scala2.10的執行時放射機制(runtime reflection);

3. Scala程式碼優化 SparkSQL在使用Scala編寫程式碼的時候,儘量避免低效的、容易GC的程式碼;儘管增加了編寫程式碼的難度,但對於使用者來說介面統一。

2.4 BlinkDB

BlinkDB 是一個用於在海量資料上執行互動式 SQL 查詢的大規模並行查詢引擎,它允許使用者通過權衡資料精度來提升查詢響應時間,其資料的精度被控制在允許的誤差範圍內。為了達到這個目標,BlinkDB 使用兩個核心思想:

l一個自適應優化框架,從原始資料隨著時間的推移建立並維護一組多維樣本;

l一個動態樣本選擇策略,選擇一個適當大小的示例基於查詢的準確性和(或)響應時間需求。

和傳統關係型資料庫不同,BlinkDB是一個很有意思的互動式查詢系統,就像一個蹺蹺板,使用者需要在查詢精度和查詢時間上做一權衡;如果使用者想更快地獲取查詢結果,那麼將犧牲查詢結果的精度;同樣的,使用者如果想獲取更高精度的查詢結果,就需要犧牲查詢響應時間。使用者可以在查詢的時候定義一個失誤邊界。

2.5  MLBase/MLlib

MLBase是Spark生態圈的一部分專注於機器學習,讓機器學習的門檻更低,讓一些可能並不瞭解機器學習的使用者也能方便地使用MLbase。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。

l  ML Optimizer會選擇它認為最適合的已經在內部實現好了的機器學習演算法和相關引數,來處理使用者輸入的資料,並返回模型或別的幫助分析的結果;

l  MLI 是一個進行特徵抽取和高階ML程式設計抽象的演算法實現的API或平臺;

l MLlib是Spark實現一些常見的機器學習演算法和實用程式,包括分類、迴歸、聚類、協同過濾、降維以及底層優化,該演算法可以進行可擴充; MLRuntime 基於Spark計算框架,將Spark的分散式計算應用到機器學習領域。

總的來說,MLBase的核心是他的優化器,把宣告式的Task轉化成複雜的學習計劃,產出最優的模型和計算結果。與其他機器學習Weka和Mahout不同的是:

l  MLBase是分散式的,Weka是一個單機的系統;

l  MLBase是自動化的,Weka和Mahout都需要使用者具備機器學習技能,來選擇自己想要的演算法和引數來做處理;

l  MLBase提供了不同抽象程度的介面,讓演算法可以擴充

l  MLBase基於Spark這個平臺

2.6 GraphX

GraphX是Spark中用於圖(e.g., Web-Graphs and Social Networks)和圖平行計算(e.g., PageRank and Collaborative Filtering)的API,可以認為是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重寫及優化,跟其他分散式圖計算框架相比,GraphX最大的貢獻是,在Spark之上提供一棧式資料解決方案,可以方便且高效地完成圖計算的一整套流水作業。GraphX最先是伯克利AMPLAB的一個分散式圖計算框架專案,後來整合到Spark中成為一個核心元件。

GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴充套件了Spark RDD的抽象,有Table和Graph兩種檢視,而只需要一份物理儲存。兩種檢視都有自己獨有的操作符,從而獲得了靈活操作和執行效率。如同Spark,GraphX的程式碼非常簡潔。GraphX的核心程式碼只有3千多行,而在此之上實現的Pregel模型,只要短短的20多行。GraphX的程式碼結構整體下圖所示,其中大部分的實現,都是圍繞Partition的優化進行的。這在某種程度上說明了點分割的儲存和相應的計算優化的確是圖計算框架的重點和難點。

GraphX的底層設計有以下幾個關鍵點。

1.對Graph檢視的所有操作,最終都會轉換成其關聯的Table檢視的RDD操作來完成。這樣對一個圖的計算,最終在邏輯上,等價於一系列RDD的轉換過程。因此,Graph最終具備了RDD的3個關鍵特性:Immutable、Distributed和Fault-Tolerant。其中最關鍵的是Immutable(不變性)。邏輯上,所有圖的轉換和操作都產生了一個新圖;物理上,GraphX會有一定程度的不變頂點和邊的複用優化,對使用者透明。

2.兩種檢視底層共用的物理資料,由RDD[Vertex-Partition]和RDD[EdgePartition]這兩個RDD組成。點和邊實際都不是以表Collection[tuple]的形式儲存的,而是由VertexPartition/EdgePartition在內部儲存一個帶索引結構的分片資料塊,以加速不同檢視下的遍歷速度。不變的索引結構在RDD轉換過程中是共用的,降低了計算和儲存開銷。

3.圖的分散式儲存採用點分割模式,而且使用partitionBy方法,由使用者指定不同的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個EdgePartition,頂點Master分配到各個VertexPartition,EdgePartition也會快取本地邊關聯點的Ghost副本。劃分策略的不同會影響到所需要快取的Ghost副本數量,以及每個EdgePartition分配的邊的均衡程度,需要根據圖的結構特徵選取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut這四種策略。在淘寶大部分場景下,EdgePartition2d效果最好。

2.7 SparkR

SparkR是AMPLab釋出的一個R開發包,使得R擺脫單機執行的命運,可以作為Spark的job執行在叢集上,極大得擴充套件了R的資料處理能力。

SparkR的幾個特性:

l  提供了Spark中彈性分散式資料集(RDD)的API,使用者可以在叢集上通過R shell互動性的執行Spark job。

l  支援序化閉包功能,可以將使用者定義函式中所引用到的變數自動序化傳送到叢集中其他的機器上。

l  SparkR還可以很容易地呼叫R開發包,只需要在叢集上執行操作前用includePackage讀取R開發包就可以了,當然叢集上要安裝R開發包。

2.8  Tachyon

Tachyon是一個高容錯的分散式檔案系統,允許檔案以記憶體的速度在叢集框架中進行可靠的共享,就像Spark和 MapReduce那樣。通過利用資訊繼承,記憶體侵入,Tachyon獲得了高效能。Tachyon工作集檔案快取在記憶體中,並且讓不同的 Jobs/Queries以及框架都能記憶體的速度來訪問快取檔案”。因此,Tachyon可以減少那些需要經常使用的資料集通過訪問磁碟來獲得的次數。Tachyon相容Hadoop,現有的Spark和MR程式不需要任何修改而執行。

在2013年4月,AMPLab共享了其Tachyon 0.2.0 Alpha版本的Tachyon,其宣稱效能為HDFS的300倍,繼而受到了極大的關注。Tachyon的幾個特性如下:

lJAVA-Like File API

Tachyon提供類似JAVA File類的API,

l相容性

Tachyon實現了HDFS介面,所以Spark和MR程式不需要任何修改即可執行。

l可插拔的底層檔案系統

Tachyon是一個可插拔的底層檔案系統,提供容錯功能。tachyon將記憶體資料記錄在底層檔案系統。它有一個通用的介面,使得可以很容易的插入到不同的底層檔案系統。目前支援HDFS,S3,GlusterFS和單節點的本地檔案系統,以後將支援更多的檔案系統。