基於Streaming構建統一的資料處理引擎的挑戰與實踐
作者:楊克特/伍翀 整理:徐前進(Apache Flink Contributor) 校對:楊克特/伍翀
本文整理自12月20日在北京舉行的Flink-forward-china-2018大會, 分享嘉賓楊克特:花名魯尼,Apache Flink Committer,2011年碩士畢業加入阿里,參與多個計算平臺核心專案的設計和研發,包括搜尋引擎、排程系統、監控分析等。目前負責Blink SQL的研發和優化。 分享嘉賓伍翀:花名雲邪,Apache Flink Committer,從Flink v1.0就參與貢獻,從事Flink Table & SQL相關工作已有三年,目前在阿里Blink SQL專案組。
文章概述:基於Flink以流為本的計算引擎去構建一個流與批統一的解決方案 本文主要從以下5個方面來介紹基於Flink Streaming構建統一的資料處理引擎的挑戰和實踐。
-
為什麼要統一批和流
-
什麼是統一的SQL處理引擎
-
如何統一批和流
-
效能表現
-
未來計劃
一、為什麼要統一批和流

一般公司裡面都會有一個比較傳統的批處理系統每天去算一些報表,隨著越來越多更實時的需求,大家也許會採用 Storm、Flink、Spark 來做流計算,同時又會在邊上跑一個批處理,以小時或天的粒度去計算一個結果,來實現兩邊的校驗。這個就是經典的 Lambda 架構。
但是這個架構有很多問題,比如使用者需要學習兩套引擎的開發方式,運維需要同時維護兩套系統。更重要的問題是,我們需要維護兩套流程,一套增量流程,一套全量流程,同時這兩套流程之間必須要有一定的自洽性,它們必須要保證一致。當業務變得越來越複雜的時候,這種一致性本身也會成為一個挑戰。
這也是 Flink SQL 希望解決的問題,希望通過 Flink SQL,不管是開發人員還是運維人員,只需要學習一套引擎,就能解決各種大資料的問題。也不僅限於批處理或者說流計算,甚至可以更多。例如:支援高時效的批處理達到 OLAP 的效果,直接用SQL語法去做複雜事件處理(CEP),使用 Table API 或 SQL 來支援機器學習,等等。在 SQL 上還有非常多的想象空間。
二、什麼是統一的SQL處理引擎

那麼什麼是統一的 SQL 處理引擎呢?統一的 SQL 引擎道路上有哪些挑戰呢?
從使用者的角度來說,一句話來描述就是“一份程式碼,一個結果”。也就是隻需要寫一份程式碼,流和批跑出來的最終結果是一樣。這個的好處是,使用者再也不用去保證增量流程和全量流程之間的一致性了,這個一致性將由 Flink 來保證。
從開發的角度來看,其實更加關注底層架構的統一,比如說一些技術模組是不是足夠通用,流和批模式下是否能做到儘可能地複用。精心設計的高效資料結構是不是可以廣泛地應用在引擎的很多模組中。
使用者角度

首先我們有一張使用者的分數表 USER_SCORES
,裡面有使用者的名字、得分和得分時間。通過這張表來做一個非常簡單的統計,統計每個使用者的總分,以及得到這個分數的最近時間是多少? 從流計算跟批處理的角度,不管是做一張離線報表,還是實時不斷地產出計算結果,它們的SQL是完全一模一樣的,就是一個簡單的GroupBy分組,求和,求max。

如上圖有一張源資料表,有名字,分數,事件時間。
對於批處理,通過這樣一個SQL可以直接拿到最終的結果,結果只會顯示一次,因為在資料消費完之後,才會輸出結果。
對於流處理,SQL 也是一模一樣的。假設流任務是從 12:01 開始執行的,這時候還沒有收到任何訊息,所以它什麼結果都沒輸出。隨著時間推移,收到了第一條 Julie 的得分訊息,此時會輸出 Julie 7 12:01
。當到達12:04分時,輸出的結果會更新成 Julie 8 12:03
,因為又收到了一條 Julie 的得分訊息。這個結果對於最終的結果來講可能是不對的,但是至少在 12:04 這一刻,它是一個正確的結果。當時間推進到當前時間(假設是 12:08 分),所有已產生的訊息都已消費完了,可以看到這時候的輸出結果和批處理的結果是一模一樣的。
這就是“一份程式碼,一個結果”。其實從使用者的角度來講,流計算跟批處理在結果正確性上並沒有區別,只是在結果的時效性上有一些區別。
開發角度

對於開發人員來說,引擎的統一又是意味著什麼?這張圖是目前Flink的架構圖,最上層 Table API 與 SQL。在執行之前,會根據執行環境翻譯成 DataSet 或 DataStream 的 API。這兩個 API 之間還是有比較大的不同,我們可以放大之後看看。比如 DataSet API 是Flink批處理的API,它自己有一個優化器。 但是在DataStream API下,就是一些比較直觀簡單的翻譯。然後在執行時,他們也是依託於不同的task。在 DataStream 這邊,主要是執行Stream Task,同時在裡面會執行各種各樣的operator。 在批處理這邊主要是執行 Batch Task 和 Driver。這個主要是執行模式的區別。

在程式碼上能有多少能共用的呢?比如說要實現一個 INNER JOIN,以現在的程式碼距離,如果要在流上實現 INNER JOIN,首先會把兩路輸入變成兩個DataStream,然後把兩個輸入 connect 起來,再進行 keyBy join key,最後實現一個 KeyedCoProcessOperator
來完成 join 的操作。但是在 DataSet 這邊,你會發現 API 就不太一樣了,原因是 DataSet 底下是有個優化器的。換句話說,DataSet 的 API 有些是宣告式的,DataStream 的 API 是命令式的。從這個例子上來看,對於我們開發人員來說,在流計算或者是在批處理下實現 JOIN 所面對的 API 其實是比較不一樣的,所以這也很難讓我們去複用一些程式碼,甚至是設計上的複用。這個是API 上的區別,到了Runtime之後,比較大的不同是 Stream Task 跟 Batch Task 的區別。

如上圖主要的區別是 Batch Task 是以 pull 模式執行的,而 Stream Task 是以 push 模式執行的。先簡單介紹一下這兩個模式。假如說我們需要從T表掃出資料,在A欄位上做簡單的過濾,然後對B欄位進行求和。這個模型很經典,這裡我們需要關注的是控制流和資料流。
在一個經典的pull的模式下,首先會有執行器開始向執行的,可以理解為程式入口,它會向最後一個節點請求最終的結果。最後一個節點(求和節點)會向前一個節點(過濾節點)請求資料,然後再向前一個節點請求資料直到源節點。源節點就會自己去把資料讀出來,然後一層層往下傳遞,最終最後一個節點計算完求和後返回給程式入口。這和函式呼叫棧非常相像。
在 push 模式下,在程式開始執行的時候,我們會直接找到 DAG 的頭節點,所有的資料和控制訊息都由這個頭節點往下發送,控制流會跟資料流一起,相當於它同時做一個函式呼叫,並且把資料傳送給下一個運算元,最終達到求和的運算元。
通過這個簡單例子,大家可以體會一下,這兩個在執行模式上有很大的不同,這會在 runtime 統一上帶來很多問題,但其實他們完成的功能是類似的。
三、如何統一流和批
我們在深度統一流和批的道路上遇到了這麼多挑戰,那麼是如何做到統一的呢?

01 動態表

首先,大家已經越來越認同 SQL 是大資料處理的通用語言,不僅僅是因為 SQL 是一個非常易於表達的語言,還因為 SQL 是一個非常適合於流批統一的語言。但是在傳統的 SQL 裡面,SQL 是一直作用在“表”上的,不是作用在“流”上的。怎麼樣讓 SQL 能夠作用在流上,而且讓流式 SQL 的語義、結果和批一樣呢?這是我們遇到的第一個問題。為了解決這個問題,我們和社群提出了“流表對偶性”還有“動態表”的這兩個理論基礎。在這裡,這個理論基礎我們這裡就不展開了,感興趣的同學可以去官網上閱讀下這篇文章。大家只需要知道只有在基於這兩個理論的基礎上,流式SQL的語義才能夠保證和批的語義是一樣的,結果是一樣的。
02 架構改進

如圖是對架構上的一些改進。架構的改進主要集中在中間兩層,在 Runtime 層我們增強了現有的 Operator 框架,使得能支援批運算元。在 Runtime 之上,我們提出了一個 Query Processor 層,包括查詢優化和查詢執行,Table API & SQL 不再翻譯到 DataStream 和 DataSet ,而是架在 Query Processor 之上。
統一的 Operator 框架

在 Runtime 層,首先實現的是 Runtime DAG 層的統一,基於統一的DAG層之上,再去構建流的運算元和批的運算元。為了統一流和批的最底層的API,引入了一個統一的Operator層抽象。批運算元的實現不再基於 DataSet 底層 Deriver 介面實現,都基於 StreamOperator 介面來實現了。這樣流和批都使用了統一的 Operator API 來實現。
除此之外,針對批的場景,我們對 Operator 框架做了些擴充套件使得批能獲得額外的優化。
第一點是 Operator 可以自主的選擇輸入邊,例如hashjoin,批的hashjoin一般先會把build端處理完,把雜湊表先build起來,然後再去處理另外一邊的probe端。
第二點是更加靈活的 Chaining 的策略。StreamOperator 的預設 Chaining 策略只能將單輸入的 operator chain在一起。但是在批的一些場景,我們希望能夠對多輸入的Operator也進行Chaining。比如說兩個 Join Operator,我們希望也能夠 Chaining 在一起,這樣這兩個 Join Operator 之間的資料shuffle 就可以省掉。
關於統一的 Operator 框架,我們已經在社群裡面展開了討論,感興趣的同學可以關注一下這個討論連結。
統一的查詢處理

然後講一下統一的Query Processor 層,不管是流計算還是批處理的SQL,他們都將沿著相同的解析和優化的路徑往前走。在解析層,也就是將 SQL 和 Table API 程式碼解析成邏輯計劃,這裡流和批完完全全複用了一樣的程式碼。然後在優化層流和批也使用了相同的優化器來實現,在優化器裡面,所有的優化的規則都是可插拔的。流和批共用了絕大部分的優化的規則,只有少部分的規則是流特有的,或者是批特有的。然後在優化之後,得到了一個物理計劃,物理的計劃會經過翻譯成最終的Execution DAG,也就是我們剛剛講的Stream Operator運算元,最後會分散式地執行起來。
03 優化器的統一

在優化器這一層,很符合二八定律,也就是80%的優化規則都是流和批是共享的,比如說列裁剪、分割槽裁剪、條件下推等等這些都是共享的。還有20%的優化規則是流批特有的,經過我們的研究發現比較有意思的一個規律。
批這邊優化規則,很多都是跟sort相關相關的,因為流現在不支援sort,所以 sort 可以理解是批特有的,比如說一些sort merge join 的優化、sort agg的優化。
而流這邊所特有的一些規則,都是跟state相關的。為什麼呢?因為目前流作業在生產中跑一個 SQL 的作業,一般會選擇使用 RocksDB 的 StateBackend。RocksDB StateBackend,它有一個很大的瓶頸,就是你每一次的操作,都會涉及到序列化和反序列化,所以說 State 操作會成為一個流作業的瓶頸。所以如何去優化一個流作業,很多時候是思考如何節省State操作,如何減小State的size,如何避免儲存重複的State資料。這些都是目前流計算任務優化的立竿見影的方向。

這裡介紹一個流和批共用的高階一點的優化規則。大家可以先看一下上圖左邊這個query,這是一個經過簡化之後的TBCH13的query,有一張使用者表customer,還有一張訂單表 orders,customer 表會根據 custkey 去 join 上 orders 這張表,然後 join 之後,再根據 custkey 來進行分組,統計出現過的訂單的數量。
梳理一下就是要統計每個客戶下的訂單數,但是訂單的資料是存在orders表裡面的,所以就需要去join這個orders表。這個query經過解析之後,得到的邏輯計劃就是中間這個圖。可以看到customer表和orders表進行了join,join之後做了一個agg。但是這裡有一個問題,就是customer表和orders表都是兩張非常大的表,都是上億級別的。在批處理下,為這個join去build雜湊表的時候,要用到大量的buffer,甚至還需要落盤,這就可能導致這個join效能比較差。 在流處理下也是類似的,需要把customer表和orders表所有的資料都存到state裡面去。state越大,流處理效能也就越差。
所以說怎樣去節省和避免資料量是這個查詢優化的方向。我們注意到customer它本身就帶了一個主鍵就是custkey,最後的agg也是針對 custkey 進行聚合統計的。那麼其實我們可以先對orders表做一個聚合統計,先統計出每個使用者每個custkey它下一個多少的訂單,然後再和customer表做一個join,也就是說把agg進行下推,下推到了join之前,這樣子,orders表就從原來的15億的資料量壓縮到了一個億,然後再進行join。這個對於流和批都是巨大的效能優化。我們在流場景下測試發現從原先耗時六個小時提升到了14分鐘。
講這個例子目的是想說明 SQL 已經發展了幾十年了,有非常多的牛人在這個領域耕耘多年,已經有了非常多成熟的優化。這些優化,基於流批統一的模型,很多事可以直接拿過來給流用的。我們不需要再為流在開發、研究一套優化規則,做到事半功倍的效果。
04 基本資料結構的統一

原先在 Flink 中不管流還是批,具體幹活的運算元之間傳遞的都是一種叫 Row 的資料結構。 但是Row有這麼幾個比較典型的問題。
-
Row結構很簡單,裡面就存了一個Object陣列,比如說現在有一行資料,第一個是整形,另兩個是字串。那麼row裡面就會有一個Int,還有兩個String。但是我們知道Java在物件上,它會有一些額外的空間的一些開銷。
-
另外對於主型別的訪問,會有裝箱和拆箱的開銷。
-
還有在算Row的hashcode、序列化、反序列化時,需要去迭代Row裡面陣列的每一個元素的hashcode方法、反序列化方法、序列化方法,這就會涉及到很多額外的虛擬函式呼叫的開銷。
-
最後一點是對於一些稍微高階一點的資料結構,比如說排序器 sort,還有agg join中的一些hashtable,hashmap的這種二進位制的資料結構,基於Row的這種封裝,很難去做到極致的效率。

所以針對這些問題,我們也提出了一個全新的資料結構BinaryRow,然後它是完全基於二進位制的結構來設計的。BinaryRow 分成了定長區和變長區,在定長區開頭是一個null bit的一個區間,用來記錄每個欄位是否是null值。然後像int,long,double這種定長的資料型別,我們會直接把這個直接存在定長區裡面,然後string這種變長形的資料,我們會把他的變長的資料存在變長區,然後把他的指標還有他的長度存在定長區。在存放資料的時候,BinaryRow 中每一個數據塊都是八位元組對齊的。 為什麼八位元組對齊?一方面是為了更快的 random access,查詢欄位時不需要從頭遍歷,直接就能定位到欄位的位置。另一方面是能夠做到更好的cpu的快取。
BinaryRow 有一個比較重要的優點:延遲反序列化。例如從網路過來的二進位制資料、從state拿到的二進位制資料,不會馬上反序列化出來,而是會 wrap 成 BinaryRow,當需要的時候才進行反序列化,這能節省很多不必要的反序列化操作,從而提升效能。經過測試,這個資料結構不僅在批處理中表現的非常優秀,在流處理中也得到了一倍的效能提升。
05 實現Runtime共享

在Runtime的實現上,我們已經有很多實現是共用的,比如說source,sink,Correlate,CodeGeneration。這裡我們展開講講維表關聯和記憶體管理的流批覆用。
1 維表關聯

維表關聯,大家應該都比較瞭解,就是一個流要去join一個存在外部的資料庫。我們會拿流資料的 ID 去 lookup 維表,這個lookup的過程,我們會實現成同步的模式或者是非同步的模式。 我們知道 DataStream 上支援了非同步IO介面,但是DataSet是沒有的。不過由於我們統一了Operator層,所以說批可以直接複用流的 operator 實現。雖然在傳統的批處理中,如果要查詢維表,會先把維表scan下來再做 JOIN。但如果說維表特別大,probe端特別小,這樣可能是不划算的,使用lookup的方式可能會更高效一些,所以說這也是彌補了批在某些場景的一個短板。
2 流的微批處理

為了避免對state的頻繁操作,我們在流上引入了Micro-Batch 機制。實現方式就是在資料流中插入了一些 micro-batch 的事件。然後在Aggregate的Operator裡面,收到資料的時候,我們就會把它存到或直接聚合到二進位制的雜湊表裡面(快取到記憶體中)。然後當收到 micro-batch 事件的時候,再去觸發二進位制的對映表(BinaryHashMap),將快取的結果刷到 state 中,並將輸出最終結果。 這裡 BinaryHashMap 是完全和批這邊複用的。流這邊沒有去重新造一套,在效能上也得到了十倍的提升。
四、效能表現

我們先測試了一個批的效能,拿的是TPC-H去做一個測試。我們與Flink1.6.0進行了比較,這個圖是在1T的資料量下每個query的一個耗時的對比,所以說耗時越小,它的效能也就越好。可以看出每一個query,Blink都會比Flink1.6 要優秀很多。 平均效能上要比Flink1.6要快十倍。另外借助統一的架構,流也成功的攻克了所有的TBCH的query。值得一提的是,這是目前其他引擎做不到的。還有在今年的天貓雙11上流計算,達到了17億的TPS這麼大的一個吞吐量。能達到這麼高的效能表現,離不開我們今天聊的統一流批架構。

五、未來計劃
我們會繼續探索流和批的一些結合,因為流和批並不是非黑即白的,不是說批就是批作業,流就是流作業,流和批之間還有很多比較大的空間值得我們去探索。比如說一個作業,他可能一部分是一個一直執行的流作業,另一部分是一個間隔排程的批作業,他們之間是融合執行著的。再比如一個批作業執行完之後,怎麼樣能夠無縫地把它遷移成一個流作業,這些都是我們未來嘗試去做的一些研究的方向。