Streaming System 第二章:The What- Where- When- and How of Data Processing
本文由 《Streaming System》一書第二章的提煉翻譯而來, 譯者才疏學淺,如有錯誤,歡迎指正。 轉載請註明出處,侵權必究。
本章主要介紹魯棒的處理亂序資料的核心概念,這些概念的運用使流處理系統超越批處理系統的關鍵所在。
路線圖
上一章中,我們介紹了兩個非常關鍵的概念:
- 事件時間和處理時間,只有在事件時間維度對資料進行處理,才能保證計算結果的準確性
- 視窗:視窗是處理無界資料流的通用方法,目前共有4類視窗。
接下來我們介紹其他三個同樣非常重要的概念: - 觸發器(Triggers)
觸發器是決定某個視窗何時輸出的一種機制。作用跟照相機的快門相同,按下去,就能拿到某個時間點計算結果的快照。通過觸發器,也能多次看到某個視窗的輸出結果。因此可以實現遲到資料(late event)的處理。 - Watermark(水印)
Watermark是描述事件時間上資料完整性的概念。時間點X上的Watermark表示了所有時間點X前的輸入資料都到齊了。本節會粗淺的介紹一下watermark,第三章中會對watermark做深入解釋。 - Accumulation(累積)
累積模式表示了同一個視窗不同輸出間的關係。這些結果間可能完全無關,比如該視窗不同時間的增量,也可能有重疊。不同的累積模式在語義和成本上都不同,要根據具體場景來選擇合適的累積方式。
接下來,我丟擲4個在無界資料處理過程中,最為關鍵的問題:
- 計算什麼結果(__What__ results are calculated?)?這是使用者在程式碼(SQL/pipline code)中定義的,比如求和,算直方圖或訓練機器學習模型等。這也是批處理解決的經典問題。
- 在事件時間的哪個地方計算結果(__Where__ in event time are results calculated)?這是使用者在程式碼中定義的基於事件時間的視窗中定義的。可是使用上一章中介紹的滾窗/劃窗/會話等視窗,也可以使用跟視窗無關的運算元,或者更復雜的視窗,比如限時拍賣。
- 在什麼處理時間點,可以輸出結果(__When__ in processing time are results materialized)?觸發器和watermark會解決這個問題。這個主題有很多個變種,但是最常見的是重複更新場景(比如,物化檢視語義),其使用watermark來指示視窗的輸入資料已經完整,看到watermark後,這個窗口才唯一輸出一次資料。
- 如何更新結果(__How__ do refinements of results relate)?三種方式可以解決這個問題:discarding,accumulating和accumulating and retracting。下文會對這三種模式做更詳細介紹。
批處理的基礎:What&Where
咱們先來看一下批處理中如何解決What和Where兩個問題。
What: Transformations(變換)
批處理中,用變換(Transformations)解決 “Whatresults are calculated?”這個問題。
接下來用一個例項來說明。假設我們要算一次電子遊戲比賽中,某一隊的總得分。這個例子的特點:對輸入資料,在主鍵上,進行求和計算。具體資料如下:
各列資料含義:
- Score:隊中每個隊員得分
- EventTime:隊員得分時間
- ProcTime:資料進入系統進行計算的時間
對資料以EventTime和ProcessTime作圖,如下所示:
我們用Beam虛擬碼來實現這個示例,如果你之前用過Flinkl或Spark,那麼程式碼理解起來應該相對簡單。首先介紹一下Beam的基本知識,Beam中有兩類基本操作:
- PCollections:可以被併發處理的資料集
- PTransforms:對資料集進行的操作。比如group/aggregate等,讀如PCollection併產生新的PCollection。
PCollection<String> raw = IO.read(...);//讀入原始資料 //將原始資料解析成格式劃資料,其中Team為String型別,是主鍵。score是整型。 PCollection<KV<Team, Integer>> scores = input.apply(Sum.integersPerKey()); // 在每個主鍵上,對score做求和操作
我們通過一個時序圖來看看以上程式碼是如何處理這些資料的:
圖中,X軸是EventTime,Y軸是Processing Time,黑色的線表示隨著時間推移對所有資料進行計算,前三幅圖白色的數字(12,30,48)為該processing time時間點上,計算的中間結果,在批處理中,這些中間結果會被儲存下來。最後一幅圖是指整個計算完整個資料集之後,輸出最終結果48。這就是整個經典批處理的處理過程。由於資料是有界的,因此在process time上處理完所有資料後,就能得到正確結果。但是如果資料集是無界資料的話,這樣處理就有問題。接下來我們討論"Where in event time are results calculated?"這個問題。
Where: Windowing
上一章我們討論了3中常用的視窗:固定視窗(又稱為滾動視窗),滑動視窗和會話視窗。視窗將無界資料來源沿著臨時邊界,切分成一個個有界資料塊。
以下是用在Beam中,程式碼中用視窗如何實現之前整數求和的例子:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES))) .apply(Sum.integersPerKey());
理論上批資料是流資料的子集,因此Beam在模型層面對批流做了統一。我們通過時序圖看一下在傳統批處理引擎中,以上程式碼是如何執行的:
從時序圖上可以看出,在事件時間上,以2分鐘為步長,將資料切分到不同的視窗。然後每個視窗的輸出進行累加就得到最終結果。
以上我們回顧了時間域(事件時間和處理時間的關係)和視窗的相關知識,接下來看一下觸發器,watermark和accumulation這三個概念。
Going Streaming: When & How
批處理系統要等到所有資料都到齊才能輸出計算結果,在無界資料流計算中是不可行的。因此流計算系統中引入了觸發器(triggers)和watermark的概念。
When: The wonderful thing about triggers, is triggers are wonderful things!
觸發器解決了‘When in processing time are resultsmaterialized?’這個問題。觸發器會根據事件時間上的watermark來決定,在處理時間的哪個時間點來輸出視窗資料。每個視窗的輸出稱為視窗的窗格(pane of the window)。
有兩種通用的最基礎的trigger型別:
- 重複更新觸發器(Repeated update triggers),定期觸發視窗輸出。比如每條資料都輸出一個結果,或者在processing time上每隔一分鐘輸出一個結果。
- 完整性觸發器(Completeness triggers),僅當視窗的資料完整時,才輸出視窗結果。跟傳統批處理非常類似。只不過視窗的大小不同,傳統批處理是要等整個資料集的資料都到齊,才進行計算。
重複更新觸發器是最常用的觸發器,因為其易於理解和使用,並且跟資料庫中的物化檢視語義非常相似。流計算中,完整性觸發器的語義跟傳統批處理更相似,能夠處理late event。Watermark是驅動Completeness Triggers被觸發的原語。接下來我們會重點介紹watermark。
我們先看個重複更新觸發器的程式碼示例片段,這個片段實現了每個元素都觸發的功能:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AfterCount(1)))); .apply(Sum.integersPerKey());
在流計算系統中,其處理的時序圖如下:
資料按事件時間被切分成了2分鐘的固定視窗。每個視窗中,每來一條資料,視窗就觸發一次計算並輸出。當流計算對接的下游系統是MySQL等某個Key的資料可以被更新的話,使用者就能得到每個視窗中的最新的計算結果。
每個事件都觸發計算的模式不適合在大規模資料量的情況下使用,系統的計算效率會非常低,並且會對下游系統造成很大的寫壓力。一般情況下,在實際使用過程中,使用者通常會在處理時間上定義一個延時,多長時間輸出一次(比如每秒/每分鐘等)。
觸發器中,在處理時間延時上有兩種方式:
- 對齊延時:將處理時間上切分成固定大小的時間片,對每個key的每個視窗,時間片大小都相同。
- 非對齊延時:延時時間與視窗內資料有關。
譯者注:簡單理解,對齊延時,就是按固定時間來觸發計算。而非對齊延時,是按照資料進入系統的時間+延時時間觸發計算。
對齊延時的虛擬碼片段如下:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AlignedDelay(TWO_MINUTES))) .apply(Sum.integersPerKey());
時序圖:
上圖表示,Process Time上,每兩分鐘各個視窗都輸出一次資料。Spark streaming中micro-batch就是對齊延時的一種實現。好處是會定期輸出結果。缺點是如果資料有負載高峰,在tps很高的時候,系統的負載也會很高。會導致延時。非對齊延時的程式碼實現如下:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(UnalignedDelay(TWO_MINUTES)) .apply(Sum.integersPerKey());
時序圖如下:
上圖中,每個Event Time視窗中,當視窗中有資料時,會在資料的Process Time上,被切成2min大小的資料塊。沒有資料時,這個視窗是不進行計算的。每個視窗的輸出時間是不同的。也就是所謂的每個視窗的輸出‘非對齊’模式。這種模式與對齊模式相比的好處是:在每個視窗上,負載更均衡。比如某個event time視窗中出現流量高峰,會立即進行計算輸出結果,而不會依賴其他視窗的情況。但最終,兩種模式的延時是相同的。
重複更新觸發器使用和理解起來非常簡單,但不能保證計算結果的正確性,無法處理late event。而Completeness triggers(完整性觸發器)能解決這個問題。我們先來了解一下watermark。
When: Watermarks
Watermark標誌著在Process Time上,何時應該輸出結果。換句話說,watermark是某個event time視窗中所有資料都到齊的標誌。一旦視窗的watermark到了,那麼這個event time窗口裡的資料就到齊了,可以進行計算了。下圖是event time和process time的關係。圖中的紅線就是watermark。Event Time和Process Time的關係可以表示為:F(P)->E,F這個公式就是watermark。
有兩種型別的watermark:
- 完美型watermark:完美性watermark指,能夠100%保證某個event time X之前的資料都到齊了,不會有late event。
- 啟發式watermark:在真實世界無界資料的處理中,無法確切知道某個event timeX之前的資料是否到齊。因此要用到啟發式watermark。啟發式watermark會根據某些條件推測X之前的資料已經到齊。但推測有可能是錯的,有可能會有late event出現。
watermark標誌著Event Time視窗中的資料是否完整,是Completeness triggers的基礎。下面看個completeness triggers的示例程式碼:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark())) .apply(Sum.integersPerKey());
我們注意到,程式碼中watermark是個Function(AfterWatermark)。這個function可以有多種實現方式,比如如果能確切知道資料是否完整,就可以用Prefect Watermark。如果不能,則要使用啟發式watermark。下圖是在同一個資料集上使用兩種不同的watermark的行為,左邊是perfect watermark,右邊是啟發式的watermark。
在以上兩種情況中,每次watermark經過event time視窗時,視窗都會輸出計算結果。區別是perfect watermark的結果是正確的,但推斷型watermark的結果是錯誤的,少了第一個視窗中‘9’這個資料。
在兩個流的outer join場景中,如何判斷輸入資料是否完整?是否能做join?如果採用在process time上延時的重複更新型觸發器進行計算,如果資料在event time有較大延時或者資料有亂序,那麼計算結果就錯了。在這種場景下,event time上的watermark對處理late event,保證結果正確性,就非常關鍵了。
當然,沒有完美的設計,watermark也有兩個明顯的缺點:
- 輸出太慢:如果資料流中有晚到資料,越趨近於perfect watermark的watermark,將會一直等這個late event,而不會輸出結果,這回導致輸出的延時增加。如上圖左邊的一側所示。在[12:00,12:02)這個視窗的輸出,與視窗第一個元素的event time相比,晚了將近7分鐘。對延時非常敏感的業務沒辦法忍受這麼長的延時。
- 輸出太快:啟發式watermark的問題是輸出太快,會導致結果不準。比如上圖中右邊一側所示,對late event ‘9’,被忽略了。
因此,水印並不能同時保證無界資料處理過程中的低延時和正確性。既然重複更新觸發器(Repeated update triggers)可以保證低延時,完整性觸發器(Completeness triggers),能保證結果正確。那能不能將兩者結合起來呢?
When: early/on-time/late triggers FTW!
上文中,我們介紹了兩種觸發器:重複更新觸發器(Repeated update triggers)和完整性觸發器(Completeness triggers),如果將兩種觸發器的優勢結合,即允許在watermark之前/之時/之後使用標準的重複更新觸發器。就產生了3種新的觸發器:early/on-time/late trigger:
- Zero or more early panes:在watermark經過視窗之前,即週期性的輸出結果。這些結果可能是不準的,但是避免了watermark 輸出太慢的問題。
- A single on-time pane:僅在watermark通過視窗結束時觸發一次。這時的結果可以看作是準確的。
- Zero or more late panes:在watermark通過視窗結束邊界之後,如果這個視窗有late event,也可以觸發計算。這樣就可以隨時更新視窗結果,避免了輸出太快導致的結果不對的問題。
在本章的例子中,在watermark的基礎上,如果加一個1min的early firing trigger和一個每個record都會輸出的late firing trigger,那麼在event time上2min的視窗,使用1min的early firing trigger每隔一分鐘就會輸出一次,並且如果有late event,late firing trigger還能糾正之前視窗輸出的結果。這樣保證了正確性的情況下,還不增加延時。
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AfterCount(1)))) .apply(Sum.integersPerKey());
由上如所示,加上了early firing trigger和late firing trigger後,完美型watermark和推斷型watermark的結果就一致了。與沒有加這兩種trigger的實現相比,有了兩點很明顯的改進:
- 輸出太晚(too slow):在左側perfect watermark的時序圖中,第二個視窗[12:02,12:04)中,如果沒有加early firing trigger,第一個資料‘7’發生的時間是12:02, 視窗的輸出是12:09,第二個視窗的輸出延時了近7分鐘。加了early firing trigger之後,視窗第一次輸出時間是12:06,提前了3分鐘。上圖右側啟發式watermark情況也非常類似。
- 輸出太早(too fast):第一個視窗[12:00,12:02)中,啟發式視窗的watermark太早,late event ‘9’沒有被算進去,加了late firing trigger之後,當'9'進入系統時,會觸發視窗的再次計算,更正了之前視窗輸出的錯誤結果,保證了資料的正確性。
完美型watermark和推斷型watermark一個非常大的區別是,在完美型watermark例子中,當watermark經過視窗結束邊界時,這個窗口裡的資料一定是完整的,因此得出該視窗計算結果之後,就可以吧這個視窗的資料全部刪除。但啟發式watermark中,由於late event的存在,為了保證結果的正確性,需要把視窗的資料儲存一段時間。但其實我們根本不知道要把這個視窗的狀態儲存多長時間。這就引出了一個新的概念:允許延時(allowed lateness)。
When: Allowed Lateness (i.e., Garbage Collection)
為了保證資料正確性,當late event到來後能夠更新視窗結果,因此視窗的狀態需要被持久化儲存下來,但到底應該儲存多長時間呢?實際生產環境中,由於磁碟大小等限制,某視窗的狀態不可能無限的儲存下去。因此,定義視窗狀態的儲存時間為allowed lateness(允許的延遲)。也就是說,過了這個時間,視窗中資料會被清掉,之後到來的late event就不會被處理了。我們看個帶allowed lateness引數的例子:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering( AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AfterCount(1))) .withAllowedLateness(ONE_MINUTE)) .apply(Sum.integersPerKey());
時序圖如下:
關於allowed lateness的兩個重點:
- 如果數能夠使用perfect watermark,即有序,則不需要考慮allowed lateness的問題
- 如果是對有限個key做全域性聚合,則不必考慮allowed lateness問題。(因為部分全域性聚合比如sum/agg等,可以做增量計算,不必要儲存所有資料)
How: Accumulation
如果遇到late event,要如何修改視窗之前輸出的結果呢?有三種方式:
- Discarding(拋棄):每個視窗產生輸出之後,其state都被丟棄。也就是各個視窗之間完全獨立。比較適合下游是聚合類的運算,比如對整數求和。
- Accumulating(累積):所有視窗的歷史狀態都會被儲存,每次late event到了之後,都會觸發重新計算,更新之前計算結果。這種方式適合下游是可更新的資料儲存,比如HBase/帶主鍵的RDS table等。
-
Accumulating & Retracting(累積&撤回):Accumulating與第二點一樣,即儲存視窗的所有歷史狀態。撤回是指,late event到來之後,出了觸發重新計算之外,還會把之前視窗的輸出撤回。以下兩個case非常適合用這種方式:
- 如果視窗下游是分組邏輯,並且分組的key已經變了,那late event的最新資料下去之後,不能保證跟之前的資料在同一個分組,因此,需要撤回之前的結果。
- 動態視窗中,由於視窗合併,很難知道視窗之前emit的老資料落在了下游哪些視窗中。因此需要撤回之前的結果。
以例子中第二個視窗[12:02,12:04)為例,我們分別看看三種模式的輸出結果:
Discarding | Accumulating | Accumulating& Retracting | |
---|---|---|---|
Pane 1: inputs=[7,3] | 10 | 10 | 10 |
Pane 2: inputs=[8] | 8 | 18 | 18, -10 |
Last NormalValue | 8 | 18 | 18 |
Total Sum | 18 | 28 | 18 |
- Discarding(拋棄):同一個視窗的每次輸出,都與之前的輸出完全獨立。本例子中,要算求和的話,只需要把視窗的每次輸出都加起來即可。因此Discarding 模式對下游是聚合(SUM/AGG)等場景非常何時。
- Accumulating(累積):視窗的會把之前所有state都儲存,因此同一個視窗的每個輸出,都是之前所有資料的累積值。本例子中,該視窗第一次輸出是10,第二次輸入是8,之前的狀態是10,所以輸出是18。如果下游計算直接把兩次輸出加起來,結果就是錯的。
- Accumulating & Retracting(累積&撤回):視窗的每個輸出,都有一個累積值和一個撤回值。本例中,第一次輸出10,第二次輸出的是[18,-10],因此下游把視窗的所有輸出求和,會減去之前的重複值,得到正確結果18.
Discarding 模式的程式碼示例如下:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering( AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AtCount(1))) .discardingFiredPanes()) .apply(Sum.integersPerKey());
使用啟發式水印,在流計算引擎中,上述程式碼對應的時序圖如下:
Accumulating&Retraction示例程式碼:
PCollection<KV<Team, Integer>> scores = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering( AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AtCount(1))) .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
時序圖如下:
三種模式時序圖放在一起比較如下:
三個圖從左到右分別為discarding,accumulation,accumulation&retraction三種模式的時序圖。在計算消耗(單作業使用的資源)和儲存消耗上,從左到右依次增加。
總結
總結以下本文主要講的概念:
- Event time vs processing time(事件時間 vs. 處理時間)
- 視窗
- 觸發器
- Watermarks
- Accumulation
本文主要解決的四個問題: - What results are calculated? = transformations.
- Where in event time are results calculated? = windowing.
- When in processing time are results materialized? = triggers + watermarks.
- How do refinements of results relate? = accumulation.
流計算的本質,就是平衡正確性,延時和資源這三者的關係。
整數求和 Example 2-1 / Figure 2-3 ![]() |
整數求和 Fixed windowsbatch Example 2-2 / Figure 2-5 ![]() |
整數求和 Fixed windowsstreaming Repeated per-record trigger Example 2-3 / Figure 2-6 ![]() |
整數求和 Fixed windowsstreaming Repeatedaligned-delaytrigger Example 2-4 / Figure 2-7 ![]() |
整數求和 Fixed windowsstreaming Repeatedunaligned-delaytrigger Example 2-5 / Figure 2-8 ![]() |
整數求和 Fixed windowsstreaming Heuristicwatermarktrigger Example 2-6 / Example 2-6 ![]() |
整數求和 Fixed windowsstreaming Early/on-time/late trigger Discarding Example 2-10 / Figure 2-13 ![]() |
整數求和 Fixed windowsstreaming Early/on-time/late trigger Accumulating Example 2-7 / Figure 2-11 ![]() |
整數求和 Fixed windowsstreaming Early/on-time/late trigger Accumulating& Retracting Table 2-11 / Figure 2-14 ![]() |
本章中,僅介紹了部分固定視窗的內容,下一章的主要內容是watermark。介紹完watermark後,我們會深入研究其他兩種型別的視窗。