1. 程式人生 > >Spark Streaming中withWatermark的簡單嘗試

Spark Streaming中withWatermark的簡單嘗試

我們在處理流資料的時候,往往會有實時性要求。可是如果我們直接按照程式所在伺服器的當前時間計算又不行,比如當上遊日志資料延遲了,則所有的這部分資料都會被拋棄掉。所以一般我們在記錄日誌的時候,加上日誌的時間戳。這樣我們在進行流處理的時候,就可以把日誌記錄的時間拿出來,根據這個時間來決定流處理是不是要往下進行。而往往我們會以最早到達的日誌作為時間參考點,如果下一個日誌比這個時間點晚的太多,就可以拋棄掉。這樣的目的就是不需要等待延遲太多的日誌以犧牲小部分的資料完整性來保證實時性。而一般來說,在日誌伺服器端,往往如果日誌延遲了就一起延遲,只有極少情況少部分日誌延遲,這樣在處理端大部分情況下資料的存在率還是比較高的。

當我們以日誌的記錄時間來檢測延遲以保證實時性的時候,spark streaming的withWatermark函式則提供了這種功能。我們接著以上一篇介紹的event hub為資料來源,來模擬這個操作。往event hub傳送資料的格式和上一篇完全一樣,類似下面的”EventHub;20180619 15:20:08“格式。

Send data on EventHub;20180619 15:20:08
Send data on EventHub;20180619 15:15:09
Send data on EventHub;20180619 15:07:09
Send data on EventHub;20180619 15:10:09
Send data on EventHub;20180619 15:23:09
Send data on EventHub;20180619 15:30:09
Send data on EventHub;20180619 15:25:10

Send data on EventHub;20180619 15:18:10

然後在spark streaming的程式碼裡面,我們對後面的時間戳解析並用來作為withWatermark的時間。通過輸出模式設定為update來檢視它是怎麼處理每條記錄並保證實時性的。下面程式碼的前面和結尾部分可以參考上一篇,這裡主要是關鍵的處理部分。

  val streamingInputDF =
    spark
      .readStream // DataStreamReader
      .format("eventhubs") // DataStreamReader
      .options(eventHubsConf.toMap) // DataStreamReader
      .load() // DataFrame

  // split lines by whitespaces and explode the array as rows of 'word'
  val df = streamingInputDF.select($"body".cast("string"))
    .withColumn("_tmp", split($"body", ";"))
    .select(
      $"_tmp".getItem(0).as("name"),
      $"_tmp".getItem(1).as("ptime")
    ).drop("_tmp")
    .withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
    .drop("ptime")
    .withWatermark("posttime", "15 minutes")
    .groupBy(
      window($"posttime", "5 minutes", "5 minutes"),
      $"name"
    )
    .count()
    .writeStream
    .outputMode("update")
    .format("console")
    .start()

  df.awaitTermination()

Duration是一分鐘,withWatermark的最大延遲是15分鐘,時間視窗5分鐘,滑動視窗也是5分鐘。我們可以看到輸出結果中,每分鐘(batch)輸出結果。而對於Batch:34,由於這個時間比目前最早到達的時間晚了超過15分鐘,於是就直接被拋棄掉了。

-------------------------------------------
Batch: 33
-------------------------------------------
+--------------------+--------+-----+
|              window|    name|count|
+--------------------+--------+-----+
|[2018-06-19 14:45...|EventHub|    6|
+--------------------+--------+-----+


2018-06-19T07:07:59.458+0000: [GC (Allocation Failure) [PSYoungGen: 1273436K->16986K(1277440K)] 1746956K->490514K(5551104K), 0.0156202 secs] [Times: user=0.04 sys=0.00, real=0.02 secs] 
-------------------------------------------
Batch: 34
-------------------------------------------
+------+----+-----+
|window|name|count|
+------+----+-----+
+------+----+-----+


-------------------------------------------
Batch: 35
-------------------------------------------
+--------------------+--------+-----+
|              window|    name|count|
+--------------------+--------+-----+
|[2018-06-19 14:45...|EventHub|    7|
+--------------------+--------+-----+