1. 程式人生 > >如何構建批流一體資料融合平臺的一致性語義保證?

如何構建批流一體資料融合平臺的一致性語義保證?

本文根據陳肅老師在 Apache Kafka x Flink Meetup 深圳站的分享整理而成,文章首先將從資料融合角度,談一下 DataPipeline 對批流一體架構的看法,以及如何設計和使用一個基礎框架。其次,資料的一致性是進行資料融合時最基礎的問題。如果資料無法實現一致,即使同步再快,支援的功能再豐富,都沒有意義。

另外,DataPipeline 目前使用的基礎框架為 Kafka Connect。為實現一致性的語義保證,我們做了一些額外工作,希望對大家有一定的參考意義。

最後,會提一些我們在應用 Kafka Connect 框架時,遇到的一些現實的工程問題,以及應對方法。儘管大家的場景、環境和資料量級不同,但也有可能會遇到這些問題。希望對大家的工作有所幫助。

1.jpg

一、批流一體架構

批和流是資料融合的兩種應用形態

下圖來自 Flink 官網。傳統的資料融合通常基於批模式。在批的模式下,我們會通過一些週期性執行的 ETL JOB,將資料從關係型資料庫、檔案儲存向下遊的目標資料庫進行同步,中間可能有各種型別的轉換。

2.jpg

另一種是 Data Pipeline 模式。與批模式相比相比, 其最核心的區別是將批量變為實時:輸入的資料不再是週期性的去獲取,而是源源不斷的來自於資料庫的日誌、訊息佇列的訊息。進而通過一個實時計算引擎,進行各種聚合運算,產生輸出結果,並且寫入下游。

現代的一些處理框架,包括 Flink、Kafka Streams、Spark,或多或少都能夠支援批和流兩種概念。只不過像 Kafka,其原生就是為流而生,所以如果基於 Kafka Connect 做批流一體,你可能需要對批量的資料處理做一些額外工作,這是我今天重點要介紹的。

資料融合的基本問題

如果問題簡化到你只有一張表,可能是一張 MySQL 的表,裡面只有幾百萬行資料,你可能想將其同步到一張 Hive 表中。基於這種情況,大部分問題都不會遇到。因為結構是確定的,資料量很小,且沒有所謂的並行化問題。

3.jpg

但在一個實際的企業場景下,如果做一個數據融合系統,就不可避免要面臨幾方面的挑戰:

第一,“動態性”

資料來源會不斷地發生變化,主要歸因於:表結構的變化,表的增減。針對這些情況,你需要有一些相應的策略進行處理。

第二,“可伸縮性”

任何一個分散式系統,必須要提供可伸縮性。因為你不是隻同步一張表,通常會有大量資料同步任務在進行著。如何在一個叢集或多個叢集中進行統一的排程,保證任務並行執行的效率,這是一個要解決的基本問題。

第三,“容錯性”

在任何環境裡你都不能假定伺服器是永遠在正常執行的,網路、磁碟、記憶體都有可能發生故障。這種情況下一個 Job 可能會失敗,之後如何進行恢復?狀態能否延續?是否會產生資料的丟失和重複?這都是要考慮的問題。

第四,“異構性”

當我們做一個數據融合專案時,由於源和目的地是不一樣的,比如,源是 MySQL,目的地是 Oracle,可能它們對於一個欄位型別定義的標準是有差別的。在同步時,如果忽略這些差異,就會造成一系列的問題。

第五,“一致性”

一致性是資料融合中最基本的問題,即使不考慮資料同步的速度,也要保證資料一致。資料一致性的底線為:資料先不丟,如果丟了一部分,通常會導致業務無法使用;在此基礎上更好的情況是:源和目的地的資料要完全一致,即所謂的端到端一致性,如何做到呢?

Lambda 架構是批流一體化的必然要求

目前在做這樣的平臺時,業界比較公認的有兩種架構:一種是 Lambda 架構,Lambda 架構的核心是按需使用批量和流式的處理框架,分別針對批式和流式資料提供相應的處理邏輯。最終通過一個服務層進行對外服務的輸出。

為什麼我們認為 Lambda 架構是批流一體化的必然要求?這好像看起來是矛盾的(與之相對,還有一種架構叫 Kappa 架構,即用一個流式處理引擎解決所有問題)。

6.jpg

實際上,這在很大程度來自於現實中使用者的需求。DataPipeline 在剛剛成立時只有一種模式,只支援實時流同步,在我們看來這是未來的一種趨勢。

但後來發現,很多客戶實際上有批量同步的需求。比如,銀行在每天晚上可能會有一些月結、日結,證券公司也有類似的結算服務。基於一些歷史原因,或出於對效能、資料庫配置的考慮,可能有的資料庫本身不能開 change log。所以實際上並不是所有情況下都能從源端獲取實時的流資料。

考慮到上述問題,我們認為一個產品在支撐資料融合過程中,必須能同時支撐批量和流式兩種處理模式,且在產品裡面出於效能和穩定性考慮提供不同的處理策略,這才是一個相對來說比較合理的基礎架構。

資料融合的 Ad-Hoc 模式

具體到做這件事,還可以有兩種基礎的應用模式。假如我需要將資料從 MySQL 同步到 Hive,可以直接建立一個 ETL 的 JOB(例如基於 Flink),其中封裝所有的處理邏輯,包括從源端讀取資料,然後進行變換寫入目的地。在將程式碼編譯好以後,就可以放到 Flink 叢集上執行,得到想要的結果。這個叢集環境可以提供所需要的基礎能力,剛才提到的包括分散式,容錯等。

7.jpg

資料融合的 MQ 模式

另一種模式是 ETL JOB 本身輸入輸出實際上都是面對訊息佇列的,實際上這是現在最常使用的一種模式。在這種模式下,需要通過一些獨立的資料來源和目的地聯結器,來完成資料到訊息佇列的輸入和輸出。ETL JOB 可以用多種框架實現,包括 Flink、Kafka Streams 等,ETL JOB 只和訊息佇列發生資料交換。

8.jpg

DP 選擇 MQ 模式的理由

DataPipeline 選擇 MQ 模式,主要有幾點考慮:

第一,在我們產品應用中有一個非常常見的場景:要做資料的一對多分發。資料要進行一次讀取,然後分發到各種不同的目的地,這是一個非常適合訊息佇列使用的分發模型。

第二,有時會對一次讀取的資料加不同的處理邏輯,我們希望這種處理不要重新對源端產生一次讀取。所以在多數情況下,都需將資料先讀到訊息佇列,然後再配置相應的處理邏輯。

第三,Kafka Connect 就是基於 MQ 模式的,它有大量的開源聯結器。基於 Kafka Connect 框架,我們可以重用這些聯結器,節省研發的投入。

第四,當你把資料抽取跟寫入目的地,從處理邏輯中獨立出來之後,便可以提供更強大的整合能力。因為你可以在訊息佇列上整合更多的處理邏輯,而無需考慮重新寫整個 Job。

9.jpg

相應而言,如果你選擇將 MQ 作為所有 JOB 的傳輸通道,就必須要克服幾個缺點:

第一,所有資料的吞吐都經過 MQ,所以 MQ 會成為一個吞吐瓶頸。

第二,因為是一個完全的流式架構,所以針對批量同步,你需要引入一些邊界訊息來實現一些批量控制。

第三,Kafka 是一個有持久化能力的訊息佇列,這意味著資料留存是有極限的。比如,你將源端的讀到 Kafka Topic 裡面,Topic 不會無限的大,有可能會造成資料容量超限,導致一些資料丟失。

第四,當批量同步在中間因為某種原因被打斷,無法做續傳時,你需要進行重傳。在重傳過程中,首先要將資料進行清理,如果基於訊息佇列模式,清理過程就會帶來額外的工作。你會面臨兩個困境:要麼清空原有的訊息佇列,要麼你創造新的訊息佇列。這肯定不如像直接使用一些批量同步框架那樣來的直接。

二、一致性語義保證

使用者需求

先簡單介紹一下使用者對於資料同步方面的一些基本要求:

第一種需求,批量同步需要以一種事務性的方式完成同步

無論是同步一整塊的歷史資料,還是同步某一天的增量,該部分資料到目的地,必須是以事務性的方式出現的。而不是在同步一半時,資料就已經在目的地出現了,這可能會影響下游的一些計算邏輯。

第二種需求,流式資料儘可能快的完成同步

大家都希望越快越好,但相應的,同步的越快,吞吐量有可能因為你的引數設定出現相應的下降,這可能需要有一個權衡。

第三種需求,批量和流式可能共存於一個 JOB

作為一個數據融合產品,當用戶在使用DataPipeline時,通常需要將存量資料同步完,後面緊接著去接增量。然後存量與增量之間需要進行一個無縫切換,中間的資料不要丟、也不要多。

**第四種需求,按需靈活選擇一致性語義保證
**

DataPipeline 作為一個產品,在客戶的環境中,我們無法對客戶資料本身的特性提出強制要求。我們不能要求客戶資料一定要有主鍵或者有唯一性的索引。所以在不同場景下,對於一致性語義保證,使用者的要求也不一樣的:

比如在有主鍵的場景下,一般我們做到至少有一次就夠了,因為在下游如果對方也是一個類似於關係型資料庫這樣的目的地,其本身就有去重能力,不需要在過程中間做一個強一致的保證。但是,如果其本身沒有主鍵,或者其下游是一個檔案系統,如果不在過程中間做額外的一致性保證,就有可能在目的地產生多餘的資料,這部分資料對於下游可能會造成非常嚴重的影響。

資料一致性的鏈路視角

如果要解決端到端的資料一致性,我們要處理好幾個基本環節:

**第一,在源端做一個一致性抽取
**

一致性抽取是什麼含義?即當資料從通過資料聯結器寫入到 MQ 時,和與其對應的 offset 必須是以事務方式進入 MQ 的。

第二,一致性處理

如果大家用過 Flink,Flink 提供了一個端到端一致性處理的能力,它是內部通過 checkpoint 機制,並結合 Sink 端的二階段提交協議,實現從資料讀取處理到寫入的一個端到端事務一致性。其它框架,例如 Spark Streaming 和 Kafka Streams 也有各自的機制來實現一致性處理。

第三,一致性寫入

在 MQ 模式下,一致性寫入,即 consumer offset 跟實際的資料寫入目的時,必須是同時持久化的,要麼全都成功,要麼全部失敗。

10.jpg

第四,一致性銜接

在 DataPipeline 的產品應用中,歷史資料與實時資料的傳輸有時需要在一個任務共同完成。所以產品本身需要有這種一致性銜接的能力,即歷史資料和流式資料,必須能夠在一個任務中,由程式自動完成它們之間的切換。

Kafka Connect 的一致性保證

Kafka Connect 如何保證資料同步的一致性?就目前版本,Kafka Connect 只能支援端到端的 at least once,核心原因在於,在 Kafka Connect 裡面,其 offset 的持久化與資料傳送本身是非同步完成的。這在很大程度上是為了提高其吞吐量考慮,但相應產生的問題是,如果使用 Kafka Connect,框架本身只能為你提供 at least once 的語義保證。

在該模式下,如果沒有通過主鍵或下游應用進行額外地去重,同步過程當中的資料會在極端情況下出現重複,比如源端傳送出一批資料已經成功,但 offset 持久化失敗了,這樣在任務恢復之後,之前已經發送成功的資料會再次重新發送一批,而下游對這種現象完全是不知情的。目的端也是如此,因為 consumer 的 offset 也是非同步持久化,就會到導致有可能資料已經持久化到 Sink,但實際上 consumer offset 還沒有推進。這是我們在應用原生的 Kafka Connect 框架裡遇到最大的兩個問題。

11.jpg

三、DP 的解決之道

二階段提交協議

DataPipeline 如何解決上述問題?首先,需要用協議的方式保證每一步都做成事務。一旦做成事務,由於每個環節都是解耦的,其最終資料就可以保證一致性。下圖為二階段提交協議的最基礎版本,接下來為大家簡單介紹一下。

12.jpg

首先,在二階段提交協議中,對於分散式事務的參與方,在 DataPipeline 的場景下為資料寫入與 offset 寫入,這是兩個獨立元件。兩者之間的寫入操作由 Coordinator 進行協調。第一步是一個 prepare 階段,每一個參與方會將資料寫入到自己的目的地,具體持久化的位置取決於具體應用的實現。

第二步,當 prepare 階段完成之後,Coordinator 會向所有參與者發出 commit 指令,所有參與者在完成 commit 之後,會發出一個 ack,Coordinator 收到 ack 之後,事務就完成了。如果出現失敗,再進行相應的回滾操作。其實在分散式資料庫的設計領域中,單純應用一個二階段提交協議會出現非常多的問題,例如 Coordinator 本身如果不是高可用的,在過程當中就有可能出現事務不一致的問題。

所以應用二階段提交協議,最核心的問題是如何保證 Coordinator 高可用。所幸在大家耳熟能詳的各種框架裡,包括 Kafka 和 Flink,都能夠通過分散式一致協議實現 Coordinator 高可用,這也是為什麼我們能夠使用二階段提交來保證事務性。

Kafka 事務訊息原理

關於 Kafka 事務訊息原理,網上有很多資料,在此簡單說一下能夠達到的效果。Kafka 通過二階段提交協議,最終實現了兩個最核心的功能。

第一,一致性抽取

上文提到資料要被髮送進 Kafka,同時 offset 要被持久化到 Kafka,這是對兩個不同 Topic 的寫入。通過利用 Kafka 事務性訊息,我們能夠保證 offset 的寫入和資料的傳送是一個事務。如果 offset 沒有持久化成功,下游是看不到這批資料的,這批資料實際上最終會被丟棄掉。

13.jpg

所以對於源端的傳送,我們對 Kafka Connect 的 Source Worker 做了一些改造,讓其能夠提供兩種模式,如果使用者的資料本身是具備主鍵去重能力的,就可以繼續使用 Kafka Connect 原生的模式。

如果使用者需要強一致時,首先要開啟一個源端的事務傳送功能,這就實現了源端的一致性抽取。其可以保證資料進 Kafka 一端不會出現資料重複。這裡有一個限制,即一旦要開啟一致性抽取,根據 Kafka 必須要將 ack 設定成 all,這意味著一批資料有多少個副本,其必須能夠在所有的副本所在的 broker 都已經應答的情況下,才可以開始下一批資料的寫入。儘管會造成一些效能上的損失,但為了實現強一致,你必須要接受這一事實。

**第二,一致性處理
**

事務性訊息最早就是為 Kafka Streams 設計和準備的。可以寫一段 Kafka Streams 應用,從 Kafka 裡讀取資料,然後完成轉化邏輯,進而將結果再輸出回 Kafka。Sink 端再從 Kafka 中消費資料,寫入目的地。

資料一致性寫入

之前簡要談了一下二階段提交協議的原理,DataPipeline 實現的方式不算很深奧,基本是業界的一種統一方式。其中最核心的點是,我們將 consumer offset 管理從 Kafka Connect 框架中獨立出來,實現事務一致性提交。另外,在 Sink 端封裝了一個類似於 Flink 的 TwoPhaseCommitSinkFunction 方式,其定義了 Sink 若要實現一個二階段提交所必須要實現的一些功能。

14.jpg

DataPipeline 將 Sink Connector 分為兩類,一類是 Connector 本身具備了事務能力,比如絕大部分的關係型資料庫,只需將 offset 跟資料同時持久化到目的地即可。額外的可能需要有一張 offset 表來記錄提交的 offset。還有一類 Sink 不具備事務效能力,類似像 FTP、OSS 這些物件儲存,我們需要去實現一個二階段提交協議,最終才能保證 Sink 端的資料能夠達到一致性寫入。

資料一致性銜接

關於批量資料與實時資料如何銜接的問題,主要有兩個關鍵點:

第一,當開始進行一個批量資料同步時,以關係型資料庫為例,你應該拿到當時一個整體資料的 Snapshot,並在一個事務中同時記錄當時對應的日誌起始值。以 MySQL 為例,當要獲取一個 Binlog 起始偏移量時,需要開啟一個 START TRANSACTION WITH CONSISTENT SNAPSHOT,這樣才能保證完成全量之後,後期的讀取增量日誌同步不會產生重複資料。

第二,如果採用增量同步模式,則必須根據實際的資料業務領域,採用一種比較靈活的增量表達式,才能避免讀到寫到一半的資料。比如在你的資料中,其 ID 是一個完全自增,沒有任何重複的可能,此時只需每次單純的大於上一次同步的最後一條記錄即可。

但如果是一個時間戳,無論精度多高,都有可能在資料庫產生相同的時間戳,所以安全的做法是每次迭代時,取比當前時間稍微少一點,保證留出一個安全時間,比如五秒甚至一分鐘,這樣你永遠不會讀到一些時間戳可能會產生衝突的這部分資料,避免遺漏資料。這是一個小技巧,但如果沒有注意,在使用過程中就會產生各種各樣的問題。

還有一點是上面提及的,如何能夠在一個流式框架實現批量同步的一致性,對於所有的流式框架,需要引入一些邊界條件來標誌著一次批量同步的開始和結束。DataPipeline 在每次批量傳送開始和結束後,會引入一些控制量訊號,然後在 Sink端進行相應處理。同樣為了保證事務一致性,在 Sink 端處理這種批量同步時,依然要做一些類似於二階段提交這樣的方式,避免在一些極端情況下出現資料不一致的問題。

四、問題和思考

上文介紹的是 DataPipeline 如何基於 Kafka Connect 做事務同步一致性的方案。

DataPipeline 在使用 Kafka Connect 過程中遇到過一些問題,目前大部分已經有一些解決方案,還有少量問題,可能需要未來採用新的方法/框架才能夠更好的解決。

第一,反壓的問題

Kafka Connect 設計的邏輯是希望實現源端和目的端完全解耦,這種解偶本身是一個很好的特性。但也帶來一些問題,源和目的地的 task 完全不知道彼此的存在。剛才我提到 Kafka 有容量限制,不能假定在一個客戶環境裡面,會給你無限的磁碟來做緩衝。通常我們在客戶那邊預設 Topic 為 100G 的容量。如果源端讀的過快,大量資料會在 Kafka 裡堆積,目的端沒有及時消費,就有可能出現數據丟失,這是一個非常容易出現的問題。

怎麼解決?DataPipeline 作為一個產品,在 Kafka Connect 之上,做了控制層,控制層中有像 Manager 這樣的邏輯元件,會監控每一個 Topic 消費的 lag,當達到一定閾值時,會對源端進行限速,保證源和目的地儘可能匹配。

第二,資源隔離

Connect Worker 叢集無法對 task 進行資源預留,多個 task 並行執行會相互影響。Worker 的 rest 介面是佇列式的,單個叢集任務過多會導致啟停緩慢。

我們正在考慮利用外部的資源排程框架,例如 K8s 進行 worker 節點管理;以及通過路由規則將不同優先順序任務執行在不同的 worker 叢集上,實現預分配和共享資源池的靈活配置。

第三,Rebalance

在 2.3 版本以前,Kafka Connect 的 task rebalance 採用 stop-the-world 模式,牽一髮動全身。在 2.3 版本之後,已經做了非常大優化,改為了具有粘性的 rebalance。所以如果使用 Kafka Connect,強烈推薦一定要升級到 2.3 以上的版本,也就是目前的最新版本。

五、未來演進路線

基於 MQ 模式的架構,針對大批量資料的同步,實際上還是容易出現效能瓶頸。主要瓶頸是在 MQ 的叢集,我們並不能在客戶環境裡無限優化 Kafka 叢集的效能,因為客戶提供的硬體資源有限。所以一旦客戶給定了硬體資源,Kafka 吞吐的上限就變為一個固定值。所以針對批量資料的同步,可能未來會考慮用記憶體佇列替代 MQ。

同時,會採用更加靈活的 Runtime,主要是為了解決剛才提到的預分配資源池和共享資源池的統一管理問題。

另外,關於資料質量管理,實際上金融類客戶對資料質量的一致性要求非常高。所以對於一些對資料質量要求非常高的客戶,我們考慮提供一些後校驗功能,尤其是針對批量同步。


▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,大會議程精彩上線,瞭解 Flink Forward Asia 2019 的更多資訊,請檢視:

https://developer.aliyun.com/special/ffa2019

首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與效能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:

https://tianchi.aliyun.com/markets/tianchi/flink2019

 

閱讀原文

本文為雲棲社群原創內容,未經