1. 程式人生 > >大資料晉級之路(5)Hadoop,Spark,Storm綜合比較

大資料晉級之路(5)Hadoop,Spark,Storm綜合比較

大資料框架:Spark vs Hadoop vs Storm

目錄

 

大資料時代,TB級甚至PB級資料已經超過單機尺度的資料處理,分散式處理系統應運而生。

知識預熱

關於大資料的四大特徵(4V

  • 海量的資料規模(Volume):Quantifiable(可量化)
  • 高速的資料流轉和動態的資料體系(Velocity):Measurable(可衡量)
  • 多樣的資料型別(Variety):Comparable(可對比)
  • 巨大的資料價值(Value):Evaluable(可評估)

關於大資料應用場景:

  • 資料探勘
  • 智慧推薦
  • 大資料風控

推薦目前三大應用最廣泛、國人認知最多的Apache開源大資料框架系統:Hadoop,Spark和Storm。

Storm - 主要用於實時大資料分析,Spark - 主要用於“實時”(準實時)大資料分析,Hadoop - 主要用於離線大資料分析。

本文以 Hadoop 和 Spark 為主,Storm 僅作簡單介紹。

歷史發展小知識

2003年到2004年間,Google發表 MapReduce、GFS(Google File System)和 BigTable 三篇技術論文,提出一套全新的分散式計算理論,成為大資料時代的技術核心。

江湖傳說永流傳:谷歌技術有"三寶"

MapReduce:分散式計算框架,==> Hadoop MapReduce,平行計算的程式設計模型

GFS:分散式檔案系統,==> HDFS,為上層提供高效的非結構化資料儲存服務(一個master(元資料伺服器),多個chunkserver(資料伺服器))

BigTable:基於 GFS 的資料儲存系統,==> HBase,提供結構化資料服務的分散式資料庫(鍵值對映,稀疏、分散式、持久化、多維排序對映)

To Bottom

Hadoop

Hadoop是一個生態系統(分散式儲存-運算叢集),開發和執行處理大規模資料或超大資料集(Large Data Set)的軟體平臺,是Apache的一個用Java語言實現的開源軟體框架,實現在大量計算機叢集中對海量資料進行分散式計算。

關於官網對 Hadoop 的介紹:

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

The Apache™ Hadoop® project develops open-source software for reliable, scalable, distributed computing.

  • Hadoop Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

Hadoop框架中最核心設計:(排序是Hadoop的靈魂)

  • HDFS:(底層資料層),海量資料儲存,磁碟儲存;
  • MapReduce:(上層運算層)資料批量計算;

HDFS

Hadoop Distributed File System,Hadoop分散式檔案儲存和管理系統,是資料管理和儲存功能的一種輔助工具。每個檔案被分成固定大小的塊(預設64MB),塊作為最小的儲存單位放到眾多伺服器上(按鍵值對將塊儲存在HDFS上,並將鍵值對的對映存在記憶體中),檔案的每個塊都有備份(預設3份)。

關於副本存放策略

HDFS的存放策略是將一個副本存放在本地機架節點上,另外兩個副本放在不同機架的不同節點上。

  • 每個DN最多儲存一個副本
  • 每個機架最多儲存兩個副本

關於容錯機制

Hadoop Master採用 Log + CheckPoint 技術實現故障恢復,同時採用 Secondary Master 輔助之:

  • Log:記錄元資料的每一次變化,相當於連續資料保護
  • CheckPoint:冗餘資料備份,相當於一次全量備份

Master宕機後,先恢復到checkpoint,然後根據log恢復到最新狀態。每次建立一個新的checkpoint,log即可清空,有效控制log檔案大小。

關於 Moving computation is cheaper than moving data

  • 邏輯分發,而不是資料分發;
  • 計算邏輯分發到資料側,在資料側分散式處理,而不是集中式處理;

優點

  • 主從 Master-Slaver 模式,元資料和資料分離,負載均衡
  • Cheap and Deep:適合部署在普通低廉的(low-cost)機器硬體上 and 水平擴充套件
  • Scale "out",not "up":向"外"橫向擴充套件,而非向"上"縱向擴充套件
  • 高度容錯處理、高吞吐量的資料訪問
  • 流式資料訪問,一次寫入、多次讀取(Write Once Read Many,WORM)
  • 為應用開發者隱藏系統層細節:Focus on what to compute,neglect how to compute

侷限性

  • 抽象層次低,API 支援少;
  • 重吞吐量,輕時延:互動式資料處理和實時資料處理較弱;
  • 迭代式資料處理效能比較差;

HDFS通訊有 Client 和 NameNode + DataNode 兩部分。NameNode 獲取元資料,定位到具體的 DataNode,DataNode 讀取相應的檔案資料。Client和NameNode 以及 NameNode和DataNode 基於TCP/IP通訊,遠端過程呼叫(RPC)模型封裝 ClientProtocol協議 和 DatanodeProtocol協議,Client和NameNode通過ClientProtocol協議互動,NameNode和DataNode通過DatanodeProtocol協議互動。

Master中的Task queue,儲存待執行的任務,每一個Slaver有若干Task slots,用來接收Master分配來的任務並執行。

  

  • Client:與NN互動獲取檔案元資料;與DN互動進行資料讀寫
  • NameNode:Master Node,管理節點(元資料節點),管理資料塊對映(目錄和檔案與Block的對應關係、Block與DataNode的對應關係);處理客戶端的讀寫請求;配置副本策略;管理HDFS名稱空間(維護檔案系統的名字空間和檔案屬性);所有元資料都儲存在記憶體中, 記憶體中儲存的是 = fsimage + edits;儲存檔案系統執行的狀態資訊
  • DataNode:Slaver Node,資料節點,儲存Client傳送的資料塊;執行資料塊的讀寫操作;執行副本策略;容錯機制
  • fsimage:元資料映象檔案(檔案系統的目錄樹)
  • fsedits:元資料操作日誌檔案(針對檔案系統所做的修改操作記錄)
  • JobTracker:in NameNode 中,當有任務提交到Hadoop叢集時,負責Job的執行和多個TaskTrackers的排程
  • TaskTracker:in DataNode 中,負責某一個Map或者Reduce任務

其中,fsimage和fsedits儲存在硬碟上,對映關係不儲存在硬碟上、而是在系統啟動的時候從資料節點收集而成的。Secondary NameNode是NameNode的冷備份,分擔NameNode的工作量(預設每隔1小時,從NameNode獲取fsimage和edits來進行合併,然後再發送給NameNode)。關於冷備份和熱備份,扼要說明:

  • 冷備份:b 是 a 的冷備份,如果 a 壞掉,b 不能馬上代替 a 工作。但 b 上儲存會 a 的一些資訊,減少 a 壞掉之後的損失
  • 熱備份:b 是 a 的熱備份,如果 a 壞掉,b 馬上執行代替 a 的工作

注意,NameNode節點只有1個,難以支援高效儲存大量小檔案。作為HDFS的神經中樞,存在單點故障(SPOF),可能導致資料丟失。

採用 HA(High Available)機制 冗餘備份解決:

  • Secondary NameNode:元資料備份方案
  • AvatarNode:能夠使HDFS以最短時間完成故障切換

亦可以通過ZooKeeper實現主從結構避免單點故障。

HDFS檔案讀寫流程:

執行讀或寫過程,支援Staging(分段傳輸),NameNode與DataNode通過 HeartBeat(TaskTracker週期給JobTracker傳送心跳,把TaskTracker的執行狀態和map任務的執行情況傳送給JobTracker)保持通訊。

     

(1)檔案讀取

  • Client向NameNode發起讀檔案請求
  • NameNode把該檔案的DataNode資訊返回給Client
  • Client從DataNode總讀取資訊

(2)檔案寫入

  • Client向NameNode發起寫檔案請求
  • NameNode根據檔案大小和檔案塊配置情況,把它管理的DataNode資訊返回給Client
  • Client將檔案劃分為多個檔案塊,並根據DataNode的地址資訊,按順序把Block按順序寫入到DataNode中

一個檔案經過建立、寫入和關閉之後就不需要也不能再改變,解決資料一致性問題。

具體流程圖參見:HDFS 工作原理;進一步的詳細瞭解,請參見:HDFS 初探 - 讀寫資料流

推薦參考:【漫畫解讀】HDFS儲存原理

MapReduce

第一代計算引擎,Hadoop分散式計算的關鍵技術,Job Scheduling/Executing System,簡單程式設計模型(大規模資料集的平行計算)、磁碟讀寫、暴力但笨重。

核心思想:分而治之 ---> "拆分 + 合併",但是拆分要均勻(Shuffle)

資料處理流程中的每一步都需要一個Map階段和一個Reduce階段,即一個Job只有Map和Reduce兩個階段,每個階段都是用鍵值對(key/value)作為輸入和輸出

Map:對映,對集合裡的每個目標應用同一個操作,Mapper

Reduce:化整為零、大事化小,遍歷集合中的元素來返回一個綜合的結果,Reducer

關於網上用最簡短的語言解釋 MapReduce:

We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes. Now we get together and add our individual counts. That’s reduce.

再通俗點,可以理解為,把一堆雜亂無章的資料按照某種特徵歸納,然後處理並得到最後結果。Map階段面對的是雜亂無章的互不相關的資料,它解析每個資料,從中提取key和value,也就是提取資料的特徵。經過MapReduce的Shuffle階段後,Reduce階段看到的都是已經歸納好的資料,在此基礎上可以做進一步的處理以便得到結果。

首先了解下 InputSplit 的基本概念:

  • 分片,概念來源於檔案,一個檔案可以切分成多個片段
  • Hadoop定義的用來傳送給每個單獨map的資料,InputSplit儲存的並非資料本身,而是一個分片長度和一個記錄資料位置的陣列
  • Map task 的最小輸入單位
  • 一個分片不會跨越兩個檔案,一個空的檔案佔用一個分片
  • 分片不一定等長,一個分片可以跨一個大檔案中連續的多個Block,通常分片大小就是BlockSize

關於MapReduce的大概處理流程:任務的分解與結果的彙總

其中,Map過程需要繼承org.apache.hadoop.mapreduce包中的Mapper類並重寫map方法,Reduce過程需要繼承org.apache.hadoop.mapreduce包中的Reducer類並重寫reduce方法。map函式接受一個<key,value>形式的輸入,產生一個<key,value>形式的中間輸出,Hadoop負責將所有具有相同結果中間key值的value集合到一起傳給reduce函式,reduce函式接受一個如<key,(list of value)>形式的輸入,然後對這個value集合進行處理,每個reduce產生0或1個輸出,reduce的輸出也是<key,value>形式。

關於MapReduce的詳細處理流程

參考:MapReduce原理與設計思想; MapReduce框架詳解詳解Hadoop核心架構

Hadoop 排程機制

Hadoop叢集中,伺服器按用途分為 Master 節點和 Worker 節點:

  • Master:任務拆分和任務分配,含有 JobTracker(安排MapReduce運算層任務)和 NameNode(管理HDFS資料層儲存)程式
  • Worker:任務執行,含有 TaskTracker(接受JobTracker排程,執行MapReduce運算層任務)和 DataNode(執行資料讀寫操作、執行副本策略)程式

在MapReduce運算層上,Master伺服器負責分配運算任務,JobTracker程式將Map和Reduce程式的執行工作指派給Worker伺服器上的TaskTracker程式,由TaskTracker負責執行Map和Reduce工作,並將運算結果返回給JobTracker。

注意,Master節點也可以有TaskTracker和DataNode程式,即Master伺服器可以在本地端扮演Worker角色。此外,map任務的分配考慮資料本地化(Data Local),reduce任務的分配並不考慮。

MapReduce執行流程

整個過程,具體參考:Hadoop-分散式計算平臺初探。Map/Reduce框架和分散式檔案系統執行在一組相同的節點上,Master節點負責任務的排程和監控、重新執行已失敗的任務,Worker節點負責任務的執行。輸入資料來自底層分散式檔案系統,中間資料放在本地檔案系統,最終輸出資料寫入底層分散式檔案系統。注意 Map/Reduce作業 和 map/reduce函式 的區別:

  • Map作業處理一個輸入檔案資料的分片,可能需要呼叫多次map函式來處理每個輸入的鍵值對,一個Map作業對應一個檔案分片;
  • Reduce作業處理一個分割槽的中間鍵值對,需要對每個不同的鍵呼叫一次reduce函式,一個Reduce作業最終對應一個輸出檔案;

map函式:接受一個鍵值對(key-value pair),產生一組中間鍵值對

各個map函式對所劃分的資料並行處理,從不同的輸入資料產生不同的中間結果輸出。

1

2

3

4

map(String key, String value): 

    // key: document name, value: document contents 

    for each word w in value: 

        EmitIntermediate(w, "1");

reduce函式:接受一個鍵以及相關的一組值,將這組值進行合併產生一組規模更小的值(通常只有一個或零個值)

各個reduce函式各自平行計算,各自負責處理不同的中間結果資料集合。在reduce處理前,必須等所有的map函式完成,因此在進入reduce前需要有一個同步障(barrier)負責map節點執行的同步控制,這個階段也負責對map的中間結果資料進行收集整理(aggregation & shuffle)處理,以便reduce更有效地計算最終結果。最終彙總所有reduce的輸出結果即可獲得最終結果。

1

2

3

4

5

6

reduce(String key, Iterator values): 

    // key: a word, values: a list of counts 

    int result = 0; 

    for each v in values: 

        result += ParseInt(v); 

        Emit(AsString(result));

在map處理完成、進入reduce處理之前,中間結果資料會經過 Partitioner(劃分)和 Combiner(合併)的處理:

  • Partitioner:一個reducer節點所處理的資料可能來自多個map節點,因此map節點輸出的中間結果需使用一定的策略進行劃分處理,保證相關資料傳送到同一個reducer節點,可以理解為GroupByKey
  • Combiner:為減少資料通訊開銷,中間結果資料進入reduce節點前需要進行合併處理,把具有同樣主鍵的資料合併到一起,避免重複傳送

關於Partitioner,利用了負載均衡的思想,對進入到Reduce的鍵值對根據key值計算hash再對Reduce個數進行求餘進行分組到Reduce。在MapReduce中,預設的partitioner是HashPartitioner類,通過方法 getPartition()獲取分割槽值。若要實現自定義的分割槽函式,重寫getPartition()方法即可。對Partitioner的深入理解,有興趣可以參見:Hadoop中Partition深度解析

關於Combiner,號稱本地的Reduce,Reduce最終的輸入是Combiner的輸出。

一個問題,Partitioner和Combiner執行順序問題,理論上 Partitioner ---> Combiner,不過 Combiner ---> Partitioner 效能要更優。

此外,可以再結合官方給出的示意圖,理解 Map - Reduce 過程:

關於Shuffle過程

通常map task和reduce task在不同的DataNode上執行,主要的開銷:網路傳輸和磁碟IO

Shuffle過程是MapReduce的核心,負責資料從map task輸出到reduce task輸入,把map task的輸出結果有效地傳送到reduce端。

  • 完整地從map端拉取資料到reduce端
  • 跨節點拉取資料時,儘可能地減少對頻寬的不必要消耗
  • 減少磁碟IO對task執行的影響

Shuffle過程橫跨map端和reduce端,分為兩個階段:Map端的shuffle階段(廣義Shuffle) 和 Reduce端的Shuffle階段

  • Map端:包括map階段、Spill過程(輸出、sort、溢寫、merge)
  • Reduce端:包括copy、sort、merge過程、reduce階段

1)Shuffle - map端

每個map task都有一個環形記憶體緩衝區(kvbuffer,預設100MB)(環形,有效利用記憶體空間),作用是批量收集map結果,減少磁碟IO讀寫的影響,每個map task的執行結果key/value對和Partition的結果都會被寫入緩衝區(可以簡單理解為以三元組<partition, key, value>的形式儲存)。

Hadoop:分散式計算平臺初探

對於環形緩衝區和Partitioner操作,涉及原始檔 MapTask.java 的內部類 MapOutputBuffer,該類主要用於:(1)緩衝map輸出資料;(2)資料區域性排序;

環形緩衝區儲存兩種資料:

  • K/V資料:kv,map task的輸出鍵值對,儲存方向是向上增長
  • 索引資料:kvmeta,鍵值對在環形緩衝區的索引,儲存方向是向下增長,每個meta資訊 = <value_stIdx、key_stIdx、partition值、value_len>

資料區域和索引區域在緩衝區是相鄰但不重疊的兩個部分,以equator為分界點,初始 equator=0,每執行一次spill過程,更新equator。

在MapOutputBuffer中meta的儲存資訊如下:

若有興趣深入理解MapOutputBuffer,具體參見:Map輸出資料的處理類MapOutputBuffer分析; MapOutputBuffer理解的三重境界

亦可參見:騰訊大資料之TDW計算引擎解析—Shuffle,針對 Shuffle 過程作了詳細解釋,包括 k-v-p 資訊的儲存問題。

當緩衝區快滿(80%)時需要將緩衝區資料以一個臨時檔案的方式存放到磁碟(spill to disk),當整個map task執行結束後再對磁碟中由這個map task產生的所有臨時檔案合併,生成最終的正式輸出檔案(分割槽且排序),然後等待reduce task來拉資料。注意,只要設定了combiner,在map端會執行兩次combiner:

  • 第一次是在 spill 階段,該過程在記憶體中執行,針對這80M的記憶體緩衝區執行sort和combiner,partitioner在寫入記憶體緩衝區之前已經執行
  • 第二次是在 merge 階段,該過程在disk中進行,針對disk中的多個溢寫檔案執行combiner合併成一個檔案

在map階段執行sort(在spill階段對key排序,對相同key的value排序)和combiner(對相同key的value合併)操作的必要性:

  • 儘量減少每次寫入磁碟的資料量
  • 儘量減少在複製階段網路傳輸的資料量

注意,為了減少資料通量,此處也可以執行資料壓縮操作。在Java中,對輸出資料壓縮設定:

1

2

3

4

5

6

// map端輸出壓縮

conf.SetBoolean("mapred.compress.map.output"true)

// reduce端輸出壓縮

conf.SetBoolean("mapred.output.compress"true)

// reduce端輸出壓縮使用的類

conf.SetClass("mapred.output.compression.codec", GzipCodex.class, CompressionCodec.class)

關於spill過程,執行者是SortAndSpill,包括輸出、排序、溢寫、合併階段。

  • 輸出:collect,map task結果輸出到環形緩衝區中,collect()方法會呼叫 getPartition() 方法
  • 排序:sort,把kvbuffer中資料按partition和key兩個關鍵字排序,移動的只是索引資料,結果是kvmeta中的資料按partition為單位分割槽聚集,同一partition內按key有序
  • 溢寫:spill,溢寫內容輸出到檔案,分割槽在檔案中的位置用三元組<stIdx、原始資料長度、壓縮之後的資料長度>的形式索引
  • 合併:merge(combine),合併該map task輸出的所有溢寫檔案,一個map task最終對應一箇中間輸出檔案

有興趣可參考:Map階段分析之spill過程

2)Shuffle - reduce端

在reduce task執行之前,reduce端的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge(實質是歸併排序),最終形成一個檔案作為reduce task的輸入檔案。關於reducer程序的啟動,當正在執行+已完成的map task達到一定比例後由JobTracker分配執行reduce task。注意,只要設定了combiner,在reduce端會執行兩次combiner:

  • 第一次是在記憶體緩衝區到disk的 merge 階段(記憶體-->磁碟):當記憶體中的資料量到達一定閾值,啟動記憶體到磁碟的merge,將記憶體資料溢寫到disk中
  • 第二次是在disk中的 merge 階段(磁碟-->磁碟):將disk中的多個溢寫檔案執行combiner合併成一個檔案

注意,在記憶體緩衝區中並不執行merge操作(記憶體-->記憶體)。最後一次合併的結果並沒有寫入磁碟,而是直接輸入到reduce函式。每一個reducer對應一個輸出檔案到HDFS,多個reducer的輸出檔案不執行合併操作,每個輸出檔案以Reducer number為標識。

對於Shuffle過程的深入理解參見:MapReduce - Shuffle,文中結合上圖對Shuffle過程拆分,對具體地細節作出了詳細解釋,文末的評論也不錯,有值得借鑑的地方。其他關於MapReduce的資訊參見:講座總結|解讀大資料世界中MapReduce的前世今生; MapReduce的一點理解

參考

To Bottom

Spark

Spark是一個生態系統,核心由Scala語言開發,為批處理(Spark Core)、互動式(Spark SQL)、流式處理(Spark Streaming)、機器學習(MLlib)、圖計算(GraphX)提供了一個更快、更通用的統一的資料處理平臺(One Stack rule them all),是類Hadoop MapReduce的通用並行框架。

  • Spark Core:基本引擎,提供記憶體計算框架、提供Cache機制支援資料共享和迭代計算,用於大規模並行和分散式資料處理
    • 採用執行緒池模型減少Task啟動開稍
    • 採用容錯的、高可伸縮性的Akka作為通訊框架
  • Spark SQL:支援SQL或者Hive查詢語言來查詢資料

Spark 被標榜為:"快如閃電的叢集計算"

  • 開源分散式計算系統
  • 基於記憶體處理的大資料平行計算框架
  • 資料處理的實時性,高容錯性,高可伸縮性,負載均衡
  • 統一的程式設計模型:高效支援整合批量處理和互動式流分析

Spark 生態系統名稱:伯克利資料分析棧(BDAS

關於官網對 Spark 的介紹:

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Apache Spark™ is a fast and general engine for large-scale data processing. 

  • Speed:Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
  • Ease of Use:Write applications quickly in Java, Scala, Python, R.
  • Generality:Combine SQL, streaming, and complex analytics.
  • Runs Everywhere:Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.

Spark 最核心設計:

  • RDD:海量資料儲存,記憶體或磁碟儲存;

Spark 專用名詞預熱:

  • Application:Spark 應用程式,包含一個 Driver 程式和分佈在叢集中多個節點上執行的若干 Executor 程式碼
  • Operation:作用於 RDD 的各種操作分為 Transformation 和 Action
  • Job:作業,SparkContext 提交的具體 Action 操作,一個 Job 包含多個 RDD 及作用於相應 RDD 上的各種 Operation,常與Action對應
  • Stage:每個 Job 會被拆分很多組任務,每組任務被稱為 Stage,也稱TaskSet,即一個作業包含多個階段
  • Partition:資料分割槽,一個 RDD 中的資料可以分成多個不同的區
  • DAG:Directed Acycle graph, 有向無環圖,反映 RDD 之間的依賴關係
  • Caching Managenment:快取管理,對 RDD 的中間計算結果進行快取管理以加快整體的處理速度

Driver in Application ---> Job(RDDs with Operations) ---> Stage ---> Task

RDD 相關術語:

  • batch interval:時間片或微批間隔,一個時間片的資料由 Spark Engine 封裝成一個RDD例項
  • batch data:批資料,將實時流資料以時間片為單位分批
  • window length:視窗長度,必須是 batch interval 的整數倍
  • window slide interval:視窗滑動間隔,必須是 batch interval 的整數倍

關於 Spark 處理速度為什麼比 (Hadoop)MapReduce 快?

  • MapReduce 中間結果在 HDFS 上,Spark 中間結果在記憶體,迭代運算效率高
  • MapReduce 排序耗時,Spark 可以避免不必要的排序開銷
  • Spark 能夠將要執行的一系列操作做成一張有向無環圖(DAG),然後進行優化

此外,Spark 效能優勢

  • 採用事件驅動的類庫 AKKA 啟動任務,通過執行緒池來避免啟動任務的開銷
  • 通用性更好,支援 map、reduce、filter、join 等運算元

AKKA,分散式應用框架,JAVA虛擬機器JVM平臺上構建高併發、分散式和容錯應用的工具包和執行時,由 Scala 編寫的庫,提供 Scala和JAVA 的開發介面。

  • 併發處理方法基於Actor模型
  • 唯一通訊機制是訊息傳遞

RDD

Resilient Distributed Dataset,彈性分散式資料集,RDD 是基於記憶體的、只讀的、分割槽儲存的可重算的元素集合,支援粗粒度轉換(即:在大量記錄上執行相同的單個操作)。RDD.class 是 Spark 進行資料分發和計算的基礎抽象類,RDD 是 Spark 中的抽象資料結構型別,任何資料在 Spark 中都被表示為 RDD。

RDD是一等公民。Spark最核心的模組和類Spark中的一切都是基於RDD的。

RDD 來源

  • 並行化驅動程式中已存在的記憶體集合 或 引用一個外部儲存系統已存在的資料集
  • 通過轉換操作來自於其他 RDD

此外,可以使 Spark 持久化一個 RDD 到記憶體中,使其在並行操作中被有效的重用,RDDs 也可以自動從節點故障中恢復(基於 Lineage 血緣繼承關係)。

基於 RDD 的操作型別

  • Transformation(轉換):具體指RDD中元素的對映和轉換(RDD-to-RDD),常用操作有map、filter等
  • Action(動作):提交Spark作業,啟動計算操作,併產生最終結果(向用戶程式返回或者寫入檔案系統)

轉換是延遲執行的,通過轉換生成一個新的RDD時候並不會立即執行(只記錄Lineage,不會載入資料),只有等到 Action 時,才觸發操作(根據Lineage完成所有的轉換)。

操作型別區別:返回結果為RDD的API是轉換,返回結果不為RDD的API是動作。

常用運算元清單

關於相關運算元的初識:Spark RDD API 詳解

依賴關係:窄依賴,父RDD的每個分割槽都只被子RDD的一個分割槽所使用;寬依賴,父RDD的分割槽被多個子RDD的分割槽所依賴。

  • 窄依賴可以在某個計算節點上直接通過計算父RDD的某塊資料得到子RDD對應的某塊資料;
  • 資料丟失時,窄依賴只需要重新計算丟失的那一塊資料來恢復;

SparkConf

執行配置,一組 K-V 屬性對。SparkConf 用於指定 Application 名稱、master URL、任務相關引數、調優配置等。構建 SparkContext 時可以傳入 Spark 相關配置,即以 SparkConf 為參例項化 SparkContext 物件。

SparkContext

執行上下文。Spark 叢集的執行單位是 Application,提交的任何任務都會產生一個 Application,一個Application只會關聯上一個Spark上下文。SparkContext 是 Spark 程式所有功能的唯一入口,類似 main() 函式。

關於共享變數

Spark 提供兩種型別的共享變數(Shared varialbes),提升叢集環境中的 Spark 程式執行效率。

  • 廣播變數:Broadcast Variables,Spark 向 Slave Nodes 進行廣播,節點上的 RDD 操作可以快速訪問 Broadcast Variables 值,而每臺機器節點上快取只讀變數而不需要為各個任務傳送該變數的拷貝;
  • 累加變數:Accumulators,只有在使用相關操作時才會新增累加器(支援一個只能做加法的變數,如計數器和求和),可以很好地支援並行;

Spark Streaming

構建在 Spark 上的流資料處理框架元件,基於微批量的方式計算和處理實時的流資料,高效、高吞吐量、可容錯、可擴充套件。

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams,

which makes it easy to build scalable fault-tolerant streaming applications.

  • Ease of use:Build applications through high-level operators.
  • Fault Tolerance:Stateful exactly-once semantics out of the box.
  • Spark Integration:Combine streaming with batch and interactive queries.

基本原理是將輸入資料流以時間片(秒級)為單位進行拆分成 micro batches,將 Spark 批處理程式設計模型應用到流用例中,然後以類似批處理的方式處理時間片資料。

圖中的 Spark Engine 批處理引擎是 Spark Core。

Spark Streaming 提供一個高層次的抽象叫做離散流(Discretized Stream,DStream),代表持續的資料流(即一系列持續的RDDs)。DStream 中的每個 RDD 都是按一小段時間(Interval)分割開來的資料集,對 DStream 的任何操作都會轉化成對底層 RDDs 的操作(將 Spark Streaming 中對 DStream 的操作變為針對 Spark 中 RDD 的操作)。

1

2

3

4

5

6

7

sc.foreachRDD { rdd =>

    rdd.foreachPartition { partition =>

        partition.foreach ( record =>

            send(record)

        )

  }

}

Spark 的 StreamingContext 設定完畢後,啟動執行:

1

2

sc.start()  // 啟動計算

sc.awaitTermination()  // 等待計算完成

具體參考:Spark Streaming初探

此外,Spark Streaming 還支援視窗操作,具體地:

實際應用場景中,企業常用於從Kafka中接收資料做實時統計。

Spark SQL

Spark SQL 的前身是 Shark(Hive on Spark)。

結構化資料處理和查詢、提供互動式分析,以 DataFrame(原名 SchemaRDD)形式。DataFrame 是一種以RDD為基礎的分散式資料集,是帶有 schema 元資訊的RDD,即 DataFrame 所表示的二維表資料集的每一列都帶有名稱和型別。

Spark 容錯機制

分散式資料集的容錯性通過兩種方式實現:設定資料檢查點(Checkpoint Data) 和 記錄資料的更新(Logging the Updates)。

Spark容錯機制通過 Lineage(主) - CheckPoint(輔) 實現

  • Lineage:粗粒度的記錄更新操作
  • Checkpoint:通過冗餘資料快取資料

RDD會維護建立RDDs的一系列轉換記錄的相關資訊,即:Lineage(RDD的血緣關係),這是Spark高效容錯機制的基礎,用於恢復出錯或丟失的分割槽。

RDD 之於 分割槽,檔案 之於 檔案塊

若依賴關係鏈 Lineage 過長時,使用 Checkpoint 檢查點機制,切斷血緣關係、將資料持久化,避免容錯成本過高。

Spark 排程機制

Spark 應用提交後經過一系列的轉換,最後成為 Task 在每個節點上執行。相關概念理解:

  • Client:客戶端(Driver端)程序,負責提交作業到Master。
  • Master:主控節點,負責接收Client提交的作業,管理Worker,並命令Worker啟動分配Driver的資源和啟動Executor的資源
  • Worker:叢集中任何可以執行Application程式碼的節點,也可以看作是Slaver節點上的守護程序,負責管理本節點的資源,定期向Master彙報心跳,接收Master的命令,啟動Driver和Executor,是Master和Executor之間的橋樑
  • Driver:使用者側邏輯處理,執行main()函式並建立SparkContext(準備Spark應用程式執行環境、負責與ClusterManager通訊進行資源申請、任務分配和監控
  • Executor:Slaver節點上的後臺執行程序,即真正執行作業的地方,並將將資料儲存到記憶體或者磁碟。一個叢集一般包含多個Executor,每個Executor接收Driver的命令Launch Task,一個Executor可以執行一到多個Task(每個Executor擁有一定數量的"slots",可以執行指派給它的Task)
  • Task:執行在Executor上的工作單元,每個Task佔用父Executor的一個slot (core)
  • Cluster Manager:在叢集上獲取資源的外部服務,目前
    • Standalone:Spark原生的資源管理,由 Master 負責資源分配
    • Hadoop Yarn:由Yarn中的 ResourceManager 負責資源分配

Spark執行的基本流程如下圖:

 

一個Spark作業執行時包括一個Driver程序,也是作業的主程序,負責作業的解析、生成Stage並排程Task到Executor上。包括:

  • DAGScheduler:實現將Spark作業分解成一到多個Stage,每個Stage根據RDD的Partition個數決定Task的個數,然後生成相應的TaskSet放到TaskScheduler中
  • TaskScheduler:維護所有的TaskSet,實現Task分配到Executor上執行並維護Task的執行狀態

每一個 Spark 應用程式,都是由一個驅動程式組成,執行使用者的 Main 函式,並且在一個叢集上執行各種各樣的並行操作:

所有的 Spark 應用程式都離不開 SparkContext 和 Executor 兩部分,Executor 負責具體執行任務,執行 Executor 的機器稱為 Worker 節點,SparkContext 由使用者程式啟動,通過資源排程模組和 Executor 通訊。SparkContext 和 Executor 這兩部分的核心程式碼實現在各種執行模式中都是公用的,在它們之上,根據執行部署模式的不同,包裝了不同調度模組以及相關的適配程式碼。具體來說,以 SparkContext 為程式執行的總入口,在 SparkContext 的初始化過程中,Spark 會分別建立 DAGScheduler(作業排程)和 TaskScheduler(任務排程)兩個排程模組。其中,作業排程模組是基於任務階段的高層排程模組,它為每個 Spark 作業計算具有依賴關係的多個排程階段 (通常根據 Shuffle 來劃分),然後為每個階段構建出一組具體的任務 (通常會考慮資料的本地性等),然後以 TaskSets(任務組) 的形式提交給任務排程模組來具體執行。而任務排程模組則負責具體啟動任務、監控和彙報任務執行情況。具體地:

關於 Spark 的執行架構和機制,參見:http://www.cnblogs.com/shishanyuan/p/4721326.html

Spark 環境搭建

注意,Spark和Scala的版本相容問題,Spark 1.x.x 匹配 Scala 2.10.x 及以下,Spark 2.x.x 匹配 Scala 2.11.x 及以上。官網解釋如下:

Starting version 2.0, Spark is built with Scala 2.11 by default. Scala 2.10 users should download the Spark source package and build with Scala 2.10 support.

推薦使用 Spark 2。若本機安裝的是 Scala 2.10,需要 Building for Scala 2.10

參考

 


完美的大資料場景:讓Hadoop和Spark在同一個團隊裡面協同執行。

  • Hadoop偏重資料儲存 (檔案管理系統,HDFS離線資料儲存),但有自己的資料處理工具MapReduce。
  • Spark偏重資料處理,但需依賴分散式檔案系統整合運作。

雖然Hadoop提供了MapReduce的資料處理功能,但是Spark的基於Map Reduce演算法實現的分散式計算(記憶體版的MapReduce)的資料處理速度秒殺MapReduce,通用性更好、迭代運算效率更高、容錯能力更強。我們應該將Spark看作是Hadoop MapReduce的一個替代品而不是Hadoop的替代品,其意圖並非是替代Hadoop,而是為了提供一個管理不同的大資料用例和需求的全面且統一的解決方案。

To Bottom

Storm

Storm是一個開源的分散式實時計算系統,最流行的流計算平臺。

關於官網對 Storm 介紹:

Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use! 

  • fast:a benchmark clocked it at over a million tuples processed per second per node.
  • scalablefault-tolerant, guarantees your data will be processed, and is easy to set up and operate.
  • realtime analytics, online machine learning, continuous computation, distributed RPC, ETL

參考

其他的相關概念

HBase:面向列、可伸縮的高可靠性、高效能分散式儲存系統,構建大規模結構化資料叢集

Hive:由 Facebook 主導的基於 Hadoop 的大資料倉庫工具,可以將結構化的資料檔案對映為一張資料庫表,並提供完整的sql查詢功能,可以將sql語句轉換為MapReduce任務進行執行

Zookeeper:由 Google 主導的開源分散式應用程式協調服務

Mesos:分散式環境資源管理平臺

Tez:由 Hortonworks 主導的優化 MapReduce 執行引擎,效能更高

Yarn:元件排程系統

BlinkD:在海量資料上執行互動式 SQL 查詢的大規模並行查詢引擎

Kafka:實時、容錯、可擴充套件的分散式釋出-訂閱訊息系統,用於實時移動資料,詳情參見:Kafka - sqh

本文轉自:http://www.cnblogs.com/wjcx-sqh/p/6036832.html