1. 程式人生 > >Structured-Streaming之窗口操作

Structured-Streaming之窗口操作

abp mvd vps pmu aid googl nic oom eas

Structured Streaming 之窗口事件時間聚合操作

Spark StreamingExactly Once 指的是:

  • 每條數據從輸入源傳遞到 Spark 應用程序 Exactly Once
  • 每條數據只會分到 Exactly Once batch 處理
  • 輸出端文件系統保證冪等關系

Structured Streaming 返回的是 DataFrame/DataSet,我們可以對其應用各種操作 - 從無類型,類似 SQL 的操作(例如 selectwheregroupBy)到類型化的 RDD 類操作(例如 mapfilterflatMap)。

基本操作:選擇,投影,聚合

  1. case class DeviceData(device: String, deviceType: String,
  2. signal: Double, time: DateTime)

  3. val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
  4. val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data

  5. // Select the devices which have signal more than 10
  6. df.select("device").where("signal > 10") // using untyped APIs
  7. ds.filter(_.signal > 10).map(_.device) // using typed APIs

  8. // Running count of the number of updates for each device type
  9. df.groupBy("deviceType").count() // using untyped API

  10. // Running average signal for each device type
  11. import org.apache.spark.sql.expressions.scalalang.typed
  12. ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API

不支持的操作:

但是,不是所有適用於靜態 DataFrames/DataSet 的操作在流式 DataFrames/DataSet 中受支持。從 Spark 2.0 開始,一些不受支持的操作如下:

  • 在流 DataFrame/DataSet 上還不支持多個流聚集(即,流 DF 上的聚合鏈)。
  • 不支持 limittake(N)
  • 不支持 Distinct
  • sort 操作僅在聚合後在完整輸出模式下支持
  • 流和靜態流的外連接支持是有條件的:
    • 不支持帶有流 DataSet 的完全外連接
    • 不支持右側的流的左外連接
    • 不支持左側的流的右外部聯接
  • 不支持兩個流之間的任何 join
  • 此外,還有一些方法不能用於流DataSet,它們是將立即運行查詢並返回結果的操作,這對流DataSet沒有意義。相反,這些功能可以通過顯式地啟動流查詢來完成。
  • count() - 無法從流 DataSet 返回單個計數。
    相反,使用 ds.groupBy.count() 返回包含運行計數的流DataSet
  • foreach() - 使用 ds.writeStream.foreach(...)(參見下一節)。
  • show() - 而是使用控制臺接收器

如果您嘗試任何這些操作,您將看到一個 AnalysisException 如“操作 XYZ 不支持與流 DataFrames/DataSet”。

事件時間上的窗口操作

事件時間是嵌入在數據本身的時間,對於許多應用程序,我們可能希望根據事件時間進行聚合操作,為此,Spark2.x 提供了基於滑動窗口的事件時間集合操作。基於分組的聚合操作和基於窗口的聚合操作是非常相似的,在分組聚合中,依據用戶指定的分組列中的每個唯一值維護聚合值,在基於窗口的聚合的情況下,對於行的事件時間落入的每個窗口維持聚合值。

技術分享
structured-streaming-window

  1. import spark.implicits._

  2. val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

  3. // Group the data by window and word and compute the count of each group
  4. val windowedCounts = words.groupBy(
  5. window($"timestamp", "10 minutes", "5 minutes"),
  6. $"word"
  7. ).count()

該段代碼用於用於統計每10分鐘內,接受到的不同詞的個數,其中window($"timestamp", "10 minutes", "5 minutes")的含義為:假設初始時間 t=12:00,定義時間窗口為10分鐘,每5分鐘窗口滑動一次,也就是每5分鐘對大小為10分鐘的時間窗口進行一次聚合操作,並且聚合操作完成後,窗口向前滑動5分鐘,產生新的窗口,如上圖的一些列窗口 12:00-12:10,12:05-12:15,12:10-12:20。

在這裏每個word包含兩個時間,word產生的時間和流接收到word的時間,這裏的timestamp就是word產生的時間,在很多情況下,word產生後,可能會延遲很久才被流接收,為了處理這種情況,Structured Streaming 引進了Watermarking(時間水印)功能,以保證能正確的對流的聚合結構進行更新

技術分享
structured-streaming-late-data
Watermarking的計算方法Watermarking:

  • In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
  • After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger)

Watermarking表示多長時間以前的數據將不再更新,也就是說每次窗口滑動之前會進行Watermarking的計算,首先統計這次聚合操作返回的最大事件時間,然後減去所然忍受的延遲時間就是Watermarking,當一組數據或新接收的數據事件時間小於Watermarking時,則該數據不會更新,在內存中就不會維護該組數據的狀態

技術分享
mw1

Structured Streaming 支持兩種更新模式:

  1. Update 刪除不再更新的時間窗口,每次觸發聚合操作時,輸出更新的窗口

技術分享
structured-streaming-watermark-update-mode
2. Append 當確定不會更新窗口時,將會輸出該窗口的數據並刪除,保證每個窗口的數據只會輸出一次

技術分享
structured-streaming-watermark-append-mode
3. Complete 不刪除任何數據,在 Result Table 中保留所有數據,每次觸發操作輸出所有窗口數據

Structured-Streaming之窗口操作