1. 程式人生 > >【秒懂StructuredStreaming】手把手教你寫StructuredStreaming + Kafka程式

【秒懂StructuredStreaming】手把手教你寫StructuredStreaming + Kafka程式

這篇部落格我們介紹基於StructuredStreaming進行實時流運算元開發,並將結果輸出到kafka中。

      StructuredStreaming使用的資料型別是DataFrame和Dataset。

     從Spark 2.0開始,DataFrame和Dataset可以表示靜態(有界資料),以及流式(無界資料)。與靜態Dataset/ DataFrame類似,使用者可以使用公共入口點SparkSession 從流源建立流DataFrame /Dataset,並對它們應用與靜態DataFrame / Dataset相同的操作。如果你不熟悉Dataset / DataFrame,請戳

《Dataset常用方法》《DataFrame常用方法》

    接下來,我們以Append輸出模式為例,講解流式DataFrame的建立,基礎操作和視窗操作,以及將結果輸出到外部儲存介質的方法。

1、流式DataFrame建立

    以Kafka作為輸入源為例。

    a、引入兩個依賴包

        

    b、建立SparkSession入口,用於與叢集資源管理器互動

        

    c、指定kafka的地址和埠號,從Kafka的一個或者多個topic訂閱輸入源,建立流失DataFrame

          設定maxOffsetsPerTrigger,控制斷點續傳一個batch最大處理條數,避免一次處理太多工導致記憶體溢位。

                

2、流式DataFrame基本操作

    基本操作指不包含聚合的操作。

    流式DataFrame基本操作支援大部分的靜態的DataFrame操作,如 從無型別,類似SQL的操作(例如select,where,join)到型別化的RDD類操作(例如map,filter,flatMap)都支援。

    

    流式DataFrame部分不支援的操作如下:

        #1)不支援take、distinct操作

        #2)有條件地支援 streaming 和 static Datasets 之間的 Outer joins 。

    • 不支援使用 streaming Dataset 的 Full outer join
    • 不支援在右側使用 streaming Dataset 的 Left outer join
    • 不支援在左側使用 streaming Dataset 的 Right outer join
    • 不支援兩種 streaming Datasets 之間的任何種類的 join
    #3)不支援將DataFrame或者Dataset轉成RDD再進行操作!!!

3、流式DataFrame視窗操作

    Append模式的聚合操作因為需要使用Watermark刪除舊的聚合結果,所以,只支援基於Watermark(用於控制延遲時間)的Event-Time(事件時間)聚合操作,不指定Watermark的聚合操作不支援

    基於Watermark(用於控制延遲時間)的Event-Time(事件時間)聚合操作  一般通過Window時間視窗實現。

    涉及四個概念:

        視窗長度:視窗的持續時間,如10分鐘的資料為一個視窗

        視窗滑動間隔:視窗操作的時間間隔,比如間隔5分鐘,表示每5分鐘生成一個Window

        計算觸發時間:依據指定的trigger batch時長作為觸發時長,Spark去Kafka訂閱這個時長產生的資料,獲取資料的Event-Time,將資料分發到對應的Window中進行計算

        Watermark:wm的值 = 當前batch資料中最大 Event-Time - late threshold 。Event-Time大於wm的延遲資料將被處理 ,但資料小於的資料將被丟棄。

                              當視窗的上界小於wm值,代表視窗中的所有資料的Event-Time都小於wm了,不需要在對這個視窗更新,所以這個視窗在Result Table中的聚合結果將被輸出到外部儲存介質Kafka

    以每隔5分鐘,統計前10分鐘出現的單詞的次數,並將結果寫入Kafka為例,講解程式碼編寫和執行機制:

    #1)程式碼如下:

每隔5分鐘,對10分鐘時間視窗和單詞word進行分組,並統計每個分組的個數。 當資料延遲超過10分鐘到達Spark,延遲資料會被忽略。

                

       a、 withWatermark函式第一個引數是 資料表中的時間戳欄位的欄位名,如圖中的timestamp,第二個引數是延遲的時間閾值,如圖中的10 分鐘 

            注意:withWatermark要緊跟dataFrame,寫在groupBy之前,否則會報錯

        b、window函式第一個引數是時間戳欄位名,需要與withWatermark函式的一個引數名一致,否則會報錯,第二個引數是視窗長度,第三個引數是滑動間隔

    #2)實現機制如下圖:        

       

    a、橫座標是觸發時長,5分鐘觸發一次執行,Spark去Kafka訂閱這個時長產生的資料,獲取資料的Event-Time,將資料分發到對應的Window中進行計算。

         如圖中,12:15~12:20這個batch獲取的資料共4條,其中12:15和12:21是正常到達Spark的,而12:08和12:13這兩條是延遲到達Spark

    b、縱座標是資料的Event-Time

    c、藍色虛線上的點是每次batch的資料中,獲取到的最大的Event-Time

    d、紅色實現程式碼當前計算得到的最大的waterMark

    e、黃色實心的圓圈代表正常抵達的資料;紅色實心的圓圈代表延遲的資料,但是在延遲閾值範圍內,有被處理;紅色空心的圓圈程式碼延遲的資料,但在延遲閾值範圍外,資料被丟棄了。

    如圖中,12:25分觸發時,上一個batch 計算得到的wm=batch最大的event-time 12:21 減去 設定的 延遲閾值 10分鐘  = 12:11分,因為12:00~12:10這個視窗的最大值已經全部小於wm了,所以,該視窗的值從Result Table輸出到Kafka

     同理,12:30分觸發時,上一個batch 計算得到的wm=batch最大的event-time 12:26 減去 設定的 延遲閾值 10分鐘  = 12:16分,因為12:05~12:15這個視窗的最大值已經全部小於wm了,所以,該視窗的值從Result Table輸出到Kafka

4、流式結果輸出到Kafka,並啟動流式計算

    #1)將結果寫入kafka的一個topic,並呼叫start函式啟動實時流計算,300秒觸發一次執行 

    #2)將結果寫入kafka的多個topic,通過在dataframe的topic欄位指明每行具體所屬的topic,並呼叫start函式啟動實時流計算,300秒觸發一次執行 

    注意:一定要呼叫start()函式,實時流計算才會啟動!!!

         

4、實時流作業監控

通過非同步的方式對實時流進行監控,輸出每次batch觸發從Kafka獲取的起始和終止的offset,總條數,以及通過最大Event-Time計算得到的Watermark等

程式碼如下:

每次batch觸發得到的日誌如下: