Structured-Streaming之窗口操作
Structured Streaming 之窗口事件時間聚合操作
Spark Streaming
中 Exactly Once
指的是:
- 每條數據從輸入源傳遞到
Spark
應用程序Exactly Once
- 每條數據只會分到
Exactly Once
batch
處理 - 輸出端文件系統保證冪等關系
Structured Streaming
返回的是 DataFrame/DataSet
,我們可以對其應用各種操作 - 從無類型,類似 SQL 的操作(例如 select
,where
,groupBy
)到類型化的 RDD 類操作(例如 map
,filter
,flatMap
)。
基本操作:選擇,投影,聚合
- case class DeviceData(device: String, deviceType: String,
- signal: Double, time: DateTime)
-
- val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
- val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
-
- // Select the devices which have signal more than 10
- df.select("device").where("signal > 10") // using untyped APIs
- ds.filter(_.signal > 10).map(_.device) // using typed APIs
-
- // Running count of the number of updates for each device type
- df.groupBy("deviceType").count() // using untyped API
-
- // Running average signal for each device type
- import org.apache.spark.sql.expressions.scalalang.typed
- ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
不支持的操作:
但是,不是所有適用於靜態 DataFrames/DataSet
的操作在流式 DataFrames/DataSet
中受支持。從 Spark 2.0 開始,一些不受支持的操作如下:
- 在流
DataFrame/DataSet
上還不支持多個流聚集(即,流 DF 上的聚合鏈)。 - 不支持
limit
和take(N)
- 不支持
Distinct
sort
操作僅在聚合後在完整輸出模式下支持- 流和靜態流的外連接支持是有條件的:
- 不支持帶有流
DataSet
的完全外連接 - 不支持右側的流的左外連接
- 不支持左側的流的右外部聯接
- 不支持帶有流
- 不支持兩個流之間的任何
join
- 此外,還有一些方法不能用於流
DataSet
,它們是將立即運行查詢並返回結果的操作,這對流DataSet
沒有意義。相反,這些功能可以通過顯式地啟動流查詢來完成。 count()
- 無法從流DataSet
返回單個計數。
相反,使用ds.groupBy.count()
返回包含運行計數的流DataSet
。foreach()
- 使用ds.writeStream.foreach(...)
(參見下一節)。show()
- 而是使用控制臺接收器
如果您嘗試任何這些操作,您將看到一個 AnalysisException
如“操作 XYZ 不支持與流 DataFrames/DataSet
”。
事件時間上的窗口操作
事件時間是嵌入在數據本身的時間,對於許多應用程序,我們可能希望根據事件時間進行聚合操作,為此,Spark2.x 提供了基於滑動窗口的事件時間集合操作。基於分組的聚合操作和基於窗口的聚合操作是非常相似的,在分組聚合中,依據用戶指定的分組列中的每個唯一值維護聚合值,在基於窗口的聚合的情況下,對於行的事件時間落入的每個窗口維持聚合值。
structured-streaming-window
- import spark.implicits._
-
- val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
-
- // Group the data by window and word and compute the count of each group
- val windowedCounts = words.groupBy(
- window($"timestamp", "10 minutes", "5 minutes"),
- $"word"
- ).count()
該段代碼用於用於統計每10分鐘內,接受到的不同詞的個數,其中window($"timestamp", "10 minutes", "5 minutes")的含義為:假設初始時間 t=12:00,定義時間窗口為10分鐘,每5分鐘窗口滑動一次,也就是每5分鐘對大小為10分鐘的時間窗口進行一次聚合操作,並且聚合操作完成後,窗口向前滑動5分鐘,產生新的窗口,如上圖的一些列窗口 12:00-12:10,12:05-12:15,12:10-12:20。
在這裏每個word包含兩個時間,word產生的時間和流接收到word的時間,這裏的timestamp就是word產生的時間,在很多情況下,word產生後,可能會延遲很久才被流接收,為了處理這種情況,Structured Streaming 引進了Watermarking(時間水印)功能,以保證能正確的對流的聚合結構進行更新
structured-streaming-late-data
Watermarking的計算方法Watermarking:
- In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
- After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger)
Watermarking表示多長時間以前的數據將不再更新,也就是說每次窗口滑動之前會進行Watermarking的計算,首先統計這次聚合操作返回的最大事件時間,然後減去所然忍受的延遲時間就是Watermarking,當一組數據或新接收的數據事件時間小於Watermarking時,則該數據不會更新,在內存中就不會維護該組數據的狀態
mw1
Structured Streaming 支持兩種更新模式:
Update
刪除不再更新的時間窗口,每次觸發聚合操作時,輸出更新的窗口
structured-streaming-watermark-update-mode
2.
Append
當確定不會更新窗口時,將會輸出該窗口的數據並刪除,保證每個窗口的數據只會輸出一次
structured-streaming-watermark-append-mode
3.
Complete
不刪除任何數據,在 Result Table 中保留所有數據,每次觸發操作輸出所有窗口數據
Structured-Streaming之窗口操作