1. 程式人生 > >Spark(四) -- Spark工作機制

Spark(四) -- Spark工作機制

一、應用執行機制

一個應用的生命週期即,使用者提交自定義的作業之後,Spark框架進行處理的一系列過程。

在這個過程中,不同的時間段裡,應用會被拆分為不同的形態來執行。

1、應用執行過程中的基本元件和形態

Driver:
執行在客戶端或者叢集中,執行Application的main方法並建立SparkContext,調控整個應用的執行。

Application:
使用者自定義並提交的Spark程式。

Job:
一個Application可以包含多個Job,每個Job由Action操作觸發。

Stage:
比Job更小的單位,一個Job會根據RDD之間的依賴關係被劃分為多個Stage,每個Stage中只存有RDD之間的窄依賴,即Transformation運算元。

TaskSet:
每個Stage中包含的一組相同的Task。

Task:
最後被分發到Executor中執行的具體任務,執行Stage中包含的運算元。

明確了一個應用的生命週期中會有哪些元件參與之後,再來看看使用者是怎麼提交Spark程式的。

2、應用的兩種提交方式

Driver程序執行在客戶端(Client模式):

即使用者在客戶端直接執行程式。
程式的提交過程大致會經過以下階段:

  1. 使用者執行程式。
  2. 啟動Driver進行(包括DriverRunner和SchedulerBackend),並向叢集的Master註冊。
  3. Driver在客戶端初始化DAGScheduler等元件。
  4. Woker節點向Master節點註冊並啟動Executor(包括ExecutorRunner和ExecutorBackend)。
  5. ExecutorBackend啟動後,向Driver內部的SchedulerBackend註冊,使得Driver可以找到計算節點。
  6. Driver中的DAGScheduler解析RDD生成Stage等操作。
  7. Driver將Task分配到各個Executor中並行執行。

Driver程序執行在叢集中(某個Worker節點,Cluster模式):

即使用者將Spark程式提交給Master分配執行。
大致會經過一下流程:

  1. 使用者啟動客戶端,提交Spark程式給Master。
  2. Master針對每個應用分發給指定的Worker啟動Driver進行。
  3. Worker收到命令之後啟動Driver程序(即DriverRunner和其中的SchedulerBackend),並向Master註冊。
  4. Master指定其他Worker啟動Executor(即ExecutorRunner和其內部的ExecutorBackend)。
  5. ExecutorBackend向Driver中的SchedulerBackend註冊。
  6. Driver中的DAGScheduler解析RDD生產Stage等。
  7. Executor內部啟動執行緒池並行化執行Task。

可以看到,兩種程式的提交方式在處理過程中,僅僅是在哪個地方啟動Driver程序的區別而已。
為Client模式中時(使用Spark Shell直接執行的程式),Driver就在客戶端上。
為Cluster模式時(提交Spark程式到Master),Driver執行與叢集中的某個Worker節點。

二、排程與任務分配模組

Spark框架就像一個作業系統一樣,有著自己的作業排程策略,當叢集執行在不同的模式下,排程不同級別的單位,使用的策略也是有所不同的。

1、Application之間的排程

當有多個使用者提交多個Spark程式時,Spark是如何排程這些應用併合理地分配資源呢?

Standalone模式下,預設使用FIFO,每個app會獨佔所有資源

可以通過以下幾個引數調整叢集相關的資源:

  • spark.cores.max:調整app可以在整個叢集中申請的CPU core數量
  • spark.deploy.defaultCores:預設的CPU core數量
  • spark.executor.memory:限制每個Executor可用的記憶體

在Mesos模式下,可以使用

  • spark.mesos.coarse=true設定靜態配置資源的策略
  • 使用mesos://URL且不配置spark.mesos.coarse=true(每個app會有獨立固定的記憶體分配,空閒時其他機器可以使用其資源)

在Yarn模式下,提交作業時可以使用

  • 通過–num-executors控制分配多少個Executor給app
  • –executor-memory和–executor-cores分別控制Executor的記憶體和CPU core

2、Application內部的Job排程機制

一個Application中,由各個Action觸發的多個Job之間也是存在排程關係的。

Action操作實現上是呼叫了SparkContext的runJob方法提交Job。

Spark中排程Job有兩種策略

FIFO:

  • 第一個Job分配其所需的所有資源
  • 第二個Job如果還有剩餘資源的話就分配,否則等待

FAIR:

  • 使用輪詢的方式排程Job

可以通過配置spark.scheduler.mode調整Job的排程方式

另外也可以配置排程池,具體參考官方文件
或者參考conf/fairscheduler.xml.template檔案。

3、Job中的Stage排程

Stage是由DAGScheduler元件生產的,在原始碼中,有三個比較特殊的變數:

  • waitingStages:儲存等待執行的Stages
  • runningStages:儲存正在執行的Stages
  • failedStages:儲存執行失敗的Stage

Spark會通過廣度優先遍歷找到最開始的Stage執行,若有父Stage沒有執行完則等待。

4、Stage中的Task排程

暫未了解。。。

三、I/O制度

Spark雖然是基於記憶體計算的框架,但是不可避免的也會接觸到一些儲存層,那麼在和儲存層互動的時候,Spark做了哪些工作?

1、序列化

序列化的本質就是將物件轉換為位元組流,可以理解為將連結串列中儲存的非連續空間的資料儲存轉化為連續空間儲存的陣列中

Spark為什麼要做序列化操作?

記憶體或磁碟中RDD會含有物件的儲存,而在節點間資料的傳輸時,序列化之後的資料可以節約空間和提高效率。

2、壓縮

壓縮是日常生活中的一個常見操作,好處顯而易見,節約空間,從而就可以獲得時間上的效率。

Spark中序列化之後的資料可以進行壓縮以減少空間開銷。
Spark支援兩種壓縮演算法

  • Snappy演算法:高壓縮速度
  • LZF演算法:高壓縮比

在不同的場景中選擇不同的壓縮演算法可以有效的提高程式執行的效率。

壓縮配置方式:

  1. 啟動前在spark-env.sh中設定:export SPARK_JAVA_OPTS=”-Dspark.broadcast.compress”
  2. 在應用程式中配置
    conf.getBoolean(“spark.broadcast.compress,true”)
    conf.set(“spark.broadcast.compress”,true)

3、塊管理

RDD從物理上看是一個元資料結構,記錄著Block和Node之間的對映關係。

儲存RDD是以Block塊為單位的,每個分割槽對應一個塊,PartitionID通過元資料資訊可以對映到Block。

BlockManager管理和介面、塊讀寫流程、資料塊讀寫管理等細節待繼續深入瞭解。

四、通訊模組

Spark中使用Akka作為通訊框架

  • Actors是一組包含狀態和行為的物件
  • 一個Actor接收到其他Actor的資訊之後可以根據需求做出各種反應
  • Client、Master、Worker等都是一個Actor

Spark各個元件的之間協調工作都是基於Akka機制來的,待深入瞭解的有:

  • Client Actor通訊程式碼邏輯
  • Master Actor通訊程式碼邏輯
  • Worker Actor訊息處理邏輯

五、容錯機制

之前講過,RDD之間的運算元操作會形成DAG圖,RDD之間的依賴關係會形成Lineage。

要理解Lineage機制首先要明確兩種依賴的概念:

  • Shuffle Dependencies(寬依賴)
    父分割槽可以被多個子分割槽所用
    即多對多的關係

  • Narrow Dependencies(窄依賴)
    父分割槽最多被一個子分割槽所用
    即一對一或者多對一的關係

當出現某個節點計算錯誤的時候,會順著RDD的操作順序往回走

一旦是Narrow Dependencies錯誤,重新計算父RDD分割槽即可,因為其不依賴其他節點

而如果Shuffle Dependencies錯誤,重算代價較高,因為一旦重新計算其依賴的父RDD分割槽,會造成冗餘計算

這時候就需要人為的新增檢查點來提高容錯機制的執行效率

什麼情況下需要加CheckPoint

  • DAG中的Lineage過長,如果重算開銷太大,故在特定幾個Shuffle Dependencies上做CheckPoint是有價值的。
  • Checkpoint會產生磁碟開銷,因為其就是將資料持久化到磁碟中,所以做檢查點的RDD最好是已經在記憶體中快取了。

六、Shuffle機制

Shuffle的定義:對無規則的資料進行重組排序等過程

為什麼要Shuffle:分散式計算中資料是分佈在各個節點上計算的,而彙總統計等操作需要在所有資料上執行

Spark中Shuffle經歷的階段:

Shuffle Write
    將各個節點資料寫入到指定分割槽
        1、根據下一個Stage分割槽數分成相應的Bucket
        2、將Bucket寫入磁碟
Shuffle Fetch
    獲取各個分割槽傳送的資料
        1、在儲存有Shuffle資料節點的磁碟Fetch需要的資料
        2、Fetch到本地之後進行自定義的聚集函式操作

最後記錄一下提交Spark作業的方法
在spark的bin目錄下
執行spark-submit指令碼
./spark-submit \
–class 入口函式所在的類名全稱 \
–master spark master節點的地址(預設埠7077)\
–executor-memory 指定worker中Executor的記憶體 \
–total-executor-cores 100 \
jar檔案所在的目錄 \

相關推薦

Spark -- Spark工作機制

一、應用執行機制 一個應用的生命週期即,使用者提交自定義的作業之後,Spark框架進行處理的一系列過程。 在這個過程中,不同的時間段裡,應用會被拆分為不同的形態來執行。 1、應用執行過程中的基本元件和形態 Driver: 執行在客戶端或者叢集中,執行A

SparkSpark的廣播變量和累加器

style reac color add each top cast 廣播 ive 一、概述 在spark程序中,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制

SparkSpark 鍵值對操作

目錄: 4、鍵值對操作 4.1、建立PairRDD 4.2、PairRDD的轉化操作 4.2.1、聚合操作 4.2.2、資料分組 4.2.3、連線 4.2.4、資料排序 4.3、PairRDD的行動操作 4.4、資料分割槽 4.4.1、獲取RDD的分割槽方式

Spark學習之路 Spark的廣播變量和累加器

img 還原 變量定義 如果 style 調優 學習之路 park 系統 一、概述 在spark程序中,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制到每臺機器

大資料之Spark--- Dependency依賴,啟動模式,shuffle,RDD持久化,變數傳遞,共享變數,分散式計算PI的值

一、Dependency:依賴:RDD分割槽之間的依存關係 --------------------------------------------------------- 1.NarrowDependency: 子RDD的每個分割槽依賴於父RDD的少量分割槽。 |

Spark (Python版) 零基礎學習筆記—— Spark概覽

結合了《Learning Spark: Lightning-Fast Big Data Analysis》和官方2.02版本的Spark Document總結了關於Spark概念性的一些知識。幫助大家對Spark有一個總體上的認知 一、Spark的兩個核心概念

Spring學習之旅Spring工作原理再探

容器 mxml 實現 span ssp express 16px 部分 做了 上篇博文對Spring的工作原理做了個大概的介紹,想看的同學請出門左轉。今天詳細說幾點。 (一)Spring IoC容器及其實例化與使用 Spring IoC容器負責Bean的實例化、配置和組裝工

Android SurfaceFlinger服務 ----- 消息機制MessageQueue

events on() inpu str lB 取消 onf CA andro SurfaceFlinger有著自己的消息隊列MessageQueue,用來處理顯示相關的消息,比如Vsync消息。 相關文件: frameworks/native/services/surf

SparkSpark任務提交方式和執行流程

sla handles 解析 nod 就會 clust 它的 管理機 nag 一、Spark中的基本概念 (1)Application:表示你的應用程序 (2)Driver:表示main()函數,創建SparkContext。由SparkContext負責與Cluste

Git的學習與使用——Git 工作區、暫存區和版本庫

基本概念 我們先來理解下Git 工作區、暫存區和版本庫概念 工作區:就是你在電腦裡能看到的目錄。 暫存區:英文叫stage, 或index。一般存放在 ".git目錄下" 下的index檔案(.git/index)中,所以我們把暫存區有時也叫作索引(index)。 版本庫:工作區有一個隱

大資料之Spark--- Spark的SQL模組,Spark的JDBC實現,SparkSQL整合MySQL,SparkSQL整合Hive和Beeline

一、Spqrk的SQL模組 ---------------------------------------------------------- 1.該模組能在Spack上執行Sql語句 2.可以處理廣泛的資料來源 3.DataFrame --- RDD --- tabl

大資料之Spark--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析

一、Spark核心API ----------------------------------------------- [SparkContext] 連線到spark叢集,入口點. [HadoopRDD] extends RDD 讀取hadoop

大資料之Spark--- Spark簡介,模組,安裝,使用,一句話實現WorldCount,API,scala程式設計,提交作業到spark叢集,指令碼分析

一、Spark簡介 ---------------------------------------------------------- 1.快如閃電的叢集計算 2.大規模快速通用的計算引擎 3.速度: 比hadoop 100x,磁碟計算快10x 4.使用: java

大資料基礎之Spark1Spark Submit即Spark任務提交過程

Spark版本2.1.1 一 Spark Submit本地解析 1.1 現象 提交命令: spark-submit --master local[10] --driver-memory 30g --class app.package.AppClass app-1

大資料之Spark--- Spark機器學習,樸素貝葉斯,酒水評估和分類案例學習,垃圾郵件過濾學習案例,電商商品推薦,電影推薦學習案例

一、Saprk機器學習介紹 ------------------------------------------------------------------ 1.監督學習 a.有訓練資料集,符合規範的資料 b.根據資料集,產生一個推斷函式

大資料之Spark--- Spark Streaming介紹,DStream,Receiver,Streamin整合Kafka,Windows,容錯的實現

一、Spark Streaming介紹 ----------------------------------------------------------- 1.介紹 是spark core的擴充套件,針對實時資料的實時流處理技術 具有可擴充套件、高吞吐量、

大資料之Spark--- Spark閉包處理,部署模式和叢集模式,SparkOnYarn模式,高可用,Spark整合Hive訪問hbase類載入等異常解決,使用spark下的thriftserv

一、Spark閉包處理 ------------------------------------------------------------ RDD,resilient distributed dataset,彈性(容錯)分散式資料集。 分割槽列表,function,dep Op

springboot學習小筆記---springboot工作機制

Spring Boot工作機制簡介 一、@SpringBootApplication 我們新建一個springboot專案裡面會有一個SpringbootDay1203Application類 我們都知道@SpringBootApplication是Springboot專案的核心註

DDRddr2 工作時序與原理

1.4-bit Prefetch  直接上一個表,看看DDR2的三個頻率的關係,下圖是內部時鐘均為133MHz的DDR2/DDR/SDRAM的比較,由圖可以看到,相比於DDR,DDR2由於是4-bit Prefetch,外部時鐘是內部匯流排時鐘的2倍,而DDR和SDRAM中

大資料之Spark--- Spark閉包處理,Spark的應用的部署模式,Spark叢集的模式,啟動Spark On Yarn模式,Spark的高可用配置

一、Spark閉包處理 ------------------------------------------------------------ RDD,resilient distributed dataset,彈性(容錯)分散式資料集。 分割槽列表,fun