1. 程式人生 > >新的視覺化幫助更好地瞭解Spark Streaming應用程式

新的視覺化幫助更好地瞭解Spark Streaming應用程式

Spark UI中的Streaming標籤頁來顯示以下資訊:

  • 時間軸檢視和事件率統計,排程延遲統計以及以往的批處理時間統計
  • 每個批次中所有JOB的詳細資訊

此外,為了理解在Streaming操作上下文中job的執行情況,有向無環執行圖的視覺化( execution DAG visualization )增加了Streaming的資訊。

讓我們通過一個從頭到尾分析Streaming應用程式的例子詳細看一下上面這些新的功能。

處理趨勢的時間軸和直方圖

當我們除錯一個Spark Streaming應用程式的時候,我們更希望看到資料正在以什麼樣的速率被接收以及每個批次的處理時間是多少。Streaming標籤頁中新的UI能夠讓你很容易的看到目前的值和之前1000個批次的趨勢情況。當你在執行一個Streaming應用程式的時候,如果你去訪問Spark UI中的Streaming標籤頁,你將會看到類似下面圖一的一些東西(紅色的字母,例如[A],是我們的註釋,並不是UI的一部分)。

 

圖1:Spark UI中的Streaming標籤頁

第一行(標記為 [A])展示了Streaming應用程式當前的狀態;在這個例子中,應用已經以1秒的批處理間隔運行了將近40分鐘;在它下面是輸入速率(Input rate)的時間軸(標記為 [B]),顯示了Streaming應用從它所有的源頭以大約49 events每秒的速度接收資料。在這個例子中,時間軸顯示了在中間位置(標記為[C])平均速率有明顯的下降,在時間軸快結束的地方應用又恢復了。如果你想得到更多詳細的資訊,你可以點選 Input Rate旁邊(靠近[B])的下拉列表來顯示每個源頭各自的時間軸,正如下面圖2所示:

 

圖2

圖2顯示了這個應用有兩個來源,(SocketReceiver-0和 SocketReceiver-1)其中的一個導致了整個接收速率的下降,因為它在接收資料的過程中停止了一段時間。

這一頁再向下(在圖1中標記為 [D] ),處理時間(Processing Time的時間軸顯示,這些批次大約在平均20毫秒內被處理完成,和批處理間隔(在本例中是1s)相比花費的處理時間更少,意味著排程延遲(被定義為:一個批次等待之前批次處理完成的時間,被標記為 [E])幾乎是零,因為這些批次在建立的時候就已經被處理了。排程延遲是你的Streaming引用程式是否穩定的關鍵所在,UI的新功能使得對它的監控更加容易。

批次細節

再次參照圖1,你可能很好奇,為什麼向右的一些批次花費更長的時間才能完成(注意圖1中的[F])。你可以通過UI輕鬆的分析原因。首先,你可以點選時間軸檢視中批處理時間比較長的點,這將會在頁面下方產生一個關於完成批次的詳細資訊列表。

 

圖3

它將顯示這個批次的所有主要資訊(在上圖3中以綠色高亮顯示)。正如你所看到的,這個批次較之其他批次有更長的處理時間。另一個很明顯的問題是:到底是哪個spark job引起了這個批次的處理時間過長。你可以通過點選Batch Time(第一列中的藍色連結),這將帶你看到對應批次的詳細資訊,向你展示輸出操作和它們的spark job,正如圖4所示。

 

圖4

圖4顯示有一個輸出操作,它產生了3個spark job。你可以點選job ID連結繼續深入到stages和tasks做更深入的分析。

Streaming RDDs的有向無環執行圖

一旦你開始分析批處理job產生的stages和tasks,更加深入的理解執行圖將非常有用。正如之前的博文所說,Spark1.4.0加入了有向無環執行圖(execution DAG )的視覺化(DAG即有向無環圖),它顯示了RDD的依賴關係鏈以及如何處理RDD和一系列相關的stages。如果在一個Streaming應用程式中,這些RDD是通過DStreams產生的,那麼視覺化將展示額外的Streaming語義。讓我們從一個簡單的Streaming字數統計(word count)程式開始,我們將統計每個批次接收的字數。程式示例NetworkWordCount 。它使用DStream操作flatMap, map和 reduceByKey 來計算字數。任一個批次中一個Spark job的有向無環執行圖將會是如下圖5所示。

 

圖5

視覺化展示中的黑點代表著在批處理時16:06:50由DStream產生的RDD。藍色陰影的正方形是指用來轉換RDD的DStream操作,粉色的方框代表這些轉換操作執行的階段。總之圖5顯示瞭如下資訊:

  • 資料是在批處理時間16:06:50通過一個socket文字流( socket text stream )接收的。
  • Job用了兩個stage和flatMap , map , reduceByKey 轉換操作來計算資料中的字數

儘管這是一個簡單的圖表,它可以通過增加更多的輸入流和類似window操作和updateStateByKey操作等高階的DStream轉換而變得更加複雜。例如,如果我們通過一個含三個批次的移動視窗來計算字數(即使用reduceByKeyAndWindow),它的資料來自兩個socket文字流,那麼,一個批處理job的有向無環執行圖將會像如下圖6所示。

 

圖6

圖6顯示了於一個跨3個批次統計字數的Spark job的許多相關資訊:

  • 前三個stage實際上是各自統計視窗中3個批次的字數。這有點像上面例子 NetworkWordCount 的第一個stage,使用的是map和flatmap操作。不過要注意以下不同點:
  • 這裡有兩個輸入RDD,分別來自兩個socket文字流,這兩個RDD通過union結合成一個RDD,然後進一步轉換,產生每個批次的中間統計結果。
  • 其中的兩個stage都變灰了,因為兩個較舊批次的中間結果已經快取在記憶體中,因此不需要再次計算,只有最近的批次需要從頭開始計算。
  • 最後一個右邊的stage使用reduceByKeyAndWindow 來聯合每個批次的統計字數最終形成一個“視窗”的字數。

這些視覺化使得開發人員不僅能夠監控Streaming應用程式的狀態和趨勢,而且能夠理解它們與底層spark job和執行計劃的關係。