1. 程式人生 > >spark 任務執行原理

spark 任務執行原理

調優概述

在開發完Spark作業之後,就該為作業配置合適的資源了。Spark的資源引數,基本都可以在spark-submit命令中作為引數設定。很多Spark初學者,通常不知道該設定哪些必要的引數,以及如何設定這些引數,最後就只能胡亂設定,甚至壓根兒不設定。資源引數設定的不合理,可能會導致沒有充分利用叢集資源,作業執行會極其緩慢;或者設定的資源過大,佇列沒有足夠的資源來提供,進而導致各種異常。總之,無論是哪種情況,都會導致Spark作業的執行效率低下,甚至根本無法執行。因此我們必須對Spark作業的資源使用原理有一個清晰的認識,並知道在Spark作業執行過程中,有哪些資源引數是可以設定的,以及如何設定合適的引數值。

Spark作業基本執行原理

Spark基本執行原理

詳細原理見上圖。我們使用spark-submit提交一個Spark作業之後,這個作業就會啟動一個對應的Driver程序。根據你使用的部署模式(deploy-mode)不同,Driver程序可能在本地啟動,也可能在叢集中某個工作節點上啟動。Driver程序本身會根據我們設定的引數,佔有一定數量的記憶體和CPU core。而Driver程序要做的第一件事情,就是向叢集管理器(可以是Spark Standalone叢集,也可以是其他的資源管理叢集,美團•大眾點評使用的是YARN作為資源管理叢集)申請執行Spark作業需要使用的資源,這裡的資源指的就是Executor程序。YARN叢集管理器會根據我們為Spark作業設定的資源引數,在各個工作節點上,啟動一定數量的Executor程序,每個Executor程序都佔有一定數量的記憶體和CPU core。

在申請到了作業執行所需的資源之後,Driver程序就會開始排程和執行我們編寫的作業程式碼了。Driver程序會將我們編寫的Spark作業程式碼分拆為多個stage,每個stage執行一部分程式碼片段,併為每個stage建立一批task,然後將這些task分配到各個Executor程序中執行。task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個程式碼片段),只是每個task處理的資料不同而已。一個stage的所有task都執行完畢之後,會在各個節點本地的磁碟檔案中寫入計算中間結果,然後Driver就會排程執行下一個stage。下一個stage的task的輸入資料就是上一個stage輸出的中間結果。如此迴圈往復,直到將我們自己編寫的程式碼邏輯全部執行完,並且計算完所有的資料,得到我們想要的結果為止。

Spark是根據shuffle類運算元來進行stage的劃分。如果我們的程式碼中執行了某個shuffle類運算元(比如reduceByKey、join等),那麼就會在該運算元處,劃分出一個stage界限來。可以大致理解為,shuffle運算元執行之前的程式碼會被劃分為一個stage,shuffle運算元執行以及之後的程式碼會被劃分為下一個stage。因此一個stage剛開始執行的時候,它的每個task可能都會從上一個stage的task所在的節點,去通過網路傳輸拉取需要自己處理的所有key,然後對拉取到的所有相同的key使用我們自己編寫的運算元函式執行聚合操作(比如reduceByKey()運算元接收的函式)。這個過程就是shuffle。

當我們在程式碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個task計算出來的資料也會儲存到Executor程序的記憶體或者所在節點的磁碟檔案中。

因此Executor的記憶體主要分為三塊:第一塊是讓task執行我們自己編寫的程式碼時使用,預設是佔Executor總記憶體的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出後,進行聚合等操作時使用,預設也是佔Executor總記憶體的20%;第三塊是讓RDD持久化時使用,預設佔Executor總記憶體的60%。

task的執行速度是跟每個Executor程序的CPU core數量有直接關係的。一個CPU core同一時間只能執行一個執行緒。而每個Executor程序上分配到的多個task,都是以每個task一條執行緒的方式,多執行緒併發執行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那麼通常來說,可以比較快速和高效地執行完這些task執行緒。

以上就是Spark作業的基本執行原理的說明,大家可以結合上圖來理解。理解作業基本原理,是我們進行資源引數調優的基本前提。