Storm核心基礎
一、Stream: 被處理的資料
二、Spout:資料來源
訊息源Spout是Storm的Topology中的訊息生產者(Tuple的創造者)。如圖幾個Spout介面都繼承自IComponent
Spout從外部獲取資料後,向Topology發出的Tuple可以是可靠的,也可以是不可靠的
可靠的:一個可靠的訊息可以重新發射一個Tuple(如果該Tuple沒有被Storm成功處理)
不可靠的:一個不可靠的訊息源Spout一旦發出,一個Tuple就會徹底遺忘,不會在重新發了
Spout可以發射多個Stream,使用OutputFieldsDeclarer.declareStream來定義多個流,然後使用SpoutOutputCollector來發射指定的流
Spout中幾個重要的方法:
1、open方法 : 當一個Task被初始化時會呼叫此open方法,一般都會在此方法中初始化傳送Tuple的物件SpoutOutputCollector和配置物件TopologyContext
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; }
2、declareOutputFields方法:聲明當前Spout的Tuple傳送流。
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence"));//告訴元件發出資料流包含sentence欄位 }
3、nextTuple方法:發射一個Tuple到Topology都是通過該方法。
public void nextTuple() { this.collector.emit(new Values("123")); }
三、Bolt: 處理資料
Bolt是接收Spout發出元組Tuple後處理資料的元件,所有的訊息處理邏輯被封裝在Bolt中,Bolt負責處理輸入的資料流併產生輸出的新資料流;Bolt把元組Tuple作為輸入,之後處理產生新的Tuple;
1、客戶機建立Bolt,然後將其序列化為拓撲,並提交給叢集的主機
2、叢集啟動worker程序,反序列化Bolt,呼叫prepare方法開始處理元組
3、Bolt處理元組,Bolt處理一個輸入Tuple,發射0個或多個元組,然後呼叫ack通知Storm自己已經處理過這個Tuple了。Strom提供一個IBasicBolt自動呼叫ack。
在建立Bolt物件時,通過構造方法,初始化成員變數,當Bolt被提交到叢集時,這些成員變數也會被序列化,所以通過反序列化可以取到這些成員變數
IBolt繼承了Serializable,在建立Bolt在序列化之後被髮送到具體執行的Worker上,worker在執行Bolt時候先執行perpare方法傳入當前執行的上下文,然後呼叫execute方法,對Tuple進行處理,並用prepare傳入的OutPutCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果
IBasic介面在執行execute方法時,自動呼叫ack方法,其目的就是實現該Bolt時,不用在程式碼中提供反饋結果,Storm內部會自動反饋成功
幾個重要的方法:
1.prepare方法:preparre方法為Bolt提供了OutputCollector,用來從Bolt中傳送Tuple,在Bolt中載入新的執行緒非同步處理,OutputCollector是執行緒安全的。Bolt中prepare、execute、cleanup等方法中進行
2.declareOutPutFields方法:聲明當前Bolt傳送的Tuple中包含的欄位;Bolt可以發射多條訊息流,使用OutputFieldsDeclarer.declareStream方法來定義流,之後使用OutputCollector.emit來選擇要發射的流
3、getComponentConfiguration方法:當系統需要每隔一段時間執行特定的處理時,就可以用它
4、execute方法::以一個Tuple作為輸入,Bolt使用OutPutColector來發射Tuple,Bolt必須為他處理的每一個Tuple呼叫ack方法,以通知Storm該Tuple處理完畢了,從而通知該Tuple的發射者Spout
1) emit有一個引數:該引數是傳送到下游Bolt的Tuple,此時由上游發來的舊Bolt就此隔斷,新的Tuple和舊的Tuple不在屬於同一顆Tuple數。新的Tuple另起一顆新的Tuple樹
2)emit有兩個引數:第一個引數是舊的Tuple的輸入流,第二個引數是新的往下游Bolt下發的Tuple流。此時新的Tuple和舊的Tuple還屬於同一顆Tuple樹,即如果下游的Bolt處理失敗,則向上傳遞到當前Bolt,當前Bolt根據舊的Tuple繼續往上游傳遞,申請重發失敗的Tuple,保證Tuple處理的可靠性
四、Tuple: 資料單元
Tuple是Strom的主要資料結構,並且是Storm中使用的最基本單元、資料模型和元組;Tuple是一個值列表,Tuple中的值可以是任何型別的,動態型別的Tuple的fields可以不用宣告;預設情況下,Storm中的Tuple支援私有型別,字串,位元組陣列等作為它的欄位值,如果使用其他型別,就需要序列化該型別。
Tuple的預設型別:integer、float、double、long、short、string、byte、binary(byte[])。Tuple可以理解為鍵值對,其中鍵就是定義在declareOutputFields方法中的Fields物件,值就是在emit中傳送的Values物件
Tuple宣告週期:
1、Storm呼叫Spout的nextTuple方法來獲取下一個Tuple
2、Spout通過Open方法的引數提供的SpoutOutputCollector將新的Tuple發射到其中一個輸出訊息流(發射Tuple時,Spout提供一個message-id,通過這個ID來追蹤該Tuple)
3、Storm跟蹤該Tuple的樹形結構是否成功建立,並根據message-id呼叫Spout中的ack函式,已確認Tuple是否被完全處理。
4、如果Tuple超時,則呼叫Spout的fail方法
5、在任務完成後,Spout呼叫Cloes方法結束Tuple的使命
public interface ISpout extends Serializable { void open(Map var1, TopologyContext var2, SpoutOutputCollector var3); void close(); void activate(); void deactivate(); void nextTuple(); void ack(Object var1); void fail(Object var1); }
五、Task: 執行Spout和Bolt中的執行緒
同一個Spout/Bolt的Task可能會共享一個物理執行緒,該執行緒稱為Executor。實際的資料處理由Task來完成,Topology的生命週期中,Task數量不會變化,而Executor數量卻不一定,在一般情況下,執行緒數小於等於Task數量。預設Task的數量等於Executor執行緒數量,即一個Executor執行緒只執行一個Task,Executor執行緒在執行期間會呼叫該Task的nextTuple或Executor
Worker:是執行這些執行緒的程序
一個worker程序一直一個Topology子集,他會啟動一個或多個Executor執行緒來執行一個Topology的元件。因此在執行拓撲時,可能跨越一個或多個Worker,Storm會盡量均勻分配任務給所有的worker,不會出現一個Worker為多個Topology服務的情況
六、Stream Grouping :規定了Bolt接收何種型別資料作為輸入
Storm包括6種流分組型別:
1、隨機分組(Shuffer Grouping):隨機分發元組到Bolt的任務,保證每個任務獲得相等數量的元組
2、欄位分組(Fields Grouping):根據指定欄位分割資料流並分組
3、全部分組(ALL Grouping):對於每一個Tuple來說,所有Bolt都會收到,所有Tuple被複制到Bolt的所有任務上
4、全域性分組(Global Grouping):全部的流都分配到Bolt的同一任務,就是分配給ID,最小的Task
5、無分組(NO Grouping):不分組的含義是,流不關心到底誰會收到它的Tuple,目前無分組等效於隨機分組,不同的是Storm把無分組的Bolt放到訂閱Bolt或Spout的同一執行緒中執行(在可能實現的前提下)
6、直接分組(Direct Grouping):元組生產者決定元組由那個元組消費者接受
七、Topology: 是由Straming Grouping連線起來的Spout和Bolt節點網路
1、本地模式:
Config conifg = new Config(); config.setDebug(true); config.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test",config,builder.creatTopology()); Utils.sleep(1000); cluster.killTopology("test"); cluster.shutdown();
submitTopology有三個引數:要允許Topology的名稱,一個配置物件,以及要執行的Topology的本身
Topoogy是以名稱來唯一區別的,可以用這個名稱殺掉該Topology,而且必須顯示的殺掉,否則他會一直執行
幾個比較重要的配置:
config.setNumWorkers(2):定義希望叢集分配多少個工作程序來執行這個Topology,Topology中的每個元件都需要執行緒來執行。每個元件到底用多少個執行緒是通過setBolt和setSpout來指定的
config.setDebug(true):Storm會記錄下每個元件發射的每條資訊
Topology方法呼叫流程:
1.每個元件(Spout或Bolt)的構造方法和declareOutputFields方法都只被呼叫一次
2.open和prepare方法被呼叫多次,在入口函式中設定的setSpout或setBolt中的並行度引數指Execute的數量,是負責執行元件中Task的數量,此數量是多少,上述兩個方法就會被呼叫多少次,在每個Execute執行時呼叫一次
3.nextTuple方法和execute方法是一直執行的,nextTuple方法不斷髮射Tuple,Bolt的execute不斷接受Tuple進行處理。只有這樣不斷進行,才會產生無界的Tupe流。
4.提交一個Topology之後,Storm建立Spout/Bolt例項並進行序列化,之後將序列化的元件傳送給所有任務所在的節點,在每一個任務上反序列化元件
5.Spout和Bolt之間,Bolt和Bolt之間的通訊,通過ZeroMQ的訊息佇列實現
6.在一個Tuple處理成功之後,需要呼叫ack方法來標記成功,否則呼叫fail方法標記失敗,重新處理該Tuple
Topology中幾個比較重要的並行度相關概念
1.Worker(工作程序):每個worker都屬於一個特定的Topology,每個Supervisor節點的Worker可以有多個,每個Worker使用一個額單獨的埠,Worker對Topology中的每個元件執行一個或者多個Executor執行緒來提供Task的執行服務
2.Executor
Executor是產生於Worker程序內部的執行緒,會執行同一個元件的一個或多個Task
3.Task
實際的資料處理由Task完成,在Topology的宣告週期中,每個元件的Task數量不會變化,而Executor的數量卻不一定,Executor數量小於等於Task數量,在預設情況下,二者是相等的
worker、executor、task設定
1、Worker設定:可以設定yaml中Topology.workers屬性,在程式碼中通過Config的setNumWorker方法設定
2、Executor設定:通過Topology入口類中的setBolt、setSpout方法的最後一個引數指定,如果不指定,則使用預設值為1
3、Task設定:在預設情況下和Executor數量一致,在程式碼中通過TopologyBuilder的setNumTasks方法設定具體某個元件的Task數量
Storm叢集中的一個物理節點啟動一個或多個worker程序,叢集的topology都是通過這些程序執行的,然而,worker程序中又會執行一個或多個Executor執行緒,每個Executor執行緒只會執行一個Topology的一個元件(spout或bolt)的Task任務,task又是資料處理的實體單元。worker是程序,Executor對應於執行緒,Spout或Bolt是一個個Task;同一個Worker只執行同一個Topology相關的Task;在同一個Executor中可以執行多個同類型的Task,即在同一個Executor中,要麼全部是Bolt類的Task,要麼全部是Spout的Task;在執行時,Spout和Bolt需要包裝成一個又一個Task