1. 程式人生 > >Spark學習之Spark核心

Spark學習之Spark核心

一、Spark中的一些專業術語

任務:

  • Application:使用者寫的應用程式,包括Driver Program和Executor Program。
  • Job:一個action類運算元觸發執行的操作。
  • stage:一組任務(task)就是一個stage。
  • task:(thread)在叢集中執行時最小的執行單元。

資源、叢集:

  • Master:資源管理的主節點。
  • Worker:資源管理的從節點。
  • Executor:執行任務的程序。
  • ThreadPool:執行緒池,存在於Executor程序中。

二、RDD的寬窄依賴關係

1、窄依賴

父RDD與子RDD,partition之間的關係是一對一那麼父子RDD的依賴關係就稱之為窄依賴這種依賴關係不存在shuffle過程。

窄依賴

2、寬依賴

父RDD與子RDD,partition之間的關係是一對多那麼父子RDD的依賴關係就稱之為寬依賴這種依賴關係存在shuffle過程。
寬依賴
預設情況下,groupByKey返回的RDD分割槽數與父RDD是一致的,如果在使用groupByKey的時候,傳入一個int型別的值,此時返回的RDD分割槽數就是這個int值。
總結:

  • 父RDD不知道有幾個子RDD,但子RDD知道他的父RDD有幾個。基於此特點,形成一個DAG有向無環圖需要
    從後往前回溯。
  • 寬窄依賴的作用就是為了將一個個的job切割成一個個的stage

三、Stage切割規則

Stage切割規則
上圖是將job切割成Stage的過程。
總結:

  • 切割後的結果是stage與stage之間是寬依賴,stage之間是窄依賴。
  • 將job切割成stage的目的?stage與stage之間有shuffle,stage內部無shuffle。
  • RDD中實際上儲存的是計算邏輯,而不是真實的資料。

Stage計算模式:

Stage計算模式
task0這條線所貫穿所有的的partition中的計算邏輯,並且以遞迴函式展開式的形式整合在一起,fun2(fun1(textFile(b1))),最好傳送到b1以及他副本所在的節點。


task1:fun2(fun1(textFile(b1)))最好傳送到b2以及他副本所在的節點。
task的計算模式是pipeline的計算模式管道計算
總結:
MapReduce的計算模式是1+1=2,2+1=3,會有資料落地,Spark的計算模式是1+1+1=3,不會有資料落地的情況。

四、任務排程

Spark是一個分散式平行計算框架。我們寫的Application要在叢集中分散式計算,由於大資料中的計算原則是計算找資料,為了將每一個task精準的分發到節點上,此時需要任務排程器找到資料的位置,從而分發task到節點上。
任務排程

任務排程過程:

首先根據程式碼生成DAG有向無環圖,然後將有向無環圖交給DAGScheduler:

  • 步驟一:根據RDD的寬窄依賴關係,將DAG切割成一個個的stage,將切割出來的stage封裝到TaskSet物件中,然後將一個個的TaskSet給TaskScheduler;
  • 步驟二:TaskScheduler拿到TaskSet以後,會遍歷這個結果,拿到每一個task,然後去呼叫HDFS上的某一個方法,獲取資料的位置,根據資料的位置來分發task到Woker節點的Executor程序中的執行緒池中執行;
  • 步驟三:TaskScheduler會實時跟蹤每一個task的執行情況,若執行失敗,TaskScheduler會重試提交task,不會無休止的重試,預設是重試3次,如果重試3次依舊失敗,那麼這個task所在stage就失敗了,此時TS向DAGScheduler彙報;
  • 步驟四:TaskScheduler向DAGScheduler彙報當前stage失敗,此時DAGScheduler會重試提交stage。注意:每一次重試提交的stage,已經成功執行的不會被再次分發到Executor程序執行,只是提交重試失敗的。
  • 如果DAGScheduler重試了4次依然失敗,那麼stage所在的job就失敗了,job失敗是不會進行重試的。DAGScheduler重試次數spark.stage.maxConsecutiveAttempts可設定。

TaskScheduler:retry failed or straggling tasks。當有掙扎(掉隊)任務時,也會重試。
掙扎(掉隊)任務:
比如10000個task中,有9999個執行完成,只有一個task正在執行,那麼這個任務就叫掙扎任務。
TaskScheduler遇到掙扎任務也會重試,此時TaskScheduler會重新提交一個和掙扎task一模一樣的task到叢集中執行但是掙扎task不會被kill會讓他倆在叢集中比賽執行,誰先執行完畢,就以誰的結果為準。

推測執行機制:用來判斷哪些task是掙扎task。

推測執行機制的標準:當所有的task75%以上全部執行完畢,那麼TasredkScheduler才會每隔100ms計算一下哪一些task需要推測執行。
例如:100個task中有76個task已執行完畢,24個task沒有執行完畢,此時他會計算這24個task已經執行時間的中位數,然後將中位數*1.5得到最終時間,拿到這個最終計算出來的時間,去檢視哪一些task超時,此時這些task就是掙扎task。

配置資訊的使用:

  1. 在程式碼中SparkConf
  2. 在提交Application的時候通過–conf來設定(spark-submit
    –master --conf k=v),如果要修改多個配置資訊的值,那麼需要加多個–conf比如說stage、task的重試次數都要修改,此時需要加上兩個–conf,分別來設定。(常用)
  3. 在spark的配置檔案中配置,spark-default.conf