Spark-1.6.0之Application執行資訊記錄器JobProgressListener
JobProgressListener類是Spark的ListenerBus中一個很重要的監聽器,可以用於記錄Spark任務的Job和Stage等資訊,比如在Spark UI頁面上Job和Stage執行狀況以及執行進度的顯示等資料,就是從JobProgressListener中獲得的。另外,SparkStatusTracker也會從JobProgressListener中獲取Spark執行資訊。外部應用也可以通過Spark提供的status相關API比如AllJobResource, AllStagesResource, OneJobResource, OneStageResource獲取到Spark程式的執行資訊。
JobProgressListener類的繼承關係,以及該類中重要的屬性和方法,見下圖
在Spark-1.6.0中,JobProgressListener物件生成後,會被add到一個LiveListenerBus型別的ListenerBus中。LiveListenerBus類的基礎關係,以及該類中重要方法和屬性見下圖
文章接下來分析在一個Spark Application中JobProgressListener的生命週期,以及其資料接收和傳遞的過程。
一、JobProgressListener生成和資料寫入
1、JobProgressListener生成
在原始碼中,JobProgressListener在SparkContext物件建立時就生成了,
private[spark] val listenerBus = new LiveListenerBus //listenerBus
private var _jobProgressListener: JobProgressListener = _ //定義
...
_jobProgressListener = new JobProgressListener(_conf) //生成
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener //使用
listenerBus.addListener(jobProgressListener) //使用
從上面的程式碼中看到,jobProgressListener在生成後,spark將其存入了LiveListenerBus物件中,其他任何接收到listenerBus的地方都能從中獲取到這個jobProgressListener物件。另外在建立SparkUI物件時,使用到了_jobProgressListener
物件,使得Spark UI頁面能夠從該物件中獲取Spark應用程式的執行時資料。或者也可以像SparkStatusTracker物件那樣,直接從SparkContext物件中獲取jobProgressListener。
最後,在SparkContext中呼叫setupAndStartListenerBus()
方法,啟動和初始化listenerBus。我們可以看到,在該方法中最後呼叫了listenerBus.start(this)
方法真正啟動listenerBus。
2、JobProgressListener接收事件
(1)事件進入LiveListenerBus
LiveListenerBus繼承自AsynchronousListenerBus,可以看到這裡是多執行緒的方式。裡面維持了一個大小為10000的eventQueue,LinkedBlockingDeque型別。這個可以和DAGScheduler中提到的EventLoop類中的eventQueue對比分析。
eventQueue接收事件呼叫的是post方法,這裡呼叫的是LinkedBlockingDeque.offer
方法,而EventLoop中呼叫的是LinkedBlockingDeque.put
,可以比較這兩者的區別。
def post(event: E) {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
val eventAdded = eventQueue.offer(event) // 向eventQueue提交event
if (eventAdded) {
eventLock.release() // 如果提交成功則釋放鎖
} else {
onDropEvent(event) // 否則丟棄該事件
}
}
所以說,各類事件都是呼叫AsynchronousListenerBus.post
方法傳入eventQueue中的。比如,在DAGScheduler類中,可以看到總共有14個呼叫的地方,下面列舉出其中12個不同的。
DAGScheduler方法 | SparkListenerEvent事件 | 描述 |
---|---|---|
executorHeartbeatReceived | SparkListenerExecutorMetricsUpdate | executor向master傳送心跳錶示BlockManager仍然存活 |
handleBeginEvent | SparkListenerTaskStart | task開始執行事件 |
cleanUpAfterSchedulerStop | SparkListenerJobEnd | Job結束事件 |
handleGetTaskResult | SparkListenerTaskGettingResult | task獲取結果事件 |
handleJobSubmitted | SparkListenerJobStart | Job開始事件 |
handleMapStageSubmitted | SparkListenerJobStart | Job開始事件 |
submitMissingTasks | SparkListenerStageSubmitted | Stage提交事件 |
handleTaskCompletion | SparkListenerTaskEnd | Task結束事件 |
handleTaskCompletion | SparkListenerJobEnd | Job結束事件 |
markStageAsFinished | SparkListenerStageCompleted | Stage結束事件 |
failJobAndIndependentStages | SparkListenerJobEnd | Job結束事件 |
markMapStageJobAsFinished | SparkListenerJobEnd | Job結束事件 |
分析到這裡,各種SparkListenerEvent事件傳遞到了eventQueue中,那麼如何進一步傳遞到JobProgessListener中呢?接下來JobProgressListener作為消費者,從eventQueue中消費這些SparkListenerEvent。
(2)事件進入到JobProgressListener
從SparkContext中啟動LiveListenerBus執行緒開始,LiveListenerBus繼承自AsynchronousListenerBus的run方法便一直在多執行緒執行。在run中有一段主要邏輯,
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)
從eventQueue取出事件後,呼叫LiveListenerBus的postToAll方法,將事件分發到各Listener中。
具體看一下LiveListenerBus的postToAll方法,這個方法從ListenerBus繼承。
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// 維持一個Array來儲存add到該bus中的所有listener
private[spark] val listeners = new CopyOnWriteArrayList[L]
/**
* 呼叫addListener方法會把傳入的listener物件存入listeners中
*/
final def addListener(listener: L) {
listeners.add(listener)
}
/**
* spark通過呼叫這個方法,spark的各種事件都會觸發listenerBus中所有listener對該事件作出響應
*/
final def postToAll(event: E): Unit = {
val iter = listeners.iterator
while (iter.hasNext) {
val listener = iter.next()
try {
/**
* onPostEvent方法在SparkListenerBus類中具體實現,針對不同的事件採取不同的方法
* 比如stageSubmitted, stageCompleted, jobStart, jobEnd, taskStart,
* applicationStart, blockManagerAdded,executorAdded等事件
* 分別呼叫SparkListener中不同方法進行處理
*/
onPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}
}
2、JobProgressListener對各種事件的響應
那麼接下來,從JobProgressListener對各種事件的響應方法出發,對其狀態變更邏輯作一個簡要梳理,很多方法從其命名上就能看出其主要功能,有需要的可以進入具體方法中做進一步的研究。JobProgressListener能做出響應的所有SparkListenerEvent事件,基本上都列在前面的表格中了。各類事件基本上都是從DAGScheduler中傳入的,可以參考Spark Scheduler模組原始碼分析之DAGScheduler
(1)Job級別資訊
這裡主要涉及到Job開始和結束的兩個方法
- onJobStart(SparkListenerJobStart)
在Job開始時,獲取job的一些基本資訊,比如引數spark.jobGroup.id
確定的JobGroup。然後生成一個JobUIData物件,用於在Spark UI頁面上顯示Job的ID,提交時間,執行狀態,這個Job包含的Stage個數,完成、跳過、失敗的Stage個數。以及總的Task個數,以及完成、失敗、跳過、正在執行的Task個數。該Job中包含的所有Stage都存入pendingStages中。 - onJobEnd(SparkListenerJobEnd)
在Job完成時,根據該Job的最終狀態是成功還是失敗,分別把該job的相關資訊存入completedJob物件和failedJobs物件中,同時把成功或者失敗的job數加一。然後迴圈處理該Job的每一個Stage,將該Stage對應的當前Job移除,如果移除後發現該Stage再沒有其他Job使用了,就把該Stage從activeStage列表中移除。接下來,如果這個Stage的提交時間為空,則表示該Stage被跳過執行,更新一下skipped的Stage個數,以及skipped的Task個數。(成功和失敗的Stage的邏輯在下面一小節中)
(2)Stage級別資訊
有關Stage的狀態變更處理邏輯,這裡也有Stage的submit和complete方法
- onStageSubmitted(SparkListenerStageSubmitted)
在Stage提交後,將該Stage存入activeStages中,並且從pendingStages中移除該Stage。首先獲得當前的排程池名稱,如果是FIFO模式,則是default(實際上不起任何作用),然後根據該排程池,將這個Stage放入排程池中。然後把所屬job的numActiveStages加一, onStageCompleted(SparkListenerStageCompleted)
在Stage完成後,從排程池中將該Stage移除,同時也從activeStages中移除。根據該Stage是成功還是失敗,繼續更新completedStages或failedStages,並更新這類Stage的統計數。然後把對應Job中activeStages值減一,如果這個Stage是成功的(判斷依據是failureReason為空),則把對應job的成功Stage數加一,否則把對應Job的失敗Stage數加一。
(3)Task級別資訊
有關Task的方法有task開始,結束兩個方法onTaskStart(SparkListenerTaskStart)
當一個Task開始執行時,會把對應Stage中active狀態的Task計數加一,並且把這個Task相關的資訊記入對應Stage中,同時更新該Task所屬Job中Active狀態Task的個數。- onTaskEnd(SparkListenerTaskEnd)
當一個Task執行完成時,獲取該Task對應Stage的executorSummary資訊,這個executorSummary中記錄了每個Executor對應的ExecutorSummary資訊,其中包括task開始時間,失敗task個數,成功task個數,輸入輸出位元組數,shuffle read/write位元組數等。然後根據這個Task所屬的executorId,找到當前Task的執行統計資訊execSummary。如果這個Task執行成功,就將成功task個數加一,否則就將失敗task個數加一。然後根據Task執行狀態,更新對應Stage中失敗或成功Task個數。進一步,更新對應Job中失敗或成功的Task個數。
二、SparkUI頁面從JobProgressListener讀取資料
JobProgressListener主要用在向Spark UI頁面傳遞資料上。