1. 程式人生 > >深入理解Flink核心技術(轉載)

深入理解Flink核心技術(轉載)

優點 流程圖 align 優化器 red 興趣 hdf 定義 lin

技術分享圖片

作者:李呈祥

Flink項目是大數據處理領域最近冉冉升起的一顆新星,其不同於其他大數據項目的諸多特性吸引了越來越多的人關註Flink項目。本文將深入分析Flink一些關鍵的技術與特性,希望能夠幫助讀者對Flink有更加深入的了解,對其他大數據系統的開發者也能有所裨益。

註:本文假設讀者對MapReduce,Spark及Storm等大數據處理系統有基本了解,同時熟悉流處理與批處理的基本概念。36大數據(http://www.36dsj.com/)

Flink簡介

Flink的核心是一個流式的數據流執行引擎,其針對數據流的分布式計算提供了數據分布,數據通信以及容錯機制等功能。基於流執行引擎,Flink提供了諸多更高抽象層的API以方便用戶編寫分布式任務:

1. DataSet API, 對靜態數據進行批處理操作,將靜態數據抽象成分布式的數據集,用戶可以方便的采用Flink提供的各種操作符對分布式數據集進行各種操作,支持Java,Scala和Python。

2. DataStream API,對數據流進行流處理操作,將流式的數據抽象成分布式的數據流,用戶可以方便的采用Flink提供的各種操作符對分布式數據流進行各種操作,支持Java和Scala。

3. Table API,對結構化數據進行查詢操作,將結構化數據抽象成關系表,並通過Flink提供的類SQL的DSL對關系表進行各種查詢操作,支持Java和Scala。

此外,Flink還針對特定的應用領域提供了領域庫,例如:

1. Flink ML,Flink的機器學習庫,提供了機器學習Pipelines API以及很多的機器學習算法實現。

2. Gelly,Flink的圖計算庫,提供了圖計算的相關API以及很多的圖計算算法實現。

Flink的技術棧如下圖所示:36大數據(http://www.36dsj.com/)

技術分享圖片

                  圖1 Flink技術棧

此外,Flink也可以方便地和其他的Hadoop生態圈的項目集成,例如,Flink可以讀取存儲在HDFS或HBase中的靜態數據,以Kafka作為流式的數據源,直接重用MapReduce/Storm代碼,或是通過YARN申請集群資源等等。

統一的批處理與流處理系統

在大數據處理領域,批處理任務與流處理任務一般被認為是兩種不同的任務,一個大數據項目一般會被設計為只能處理其中一種任務,例如Apache Storm,Apache Smaza只支持流處理任務,而Aapche MapReduce, Apache Tez,Apache Spark只支持批處理任務。

Spark Streaming是Apache Spark之上支持流處理任務的子系統,看似一個特例,實則不然。Spark Streaming采用了一種micro-batch的架構,即將輸入的數據流切分成細粒度的batch數據,對於每一個batch數據,以此為輸入提交一個批處理Spark任務,所以Spark Streaming本質上還是基於Spark批處理系統對流式數據進行處理,和Apache Storm,Apache Smaza等完全流式的數據處理方式完全不同。Flink能夠同時處理批處理任務與流處理任務,其靈活的執行引擎支持完全原生的批量的數據處理和流式的數據處理。

在執行引擎這一層,流處理系統與批處理系統最大的不同在於節點間數據傳輸的方式。對於一個流處理系統,其節點間數據傳輸的標準模型是:當一條數據被處理完成後,序列化到緩存中,然後立刻通過網絡傳輸到下一個節點,由下一個節點繼續處理。而對於一個批處理系統,其節點間數據傳輸的標準模型是:當一條數據被處理完成後,序列化到緩存中,並不會立刻通過網絡傳輸到下一個節點,當緩存寫滿,就持久化到本地硬盤上,當所有數據都被處理完成後,才開始將處理後的數據通過網絡傳輸到下一個節點。36大數據(http://www.36dsj.com/)

這兩種數據傳輸模式是兩個極端,對應的是流處理系統對低延遲的要求和批處理系統對高吞吐量的要求。Flink的執行引擎采用了一種十分靈活的方式,同時支持了這兩種數據傳輸模型。Flink以固定的緩存塊為單位進行網絡數據傳輸,用戶可以通過緩存塊超時值指定緩存塊的傳輸時機。如果緩存塊的超時值為0,則Flink的數據傳輸方式類似上面提到的流處理系統的標準模型,此時系統可以獲得最低的處理延遲。

如果緩存塊的超時值為無限大,則Flink的數據傳輸方式類似上面提到的批處理系統的標準模型,此時系統可以獲得最高的處理吞吐量。同時緩存塊的超時值也可以設置為0到無限大之間的任意值。緩存塊的超時閾值越小,則Flink流處理執行引擎的數據處理延遲越低,但吞吐量也會越低,緩存塊的超時閾值越大時,則反之。通過調整緩存塊的超時閾值,用戶可根據自己的需要靈活的權衡Flink的延遲和吞吐量。

技術分享圖片

                圖2 Flink執行引擎數據傳輸模式

在統一的流式執行引擎的基礎上,Flink同時支持了流處理系統與批處理系統,並且保證了其流處理系統與批處理系統的性能(延遲,吞吐量等),相對於其他原生的流處理與批處理系統,並沒有因為統一的執行引擎而受到影響。用戶可以在Flink上同時執行批處理任務與流處理任務,這大大減輕了用戶安裝,部署,監控,維護等成本。36大數據(http://www.36dsj.com/)

Flink流處理的容錯機制

對於一個分布式系統來說,單個進程或是節點崩潰導致整個Job失敗是經常發生的事情,在異常發生的時候不會丟失用戶數據,並能夠自動恢復是分布式系統的需要支持的特性之一。本節主要介紹Flink流處理系統對於任務級別的容錯機制。

批處理系統比較容易實現容錯機制,由於文件可以重復訪問,當某個任務失敗後,重啟該任務即可。但是在流處理系統中,由於數據源是無限的數據流,一個流處理任務甚至可能會執行幾個月,將所有數據緩存或是持久化,留待以後重復訪問基本上是不可行的。Flink基於分布式快照與可部分重發的數據源實現了容錯,用戶可自定義對整個Job進行快照的時間間隔,當出現任務失敗時,Flink將整個Job恢復到最近一次快照的狀態,並從數據源重發快照之後的數據。

Flink的分布式快照的實現借鑒了Chandy和Lamport在1985年發表的一篇關於分布式快照的論文,其實現的主要思想如下:

按照用戶自定義的分布式快照間隔時間,Flink會在定時在所有數據源中插入一種特殊的快照標記消息,這些快照標記消息和其他消息一樣在DAG中流動,但是不會被用戶定義的業務邏輯所處理,每一個快照標記消息都將其所在的數據流分成兩部分:本次快照數據和下次快照數據。36大數據(http://www.36dsj.com/)

技術分享圖片

            圖3 Flink包含快照標記消息的消息流

快照標記消息沿著DAG流經各個操作符,當操作符處理到快照標記消息時,會對自己的狀態進行快照,並存儲起來。當一個操作符有多個輸入的時候,Flink會將先抵達的快照標記消息及其之後的消息緩存起來,當所有的輸入中對應該次快照的快照標記消息全部抵達後,操作符對自己的狀態快照並存儲,之後處理所有快照標記消息之後的已緩存消息。操作符對自己的狀態快照並存儲可以是異步與增量的操作,並不需要阻塞消息的處理。分布式快照的流程如下圖所示:

技術分享圖片

                  圖4 Flink分布式快照流程圖

當所有的Data Sink(終點操作符)都收到快照標記信息並對自己的狀態快照和存儲後,整個分布式快照就完成了,同時通知數據源釋放該快照標記消息之前的所有消息。若之後發生節點崩潰等異常情況時,只需要恢復之前存儲的分布式快照狀態,並從數據源重發該快照以後的消息就可以了。

Exactly-Once是流處理系統需要支持的一個非常重要的特性,它保證每一條消息被流處理系統處理一次,且僅被處理一次,許多流處理任務的業務邏輯都依賴於Exactly-Once特性。相對於At-Least-Once或是At-Most-Once, Exactly-Once特性對流處理系統的要求更嚴格,實現也更困難。Flink基於分布式快照實現了Exactly-Once特性。36大數據(http://www.36dsj.com/)

相對於其他流處理系統的容錯方案,Flink基於分布式快照的方案在功能和性能方面都具有很多優點,包括:

1. 低延遲。由於操作符狀態的存儲可以是異步的,所以進行快照的過程基本上不會阻塞消息的處理,對消息的延遲不會產生負面的影響。

2. 高吞吐量。當操作符狀態較少時,對吞吐量基本沒有影響。當操作符狀態較多時,相對於其他的容錯機制,分布式快照的時間間隔是用戶自定義的,所以用戶可以權衡錯誤恢復時間和吞吐量的要求,調整分布式快照的時間間隔。

3. 與業務邏輯的隔離。Flink的分布式快照機制與用戶的業務邏輯是完全隔離的,用戶的業務邏輯不會依賴或是對分布式快照產生任何影響。

4. 錯誤恢復代價。分布式快照的時間間隔越短,錯誤恢復的時間越少,與吞吐量負相關。

Flink流處理的時間窗口

對於流處理系統來說,流入的消息是無限的,所以對於聚合或是連接等操作,流處理系統需要對流入的消息進行分段,然後基於每一段數據進行聚合或是連接等操作。消息的分段即稱為窗口,流處理系統支持的窗口有很多類型,最常見的就是時間窗口,基於時間間隔對消息進行分段處理。本節主要介紹Flink流處理系統支持的各種時間窗口。

對於目前大部分流處理系統來說,時間窗口一般是根據Task所在節點的本地時鐘來進行切分,這種方式實現起來比較容易,不會阻塞消息處理。但是可能無法滿足某些應用的要求,例如:

1. 消息本身帶有時間戳,用戶希望按照消息本身的時間特性進行分段處理。

2. 由於不同節點的時鐘可能不同,以及消息在流經各個節點時延遲不同,在某個節點屬於同一個時間窗口處理的消息,流到下一個節點時可能被切分到不同的時間窗口中,從而產生不符合預期的結果。

Flink支持三種類型的時間窗口,分別適用於用戶對於時間窗口不同類型的要求:

1. Operator Time。根據Task所在節點的本地時鐘來進行切分的時間窗口。

2. Event Time。消息自帶時間戳,根據消息的時間戳進行處理,確保時間戳在同一個時間窗口的所有消息一定會被正確處理。由於消息可能是亂序流入Task的,所以Task需要緩存當前時間窗口消息處理的狀態,直到確認屬於該時間窗口的所有消息都被處理後,才可以釋放其狀態。如果亂序的消息延遲很高的話,會影響分布式系統的吞吐量和延遲。

3. Ingress Time。有時消息本身並不帶有時間戳信息,但用戶依然希望按照消息而不是節點時鐘劃分時間窗口(例如,避免上面提到的第二個問題)。此時可以在消息源流入Flink流處理系統時,自動生成增量的時間戳賦予消息,之後處理的流程與Event Time相同。Ingress Time可以看成是Event Time的一個特例,由於其在消息源處時間戳一定是有序的,所以在流處理系統中,相對於Event Time,其亂序的消息延遲不會很高,因此對Flink分布式系統的吞吐量和延遲的影響也會更小。

Event Time時間窗口的實現

Flink借鑒了Google的MillWheel項目,通過WaterMark來支持基於Event Time時間窗口。

當操作符通過基於Event Time的時間窗口來處理數據時,它必須在確定所有屬於該時間窗口的消息全部流入此操作符後,才能開始處理數據。但是由於消息可能是亂序的,所以操作符無法直接確認何時所有屬於該時間窗口的消息全部流入此操作符。36大數據(http://www.36dsj.com/)

WaterMark包含一個時間戳,Flink使用WaterMark標記所有小於該時間戳的消息都已流入,Flink的數據源在確認所有小於某個時間戳的消息都已輸出到Flink流處理系統後,會生成一個包含該時間戳的WaterMark,插入到消息流中輸出到Flink流處理系統中,Flink操作符按照時間窗口緩存所有流入的消息,當操作符處理到WaterMark時,它對所有小於該WaterMark時間戳的時間窗口的數據進行處理並發送到下一個操作符節點,然後也將WaterMark發送到下一個操作符節點。

為了保證能夠處理所有屬於某個時間窗口的消息,操作符必須等到大於這個時間窗口的WaterMark之後,才能開始對該時間窗口的消息進行處理,相對於基於Operator Time的時間窗口,Flink需要占用更多的內存,且會直接影響消息處理的延遲時間。對此,一個可能的優化措施是,對於聚合類的操作符,可能可以提前對部分消息進行聚合操作,當有屬於該時間窗口的新消息流入時,基於之前的部分聚合結果繼續計算,這樣的話,只需緩存中間計算結果即可,無需緩存該時間窗口的所有消息。

對於基於Event Time時間窗口的操作符來說,流入WaterMark的時間戳與當前節點的時鐘一致是最簡單理想的狀況了,但是在實際環境中是不可能的,由於消息的亂序以及前面節點處理效率的不同,總是會有某些消息流入時間大於其本身的時間戳,真實WaterMark時間戳與理想情況下WaterMark時間戳的差別稱為Time Skew,如下圖所示:

技術分享圖片

                圖5 WaterMark的Time Skew圖

Time Skew決定了該WaterMark與上一個WaterMark之間的時間窗口所有數據需要緩存的時間,Time Skew時間越長,該時間窗口數據的延遲越長,占用內存的時間也越長,同時會對流處理系統的吞吐量產生負面影響。

基於時間戳的排序

在流處理系統中,由於流入的消息是無限的,所以對消息進行排序基本上被認為是不可行的。但是在Flink流處理系統中,基於WaterMark,Flink實現了基於時間戳的全局排序。

Flink基於時間戳進行排序的實現思路如下:排序操作符緩存所有流入的消息,當其接收到WaterMark時,對時間戳小於該WaterMark的消息進行排序,並發送到下一個節點,在此排序操作符中釋放所有時間戳小於該WaterMark的消息,繼續緩存流入的消息,等待下一個WaterMark觸發下一次排序。

由於WaterMark保證了其之後不會出現時間戳比它小的消息,所以可以保證排序的正確性。需要註意的是,如果排序操作符有多個節點,只能保證每個節點的流出消息是有序的,節點之間的消息不能保證有序,要實現全局有序,則只能有一個排序操作符節點。

通過支持基於Event Time的消息處理,Flink擴展了其流處理系統的應用範圍,使得更多的流處理任務可以通過Flink來執行。

定制的內存管理

略,請參考上篇文章:脫離JVM? Hadoop生態圈的掙紮與演化

總結

本文主要介紹了Flink項目的一些關鍵特性,Flink是一個擁有諸多特色的項目,包括其統一的批處理和流處理執行引擎,通用大數據計算框架與傳統數據庫系統的技術結合,以及流處理系統的諸多技術創新等,因為篇幅有限,Flink還有一些其他很有意思的特性沒有詳細介紹,比如DataSet API級別的執行計劃優化器,原生的叠代操作符等,感興趣的讀者可以通過Flink的官網了解更多Flink的詳細內容。希望通過本文的介紹能夠讓讀者對Flink項目能有更多的了解,也讓更多的人使用甚至參與到Flink項目中去。36大數據(http://www.36dsj.com/)

深入理解Flink核心技術(轉載)