1. 程式人生 > >實時計算——聊一聊我所經歷的計算框架

實時計算——聊一聊我所經歷的計算框架

在聊實時計算之前,先說一下我對離線和批量、實時和流式的一些看法。

我們首先來簡單看一下計算任務的大致流程:

首先先說下批量計算和流式計算:

圖中顯示了一個計算的基本流程,receiver處負責從資料來源接收資料,併發送給下游的task,資料由task處理後由sink端輸出。

以圖為例,批量和流式處理資料粒度不一樣,批量每次處理一定大小的資料塊(輸入一般採用檔案系統),一個task處理完一個數據塊之後,才將處理好的中間資料傳送給下游。流式計算則是以record為單位,task在處理完一條記錄之後,立馬傳送給下游。

假如我們是對一些固定大小的資料做統計,那麼採用批量和流式效果基本相同,但是流式有一個好處就是可以實時得到計算中的結果,這對某些應用很有幫助,比如每1分鐘統計一下請求server的request次數。

那問題來了,既然流式系統也可以做批量系統的事情,而且還提供了更多的功能,那為什麼還需要批量系統呢?因為早期的流式系統並不成熟,存在如下問題:

1.流式系統的吞吐不如批量系統

2.流式系統無法提供精準的計算

後面的介紹Storm、Spark streaming、Flink主要根據這兩點來進行介紹。

批量和流式的區別:

1.資料處理單位:

批量計算按資料塊來處理資料,每一個task接收一定大小的資料塊,比如MR,map任務在處理完一個完整的資料塊後(比如128M),然後將中間資料傳送給reduce任務。

流式計算的上游運算元處理完一條資料後,會立馬傳送給下游運算元,所以一條資料從進入流式系統到輸出結果的時間間隔較短(當然有的流式系統為了保證吞吐,也會對資料做buffer)。

這樣的結果就是:批量計算往往得等任務全部跑完之後才能得到結果,而流式計算則可以實時獲取最新的計算結果。

2.資料來源:

批量計算通常處理的是有限資料(bound data),資料來源一般採用檔案系統,而流式計算通常處理無限資料(unbound data),一般採用訊息佇列作為資料來源。

3.任務型別:

批量計算中的每個任務都是短任務,任務在處理完其負責的資料後關閉,而流式計算往往是長任務,每個work一直執行,持續接受資料來源傳過來的資料。

離線=批量?實時=流式?

習慣上我們認為離線和批量等價;實時和流式等價,但其實這種觀點並不完全正確。

假設一種情況:當我們擁有一個非常強大的硬體系統,可以毫秒級的處理Gb級別的資料,那麼批量計算也可以毫秒級得到統計結果(當然這種情況非常極端,目前不可能),那我們還能說它是離線計算嗎?

所以說離線和實時應該指的是:資料處理的延遲;批量和流式指的是:資料處理的方式。兩者並沒有必然的關係。事實上Spark streaming就是採用小批量(batch)的方式來實現實時計算。

可以參考下面連結:https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101。

整理了一份適合2018年學習的大資料資料需要的加群QQ群:834325294 註明CSDN既可免費獲取作者是Google實時計算的負責人,裡面闡述了他對批量和實時的理解,並且作者認為批量計算只是流式計算的子集,一個設計良好的流式系統完全可以替代批量系統。本人也從中受到了很多啟發。

介紹完這些概念後,下面我們就來簡單看看目前流行的實時計算框架的實現和區別。

Storm

Storm做為最早的一個實時計算框架,早期應用於各大網際網路公司,這裡我們依然使用work count舉例:

 

spout:負責從資料來源接收資料

bolt:負責資料處理,最下游的bolt負責資料輸出

spout不斷從資料來源接收資料,然後按一定規則傳送給下游的bolt進行計算,最下游的bolt將最終結果輸出到外部系統中(這裡假設輸出到DB),這樣我們在DB中就可以看到最新的資料統計結果。Storm每一層的運算元都可以配置多個,這樣保證的水平擴充套件性。因為往往處理的是unbound data,所以storm中的運算元都是長任務。

容災是所有系統都需要考慮的一個問題,考慮一下:假如執行過程中,一個運算元(bolt)因某種原因掛了,Storm如何恢復這個任務呢?

 

批處理解決方案就比較簡單,拿MR舉例,假如一個執行中map或reduce失敗,那麼任務重新提交一遍就ok(只不過重頭計算又要花費大量時間),下面我們看看Storm是如何解決的:

storm的spout有一個buffer,會快取接收到的record,並且Storm還有一個acker(可以認為是一個特殊的bolt任務),每條record和該record所產生的所有tuple在處理完成後都會向對應的acker傳送ack訊息,當acker接收到該record所有的ack訊息之後,便認為該record處理成功,並通知spout從buffer中將該record移除,若receiver沒有在規定的時間內接收到ack,acker則通知spout重放資料。

acker個數可以由使用者指定,因為資料量比較大時,一個acker可能處理不過來所有的ack資訊,成為系統瓶頸(如果可以容忍資料丟失,當然也可以關閉ack機制,可以顯著提高系統性能)。並且acker採用了巧妙的機制,優化了ack機制的資源佔用(有興趣的同學可以參考官網,網上也有很多部落格介紹ack具體實現)。

Storm採用ack機制實現了資料的重放,儘管做了很多優化,但是畢竟每條record和它產生的tuple都需要ack,對吞吐還是有較大的影響,關閉ack的話,對於某些不允許丟資料的業務來說又是不可接受的。

Storm的這種特點會導致大家認為:流式計算的吞吐不如批量計算。(這點其實是不對的,只能說Storm的設計導致了它的吞吐不如批量計算,一個設計優秀的流式系統是有可能擁有和批處理系統一樣的吞吐)

資料不重不丟

之前我們提到早期的流式系統無法提供精準的計算服務,下面我們詳細瞭解一下:

 

sink處的重複輸出:假如執行過程中,boltA資料入庫後,boltB因為某種原因crash了,這時候會導致該record重放,boltA中已經處理過的資料會再次入庫,導致部分資料重複輸出。

不僅sink處存在重複輸出的問題,receiver處也同樣存在這種問題。(在講解Spark streaming處會詳細介紹什麼情況下receiver會重複接收資料)

Storm沒有提供exactly once的功能,並且開啟ack功能後又會嚴重影響吞吐,所以會給大家一種印象:流式系統只適合吞吐相對較小的、低延遲不精確的計算;而精確的計算則需要由批處理系統來完成,所以出現了Lambda架構,該架構由Storm的創始人提出,簡單的理解就是同時執行兩個系統:一個流式,一個批量,用批量計算的精確性來彌補流式計算的不足,但是這個架構存在一個問題就是需要同時維護兩套系統,代價比較大。

那麼有沒有一種架構,可以滿足高吞吐、低延遲的要求,同時也提供exactly once功能?有的,下面我們來看看Spark streaming。

Spark streaming

吞吐

Spark streaming採用小批量的方式,提高了吞吐效能:

 

這裡我們簡單展示Spark streaming的執行機制,主要是與Storm做下對比。Spark streaming批量讀取資料來源中的資料,然後把每個batch轉化成內部的RDD。Spark streaming以batch為單位進行計算(預設1s產生一個batch),而不是以record為單位,大大減少了ack所需的開銷,顯著提高了吞吐。

但也因為處理資料的粒度變大,導致Spark streaming的資料延時不如Storm,Spark streaming是秒級返回結果(與設定的batch間隔有關),Storm則是毫秒級。

不重不丟(exactly once)

Spark streaming通過batch的方式提高了吞吐,但是同樣存在上面所說的資料丟失和重複的問題。

在解答這個問題之前,我們先來了解一下一些概念:

1.at most once:最多消費一次,會存在資料丟失

2.at least once:最少消費一次,保證資料不丟,但是有可能重複消費

3.exactly once:精確一次,無論何種情況下,資料都只會消費一次,這是我們最希望看到的結果

大部分流式系統都提供了at most once和at least once功能,但不是所有系統都能提供exactly once。

我們先看看Spark streaming的at least once是如何實現的,Spark streaming的每個batch可以看做是一個Spark任務,receiver會先將資料寫入WAL,保證receiver宕機時,從資料來源獲取的資料能夠從日誌中恢復(注意這裡,早期的Spark streaming的receiver存在重複接收資料的情況),並且依賴RDD實現內部的exactly once(可以簡單的理解採用批量計算的方式來實現)。RDD:Resilient Distributed Dataset彈性分散式資料集,Spark儲存著RDD之間的依賴關係,保證RDD計算失敗時,可以通過上游RDD進行重新計算(RDD如何實現容錯這裡就不解釋了,可以自行查資料)。

上面簡單解釋了Spark streaming依賴源資料寫WAL和自身RDD機制提供了容災功能,保證at least once,但是依然無法保證exactly once,在回答這個問題前,我們再來看一下,什麼情況Spark streaming的資料會重複計算。

 

這裡我們主要關注圖中的3個紅框:

Spark streaming的RDD機制只能保證內部計算exactly once(圖中的1),但這是不夠的,回想一下剛才Storm的例子,假如某個batch中,sink處一部分資料已經入庫,這時候某個sink節點宕機,導致該節點處理的資料重複輸出(圖中的3,Storm處已經解釋過了)。還有另一種情況就是receiver處重複接收資料(圖中的2),我們看一下receiver重複接收資料的情況:

 

假如receiverA目前從kafka讀到pos=100的記錄,並且已經持久化到HDFS,但是由於網路延遲沒有及時更新pos,此時receiverA宕機了,receiverB接管A的資料,並且後續的任務還會從pos=100處重新讀取,導致重複消費。造成這種情況的主要原因就是:receiver處資料消費和Kafka中position的更新沒有做到原子性。

根據上面的討論,可以得出:一個流式系統如果要做到exactly once,必須滿足3點:

1.receiver處保證exactly once

2.流式系統自身保證exactly once

3.sink處保證exactly once

這裡資料來源採用Kafka舉例是因為Kafka作為目前主流的分散式訊息佇列,比較有代表性。Kafka consumer的position可以儲存在ZK或者Kafka中,也可以由consumer自己來儲存。前者的話就可能存在資料消費和position更新不一致的問題(因為無法保證原子性,也是之前Spark streaming採用的方式),而採用後者的話,consumer可以採用事務更新的方式(寫本地或者採用事務的方式寫資料庫),保證資料消費和position更新的原子性,從而實現exactly once(參考)。

Spark streaming 實現 exactly once

Spark streaming1.3版本新添加了Kafka Direct API來實現資料接收的exactly once,本質上就是上面提到的後者,Spark streaming自己維護position,streaming的worker直接從Kafka讀取資料,position由Spark streaming管理,不再依賴ZK儲存,同時保證資料消費和更新position的原子性,從而實現exactly once。

並且新的方式已經不再需要receiver持久化資料,因為Kafka本身就支援資料持久化,可以避免receiver處持久化資料的開銷,實現exactly once的同時也提高了效能。

而sink處的exactly once的實現則視外部系統而定,比如檔案系統本身就支援冪等(同一個操作執行多次,不會改變之前的結果),同時Spark streaming也提供了api,使用者可以自己實現sink處的事務更新,receiver、sink和Spark streaming三者結合起來才能實現了真正的exactly once。

Storm trident本質上也是採用了小批量的方式,並且也實現了exactly once語義,這裡就不做過多討論。

直到這裡,我們瞭解到Spark streaming擁有較好的吞吐和exactly once語義,解決了Storm一些不足,是不是隻有采用類似Spark streaming這種小批量(micro-batch)的方式才能實現這些功能?答案是:NO。下面我們來看看Flink。

Flink

Flink在資料處理的方式上和Storm類似,並沒有採用小批量,是一個真正的流式系統。它不僅擁有了不弱於Spark streaming的吞吐,並且提供了exactly once語義。既然Flink也是逐條處理記錄,那麼它是怎麼做到的呢?跟上我的腳步...(下面內容大部分參考官網,撿重點的翻譯,想起來一個段子:如何快速成為業界大牛?答:翻譯英文文件。。hahaha,開個玩笑^ ^)

簡單來說,Flink採用輕量級分散式快照實現容錯,大致流程是:Flink不斷的對整個系統做snapshot,snapshot資料可以放在master上或外部系統(如HDFS),假如發生故障時,Flink停止整個資料流,並選出最近完成的snapshot,將整個資料流恢復到該snapshot那個時間點,snapshot本身比較輕量,而且使用者可以自行配置snapshot的間隔,snapshot的效能開銷對系統的影響很小(官方測試snapshot開啟前後的效能差距

 

barrier是分散式snapshot實現中一個非常核心的元素,barrier和records一起在流式系統中傳輸,barrier是當前snapshot和下一個snapshot的分界點,它攜帶了當前snapshot的id,假設目前在做snapshot N,運算元在傳送barrier N之前,都會對當前的狀態做checkpoint(checkpoint資料可以儲存在外部系統中,如HDFS),checkpoint只包含了barrier N之前的資料狀態,不會涉及barrier N之後的資料。

 

因為運算元很多情況下需要接收多個運算元的資料(shuffle操作),所以只有當所有上游的傳送的barrier N都到達之後,運算元才會將barrier N傳送給下游(所有的下游)。當所有的sink運算元都接收到barrier N之後,才會認為該snapshot N成功完成。

為了保證一致性,需要遵守以下幾個原則:

1.一旦運算元接收到某一個上游運算元的barrier之後,它不能再處理該上游後面的資料,只有當它所有上游運算元的barrier都到達,並將barrier傳送給下游之後,才能繼續處理資料,否則的話會造成snapshot N和N+1的資料重疊。

2.某個上游運算元的barrier到達之後,該上游運算元後續的資料將會被快取在input buffer中。

3.一旦所有上游的運算元的barrier都到達之後,該運算元將資料和barrier傳送給下游。

4.傳送成功之後,該運算元繼續處理input buffer中的資料,並繼續接收處理上游運算元傳送過來的資料。(有點囉嗦啊)

下面我們來看一個完整的snapshot流程圖:

 

圖片有點不清晰,可以自己去官網看,筆者比較蠢,怎麼都截不下來高清的

圖中的Master儲存了snapshot的狀態,假設資料還是從Kafka中獲取,首先receiver運算元會先將當前的position傳送給master,記錄在snapshot中,並同時向下遊傳送barrier,下游的運算元接收到barrier後,發起checkpoint操作,將當前的狀態記錄在外部系統中,並更新Master中snapshot狀態,最後當所有的sink運算元都接收到barrier之後,更新snapshot中的狀態,此時認為該snapshot完成。

通過這種輕量級的分散式snapshot方式,Flink實現了exactly once,同時Flink也支援at least once,也就是運算元不阻塞barrier已經到達的上游運算元的資料(多個上游運算元的情況),這樣可以降低延遲,但是不保證exactly once。

從圖中我們可以看出Kafka position也是由Flink自己維護的,所以能夠保證receiver處的exactly once,sink處也同樣存在Spark streaming一樣的問題,exactly once依賴外部系統或需要使用者自己實現。Flink官網給出了目前支援的Data Sources和Sinks以及容錯的粒度。

其中sink處採用Kafka的話不支援exactly once,個人猜想是不是因為早期的Kafka producer沒有支援exactly once語義,而導致Flink無法支援。Kafka0.11版本中添加了producer exactly once的支援,是否後續能夠新增進來?

講到這裡,我們可以瞭解到:

1.流式系統並不一定就是吞吐差的代名詞

2.流式系統也可以做到exactly once

就如Google流式系統負責人Tyler Akidau所說:一個設計良好的流式系統是能夠在吞吐完全媲美批量系統,並且提供精準的實時服務。(那是不是以後可以完全用流式系統取代批量系統?)

window和event time

Flink相比Spark streaming不僅提供了更低的延遲,而且Flink還對window和event time提供了更好的支援。window和event time又是什麼呢?

window

現實生活中,大部分資料來源其實是unbound data,沒有邊界,我們沒有辦法得到一個最終的統計結果,很多情況下我們會對固定時間間隔的資料進行統計,比如每5s統計一下伺服器的qps,window機制能夠幫我們很好的完成這項需求。

 

如圖(標號代表事件發生的時間),流式系統會每隔5s建立一個window,將該時間段的資料放入buffer,累加後輸出結果。圖中0-5s產生的資料放在第一個window中(3s處有兩條資料),累加後輸出count=6。

window型別也有很多種,上圖是一個Tumbling Windows的例子,另外還有Sliding Windows和session window,具體區別讀者可以自行查資料。

上圖是一個比較理想的示例圖,理想很豐滿,現實很骨感,事情往往不盡如人意(情不自禁的都想唱起來了:人生已經如此的艱難,有些事情就不要ao......流式系統的破事怎麼這麼多!!),直接按接收時間來劃分window可能會存在誤差:

 

假設由於網路延遲,應該屬於第一個視窗的資料3延遲到達,被分到了第二個視窗,這時候計算結果並不準確。怎麼辦呢?

event time和process time:

假設一個流式系統目前正在接收並處理使用者手機的日誌,但是由於網路延遲,或者使用者手機離線,導致日誌沒有及時傳送到流式系統,流式系統觀察到資料的時間和資料真正產生的時間可能存在偏差,我們把資料真正產生的時間叫做:event time,把流式系統處理該資料的時間叫做:process time。

 

event time和process time往往會存在延遲,這種不一致會導致資料亂序,如圖所示:藍色事件晚於黃色事件發生,但是事件的處理卻先於黃色事件。

早期的流式系統並沒有區分process time和event time,往往將process time等同於event time。針對這一問題,一個很直觀的解決方案就是:讓資料自身攜帶timestamp,該timestamp記錄該資料產生的時間,即為event time,流式系統按資料的event time來將資料分配到對應的視窗,而不是按處理資料的時間。

window需要知道該視窗的資料都已經全部到達,然後觸發計算邏輯,如何window判斷時間T之前的資料是否都已經到達呢?

watermark

那就是引入watermark機制,watermark同樣也攜帶一個時間戳,當運算元接收到watermark T後,就代表時間T之前的資料已經接收完畢,不會再有小於時間T的資料。

 

如圖:W(17)到達後,表示後續資料的時間戳不會小於17。那可能有人會問了:那就是有一部分小於17的資料他喵的就是比w(17)還晚到了怎麼辦?

watermark還會配合一個allow lateness引數,window接收到watermark後,再等待一段時間才會關閉視窗,如果這段時間有些資料依然沒有傳送過來,那就只能忽略它們了(window的內心os:我也嘗試過等待,但我還有更重要的事情要做),而且考慮到流式系統的實時性,假如可接受的時間內,資料沒有傳輸過來,那就算等到它過來再計算,從實時性這個角度來說,這時計算的結果也有可能也已經沒有意義了。

Flink對window和watermark都提供了較好的支援,Spark streaming從2.0中也開始引入watermark功能,但是支援的功能有限,並且真正的流式可以更優雅、簡單的實現window和watermark,從這個角度來看,Flink是優於Spark streaming的。

總結:

瞭解了Storm、Spark streaming、Flink各自的特點後,我們知道Storm提供了低延遲的計算,但是吞吐較低,並且無法保證exactly once(Storm trident採用batch的方式改善了這兩點),Spark streaming通過小批量的方式保證了吞吐的情況下,同時提供了exactly once語義,但是實時性不如Storm,而且由於採用micro-batch的方式,對window和event time的支援比較有限(Spark streaming2.0中引入了window和event time,還在起步階段)。Flink採用分散式快照的方式實現了一個高吞吐、低延遲、支援exactly once的流式系統,流式處理的方式也能更優雅的支援window和event time。

當然也不是說Flink一定就比Storm、Spark streaming好,沒有最好的框架,只有最合適的框架,根據自身的業務、公司的技術儲備選擇最合適的框架才是正確的選擇。