流式處理框架storm淺析(上篇)
本文來自網易雲社群
作者:汪建偉
-
前言
前一段時間參與哨兵流式監控功能設計,調研了兩個可以做流式計算的框架:storm和spark streaming,我負責storm的調研工作。斷斷續續花了一週的時間看了官網上的doc和網路上的一些資料。我把所學到的總結成一個文件,發出來給對storm感興趣的同事做入門引導。
-
storm背景
隨著網際網路的更進一步發展,從Portal資訊瀏覽型到Search資訊搜尋型到SNS關係互動傳遞型,以及電子商務、網際網路旅遊生活產品等將生活中的流通環節線上化。對效率的要求讓大家對於實時性的要求進一步提升,而資訊的互動和溝通正在從點對點往資訊鏈甚至資訊網的方向發展,這樣必然帶來資料在各個維度的交叉關聯,資料爆炸已不可避免。因此流式處理加NoSQL產品應運而生,分別解決實時框架和資料大規模儲存計算的問題。
2011年twitter對Storm開源。以前網際網路的開發人員在做一個實時應用的時候,除了要關注應用邏輯計算處理本身,還要為了資料的實時流轉、互動、分佈大傷腦筋。現在開發人員可以快速的搭建一套健壯、易用的實時流處理框架,配合SQL產品或者NoSQL產品或者MapReduce計算平臺,就可以低成本的做出很多以前很難想象的實時產品:比如一淘資料部的量子恆道品牌旗下的多個產品就是構建在實時流處理平臺上的。
-
strom語言
Storm的主要開發語言是clojure,完成核心功能邏輯,輔助開發語言還有Python和java
-
strom的特點
1. 程式設計模型簡單
Storm同hadoop一樣,為大資料的實時計算提供了一些簡單優美的原語,這大大降低了開發並行實時處理的任務的複雜性,幫助你快速、高效的開發應用。
2. 可擴充套件
在Storm叢集中真正執行topology的主要有三個實體:工作程序、執行緒和任務。Storm叢集中的每臺機器上都可以執行多個工作程序,每個工作程序又可建立多個執行緒,每個執行緒可以執行多個任務,任務是真正進行資料處理的實體,我們開發的spout、bolt就是作為一個或者多個任務的方式執行的。 因此,計算任務在多個執行緒、程序和伺服器之間並行進行,支援靈活的水平擴充套件。
3. 高可靠
Storm可以保證spout發出的每條訊息都能被“完全處理”。 spout發出的訊息後續可能會觸發產生成千上萬條訊息,可以形象的理解為一棵訊息樹,其中spout發出的訊息為樹根,Storm會跟蹤這棵訊息樹的處理情況,只有當這棵訊息樹中的所有訊息都被處理了,Storm才會認為spout發出的這個訊息已經被“完全處理”。如果這棵訊息樹中的任何一個訊息處理失敗了,或者整棵訊息樹在限定的時間內沒有“完全處理”,那麼spout發出的訊息就會重發。 考慮到儘可能減少對記憶體的消耗,Storm並不會跟蹤訊息樹中的每個訊息,而是採用了一些特殊的策略,它把訊息樹當作一個整體來跟蹤,對訊息樹中所有訊息的唯一id進行異或計算,通過是否為零來判定spout發出的訊息是否被“完全處理”,這極大的節約了記憶體和簡化了判定邏輯,後面會在下文對這種機制進行詳細介紹。
這種模式,每傳送一個訊息,都會同步傳送一個ack/fail,對於網路的頻寬會有一定的消耗,如果對於可靠性要求不高,可通過使用不同的emit介面關閉該模式。
上面所說的,Storm保證了每個訊息至少被處理一次,但是對於有些計算場合,會嚴格要求每個訊息只被處理一次,Storm的0.7.0引入了事務性拓撲,解決了這個問題。
4. 高容錯
如果在訊息處理過程中出了一些異常,Storm會重新安排這個出問題的處理單元。Storm保證一個處理單元永遠執行(除非你顯式殺掉這個處理單元)。當然,如果處理單元中儲存了中間狀態,那麼當處理單元重新被Storm啟動的時候,需要應用自己處理中間狀態的恢復。
5. 快速
這裡的快主要是指的時延。storm的網路直傳、記憶體計算,其時延必然比hadoop的通過hdfs傳輸低得多;當計算模型比較適合流式時,storm的流式處理,省去了批處理的收集資料的時間;因為storm是服務型的作業,也省去了作業排程的時延。所以從時延上來看,storm要快於hadoop。
說一個典型的場景,幾千個日誌生產方產生日誌檔案,需要進行一些ETL操作存入一個數據庫。
假設利用hadoop,則需要先存入hdfs,按每一分鐘切一個檔案的粒度來算(這個粒度已經極端的細了,再小的話hdfs上會一堆小檔案),hadoop開始計算時,1分鐘已經過去了,然後再開始排程任務又花了一分鐘,然後作業執行起來,假設機器特別多,幾鈔鍾就算完了,然後寫資料庫假設也花了很少的時間,這樣,從資料產生到最後可以使用已經過去了至少兩分多鐘。
而流式計算則是資料產生時,則有一個程式去一直監控日誌的產生,產生一行就通過一個傳輸系統發給流式計算系統,然後流式計算系統直接處理,處理完之後直接寫入資料庫,每條資料從產生到寫入資料庫,在資源充足時可以在毫秒級別完成。
6. 支援多種程式語言
除了用java實現spout和bolt,你還可以使用任何你熟悉的程式語言來完成這項工作,這一切得益於Storm所謂的多語言協議。多語言協議是Storm內部的一種特殊協議,允許spout或者bolt使用標準輸入和標準輸出來進行訊息傳遞,傳遞的訊息為單行文字或者是json編碼的多行。
7. 支援本地模式
Storm有一種“本地模式”,也就是在程序中模擬一個Storm叢集的所有功能,以本地模式執行topology跟在叢集上執行topology類似,這對於我們開發和測試來說非常有用。
-
storm的組成
在Storm的叢集裡面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面執行一個叫Nimbus後臺程式,它的作用類似Hadoop裡面的JobTracker。Nimbus負責在叢集裡面分發程式碼,分配計算任務給機器, 並且監控狀態。
每一個工作節點上面執行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作程序。每一個工作程序執行一個topology的一個子集;一個執行的topology由執行在很多機器上的很多工作程序組成。
Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper叢集完成。另外,Nimbus程序和Supervisor程序都是快速失敗(fail-fast)和無狀態的。所有的狀態要麼在zookeeper裡面, 要麼在本地磁碟上。這也就意味著你可以用kill -9來殺死Nimbus和Supervisor程序, 然後再重啟它們,就好像什麼都沒有發生過。這個設計使得Storm異常的穩定。
接下來我們再來具體看一下這些概念。
Nimbus:負責資源分配和任務排程。
Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker程序。
Worker:執行具體處理元件邏輯的程序。
Task:worker中每一個spout/bolt的執行緒稱為一個task. 在storm0.8之後,task不再與物理執行緒對應,同一個spout/bolt的task可能會共享一個物理執行緒,該執行緒稱為executor。
下面這個圖描述了以上幾個角色之間的關係。
-
Topology基本原理
Storm叢集和Hadoop叢集表面上看很類似。但是Hadoop上執行的是MapReduce jobs,而在Storm上執行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會執行(除非你手動kill掉)。
1 拓撲(Topologies)
一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連線起來,如下圖:
一個topology會一直執行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 並且Storm可以保證你不會有資料丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。
2 流(Streams)
資料流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分散式環境中並行建立、處理的一組元組(tuple)的無界序列。資料流可以由一種能夠表述資料流中元組的域(fields)的模式來定義。在預設情況下,元組(tuple)包含有整型(Integer)數字、長整型(Long)數字、短整型(Short)數字、位元組(Byte)、雙精度浮點數(Double)、單精度浮點數(Float)、布林值以及位元組陣列等基本型別物件。當然,你也可以通過定義可序列化的物件來實現自定義的元組型別。
3 資料來源(Spouts)
資料來源(Spout)是拓撲中資料流的來源。一般 Spout 會從一個外部的資料來源讀取元組然後將他們傳送到拓撲中。根據需求的不同,Spout 既可以定義為可靠的資料來源,也可以定義為不可靠的資料來源。一個可靠的 Spout 能夠在它傳送的元組處理失敗時重新發送該元組,以確保所有的元組都能得到正確的處理;相對應的,不可靠的 Spout 就不會在元組傳送之後對元組進行任何其他的處理。
一個 Spout 可以傳送多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream 方法來宣告定義不同的資料流,然後在傳送資料時在 SpoutOutputCollector 的 emit 方法中將資料流 id 作為引數來實現資料傳送的功能。
Spout 中的關鍵方法是 nextTuple。顧名思義,nextTuple 要麼會向拓撲中傳送一個新的元組,要麼會在沒有可傳送的元組時直接返回。需要特別注意的是,由於 Storm 是在同一個執行緒中呼叫所有的 Spout 方法,nextTuple 不能被 Spout 的任何其他功能方法所阻塞,否則會直接導致資料流的中斷。
Spout 中另外兩個關鍵方法是 ack 和 fail,他們分別用於在 Storm 檢測到一個傳送過的元組已經被成功處理或處理失敗後的進一步處理。注意,ack 和 fail 方法僅僅對上述“可靠的” Spout 有效。
4 資料流處理元件(Bolts)
拓撲中所有的資料處理均是由 Bolt 完成的。通過資料過濾(filtering)、函式處理(functions)、聚合(aggregations)、聯結(joins)、資料庫互動等功能,Bolt 幾乎能夠完成任何一種資料處理需求
一個 Bolt 可以實現簡單的資料流轉換,而更復雜的資料流變換通常需要使用多個 Bolt 並通過多個步驟完成。例如,將一個微博資料流轉換成一個趨勢影象的資料流至少包含兩個步驟:其中一個 Bolt 用於對每個圖片的微博轉發進行滾動計數,另一個或多個 Bolt 將資料流輸出為“轉發最多的圖片”結果(相對於使用2個Bolt,如果使用3個 Bolt 你可以讓這種轉換具有更好的可擴充套件性)。
與 Spout 相同,Bolt 也可以輸出多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream 方法來宣告定義不同的資料流,然後在傳送資料時在 OutputCollector 的 emit 方法中將資料流 id 作為引數來實現資料傳送的功能。
在定義 Bolt 的輸入資料流時,你需要從其他的 Storm 元件中訂閱指定的資料流。如果你需要從其他所有的元件中訂閱資料流,你就必須要在定義 Bolt 時分別註冊每一個元件。對於宣告為預設 id(即上文中提到的“default”——譯者注)的資料流,InputDeclarer支援訂閱此類資料流的語法糖。也就是說,如果需要訂閱來自元件“1”的資料流,declarer.shuffleGrouping("1") 與 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID) 兩種宣告方式是等價的。
Bolt 的關鍵方法是 execute 方法。execute 方法負責接收一個元組作為輸入,並且使用 OutputCollector 物件傳送新的元組。如果有訊息可靠性保障的需求,Bolt 必須為它所處理的每個元組呼叫 OutputCollector 的 ack 方法,以便 Storm 能夠了解元組是否處理完成(並且最終決定是否可以響應最初的 Spout 輸出元組樹)。一般情況下,對於每個輸入元組,在處理之後可以根據需要選擇不傳送還是傳送多個新元組,然後再響應(ack)輸入元組。IBasicBolt 介面能夠實現元組的自動應答。
5 資料流分組(Stream groupings)
為拓撲中的每個 Bolt 的確定輸入資料流是定義一個拓撲的重要環節。資料流分組定義了在 Bolt 的不同任務(tasks)中劃分資料流的方式。
在 Storm 中有八種內建的資料流分組方式,而且你還可以通過CustomStreamGrouping 介面實現自定義的資料流分組模型。這八種分組分時分別為:
1. 隨機分組(Shuffle grouping):這種方式下元組會被儘可能隨機地分配到 Bolt 的不同任務(tasks)中,使得每個任務所處理元組數量能夠能夠保持基本一致,以確保叢集的負載均衡。
2. 域分組(Fields grouping):這種方式下資料流根據定義的“域”來進行分組。例如,如果某個資料流是基於一個名為“user-id”的域進行分組的,那麼所有包含相同的“user-id”的元組都會被分配到同一個任務中,這樣就可以確保訊息處理的一致性。
3. 部分關鍵字分組(Partial Key grouping):這種方式與域分組很相似,根據定義的域來對資料流進行分組,不同的是,這種方式會考慮下游 Bolt 資料處理的均衡性問題,在輸入資料來源關鍵字不平衡時會有更好的效能1。感興趣的讀者可以參考這篇論文,其中詳細解釋了這種分組方式的工作原理以及它的優點。
4. 完全分組(All grouping):這種方式下資料流會被同時傳送到 Bolt 的所有任務中(也就是說同一個元組會被複制多份然後被所有的任務處理),使用這種分組方式要特別小心。
5. 全域性分組(Global grouping):這種方式下所有的資料流都會被髮送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。
6. 非分組(None grouping):使用這種方式說明你不關心資料流如何分組。目前這種方式的結果與隨機分組完全等效,不過未來 Storm 社群可能會考慮通過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個執行緒中執行。
7. 直接分組(Direct grouping):這是一種特殊的分組方式。使用這種方式意味著元組的傳送者可以指定下游的哪個任務可以接收這個元組。只有在資料流被宣告為直接資料流時才能夠使用直接分組方式。使用直接資料流傳送元組需要使用 OutputCollector 的其中一個 emitDirect 方法。Bolt 可以通過 TopologyContext 來獲取它的下游消費者的任務 id,也可以通過跟蹤 OutputCollector 的 emit 方法(該方法會返回它所傳送元組的目標任務的 id)的資料來獲取任務 id。
8. 本地或隨機分組(Local or shuffle grouping):如果在源元件的 worker 程序裡目標 Bolt 有一個或更多的任務執行緒,元組會被隨機分配到那些同進程的任務中。換句話說,這與隨機分組的方式具有相似的效果。
6 任務(Tasks)
在 Storm 叢集中每個 Spout 和 Bolt 都由若干個任務(tasks)來執行。每個任務都與一個執行執行緒相對應。資料流分組可以決定如何由一組任務向另一組任務傳送元組。你可以在 TopologyBuilder 的 setSpout 方法和 setBolt 方法中設定 Spout/Bolt 的並行度。
7 工作程序(Workers)
拓撲是在一個或多個工作程序(worker processes)中執行的。每個工作程序都是一個實際的 JVM 程序,並且執行拓撲的一個子集。例如,如果拓撲的並行度定義為300,工作程序數定義為50,那麼每個工作程序就會執行6個任務(程序內部的執行緒)。Storm 會在所有的 worker 中分散任務,以便實現叢集的負載均衡。
相關閱讀: ofollow,noindex">流式處理框架storm淺析(下篇)
網易雲免費體驗館,0成本體驗20+款雲產品!
更多網易研發、產品、運營經驗分享請訪問網易雲社群。