1. 程式人生 > >Storm概念、原理詳解及其應用(一)BaseStorm

Storm概念、原理詳解及其應用(一)BaseStorm

when 結構 tails 並發數 vm 虛擬機 cif 異步 優勢 name

本文借鑒官文,添加了一些解釋和看法,其中有些理解,寫的比較粗糙,有問題的地方希望大家指出。寫這篇文章,是想把一些官文和資料中基礎、重點拿出來,能總結出便於大家理解的話語。與大多數“wordcount”代碼不同的是,並不會有如何運行第一storm代碼等內容,只有在運行完代碼後,發現需要明白:“知其然,並知其所以然”。

Storm是什麽?為什麽要用Storm?為什麽不用Spark? 第一個問題,以下概念足以解釋: Storm是基於數據流的實時處理系統,提供了大吞吐量的實時計算能力。通過數據入口獲取每條到來的數據,在一條數據到達系統的時候,立即會在內存中進行相應的計算;Storm適合要求實時性較高的數據分析場景。
第二問題: 很多場景下,我們希望系統能夠實時的處理一條數據、甚至是事務。也就是說,在處理數據、事務的過程中,到達系統,並能馬上得到結果。其次,在成萬上億條數據大量湧入系統時,也要求“實時”的到事務處理的結果。此時,單個節點已經是杯水車薪了,而Storm的關鍵一項是因為它支持分布式並行計算!如果說,你遇到了以上相似的場景,那Storm可以當仁不讓的扛起實時處理的大旗! 第三個問題: 這個問題其實很難界定,因為Spark在RDD粒度上,可以滿足實時計算的要求,當然,使用RDD還有其他優勢;但總的來說,Storm 的實時性更強。其次,Storm的框架完全按照流式處理的思想構建,和項目場景結合性更強一些。(Spark 用的不是很多,歡迎吐槽。) 進入正題, 在看Storm之前,很多人都對Hadoop有一定了解,為了能更快入戲,我們以Hadoop為參照,以下是它使用yarn之前的架構,對照Storm Server框架理解。 Hadoop、Storm系統和組件接口對比表: 技術分享
Storm框架: 技術分享 上面這幅圖是Stom框架圖,和很多分布式系統一樣,基於zk作為集群配置運行的元數據基礎平臺。 nimbus和supervisor是服務器端守護進程,守護進程的文章會在Storm概念、原理詳解及其應用(二)Storm Cluster。 以下是對啟動一個應用所需要的集群上JVM進程線程的簡單介紹,建議記憶後再繼續閱讀。 · Nodes (服務器):指配置在一個 Storm 集群中的服務器,會執行 topology 的一部分 運算。一個 Storm 集群可以包括一個或者多個工作 node 。 · Workers (JVM 虛擬機):指一個 node 上相互獨立運行的 JVM 進程。每個 node 可 以配置運行一個或者多個 worker 。一個topology 會分配到一個或者多個 worker 上 運行。 · Executor (線程):指一個 worker 的jvm 進程中運行的 Java 線程。多個 task 可以 指派給同一個 executer 來執行。除非是明確指定, Storm 默認會給每個 executor 分 配一個 task。 · Task (bolt/spout 實例): task 是 spout 和bolt 的實例, 它們的 nextTuple() 和 execute() 方法會被executors 線程調用執行。 例如:
builder.setSpout(spoutName, spout, spoutParallelism).setNumTasks(2)
這裏可以定義spoutParallelism = 2,即對應兩個executor線程,tasks為兩個實例。 (此處配置的原理,會在接下來會講到worker和並發中解釋。) 可以看出,雖然在這設置了多個task實例,但是並行度並沒有提高(而executor在不同的worker上執行,存在並行),因為只有兩個線程去運行這些實例,只有設置足夠多的線程和實例才可以真正的提高並行度;在這設置多個實例主要是為了下面執行rebalance的時候用到。 為什麽要用rebalance? 這裏一直在啟動、操作的是“線程”,真正的process需要在配置中設置worker數量,也就是說topology啟動時已經決定了worker數量(即並行數量)。因為rebalance不需要修改代碼,就可以動態修改topology的並行度,這樣的話就必須提前配置好多個實例,在rebalance的時候主要是對之前設置多余的任務實例分配線程去執行。 在命令行動態修改並行度: 除了使用代碼進行調整,還可以在shell命令行下對並行度進行調整。 storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2 表示 10秒之後對mytopology進行並行度調整。把spout調整為2個executor,把bolt調整為2個executor 註意:並行度主要就是調整executor的數量,但是調整之後的executor的數量必須小於等於task的數量,如果分配的executor的線程數比task數量多的話也只能分配和task數量相等的executor。 概念: 官方對於Storm下名詞概念的解釋如下: 1、Topologies 2、Streams 3、Spouts 4、Bolts 5、Stream groupings 6、Reliability 7、Tasks 8、Workers 1、Topologies(拓撲) Topology是Storm中實時應用的一種封裝。其功能 analogous to a MapReducejob,但唯一不同的是它是循環執行的——無數據流等待,有數據流執行,直到被kill progress。 一個Topology是spouts和bolts組成並被Stream groupings連接的一副流程圖,相關概念如下: Resources:
  • TopologyBuilder: use this class to construct topologies in Java:在java中,該類構建了topologies。
  • Running topologies on a production cluster:在生產集群中,運行多個topologies。
  • Local mode: Read this to learn how to develop and test topologies in local mode. 在本地模型中開發和測試topologies。
Topology結構: 技術分享 2、Streams (流) Stream在Storm中是一個核心的抽象概念。一個流是由無數個元組序列構成,這些元組並行、分布式的被創建和執行。在stream的許多元組中,Streams被定義為以Fields區域命名的一種模式。默認情況下,元組支持:integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. 你也可以定義自己的序列化器,使這種風格類型能夠被自然的使用在元組中。 每一個Stream在聲明的時候都會賦予一個id。單個Stream——spouts和bolts,可以使用OutputFieldsDeclarer 的convenience方法聲明一個stream,而不用指定一個id。但是這種方法會給予一個默認的id——default,相關概念如下: Resources:
  • Tuple: streams are composed of tuples:Tuple是一個interface,對應的實現類 TupleImpl。
  • OutputFieldsDeclarer: used to declare streams and their schemas
  • Serialization: Information about Storm‘s dynamic typing of tuples and declaring custom serializations
Ps:Storm中的tuple是接口,沒有具體實現,但原話這麽解釋的: Storm needs to know how to serialize all the values in a tuple. By default, Storm * knows how to serialize the primitive types, strings, and byte arrays. 3、Spouts 在Topology中,每個Spout都是一個Streams源,通常情況下,Spouts會從外部源讀取Tuple,並輸入這些Tuple到Topology中。 Spouts既是可靠的又是不可靠的,因為,可靠的spout會在發送Tuple失敗的情況下,重復發送;相反,不可靠的spout會忘記它發送過的Tuple,無論是否成功。 Spout代碼過程: Spouts能夠發送多個流:使用OutputFieldsDeclarer(interface)的declareStream方法聲明多個流,並且當使用SpoutOutputCollector(實現2,接口模式)的emit方法可以指定這個流去發送Tuple。 Spouts的主要方法之一是:nextTuple() 發送tuple,nextTuple可以發送一個新的Tuple到Topology,或者當沒有新的Tuple被發送的時候,就簡單的返回。對於任何spout的實現,nextTuple都不能阻塞,因為Storm調用的所有spout都是基於同一個線程! 其次是 ack 和 fail 方法,它們都會被調用,當Storm發現一個tuple被從spout發射後,要麽成功地完成的通過topology,要麽錯誤的完成。ack 和 fail 方法只有在可靠的spouts下才能被調用。spout可靠性,請搜本頁下面內容,或移至代碼。

Resources:

  • IRichSpout: this is the interface that spouts must implement.
  • Guaranteeing message processing
Ps:nextTuple()方法中會發送Tuple,至於那種對象能發送,請看上述。 Qu:1、在代碼中如何讓聲明的留和發送tuple聯系起來,因為聲明流的名稱並不是tuple對象名? 2、是Storm中Spout的nextTuple對應一個線程,還是多個Spout對應一個線程? answer:在集群中,應該是每個node的JVM中啟動一個線程跑spout 4、Bolts 在Topologies中所有的處理都會在bolts中被執行,它能夠過濾tuple、函數操作、合並(連接join、聚合aggregation)、數據庫讀寫等。Bolt可以做復雜的流傳輸,需要多步驟、多bolt的連接。 Bolt也可以發射出一個或多個流,它需要使用OutputFieldsDeclarer 類的 declareStream 方法聲明多個流,並且需要指定這個流去使用OutputCollectorl類的emit方法去發射。 當你聲明一個bolt的輸入流時,你需要訂閱一個指定的其他組件的流。每一個流的訂閱都是一個個添加。InputDeclarer類可以聲明一個流在默認的流id上。 declarer.shuffleGrouping("1") 說明在組件“1”上訂閱了這個默認流,等價於declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。 Bolts的主要方法是execute 方法,它會吸收作為輸入的一個新Tuple。Bolts使用 OutputCollector 對象發射新的Tuples。Bolts必須對每一個tuple調用OutputCollectorack 方法,以便於Storm知道什麽時候元組們被處理完成(可以最終確定它的安全對於包裝這個初始化spout tuples)。 共同處理一個輸入元組的情況下,發射0或多個元組們基於元組,然後包裝輸入元組,Storm提供一個IBasicBolt接口的自動包裝。 在Bolts異步處理的時候,完全可以啟動新線程;同時OutputCollector是線程安全的,可以在任何時候被調用。

Resources:

  • IRichBolt: this is general interface for bolts.
  • IBasicBolt: this is a convenience interface for defining bolts that do filtering or simple functions.
  • OutputCollector: bolts emit tuples to their output streams using an instance of this class
  • Guaranteeing message processing
Ps:bolt發送或接收的數據流都可以多對多的進行。 技術分享 5、Stream groupings 流分組 定義一個拓撲部分是指定了每個bolt門閂的流都應該作為輸入被接收。一個流分組定義為:在門閂的任務之中如何區分流。 在Storm中有8種流分組方式,通過實現CustomStreamGroupingj接口,你可以實現一種風格流分組方式:

Storm 定義了八種內置數據流分組的方式: 1、Shuffle grouping(隨機分組):這種方式會隨機分發 tuple 給bolt 的各個 task,每個bolt 實例接收到的相同數量的 tuple 。 2、Fields grouping(按字段分組):根據指定字段的值進行分組。比如說,一個數據流根據“ word”字段進行分組,所有具有相同“ word ”字段值的 tuple 會路由到同一個 bolt 的 task 中。 3、All grouping(全復制分組):將所有的 tuple 復制後分發給所有 bolt task。每個訂閱數據流的 task 都會接收到 tuple 的拷貝。 4、Globle grouping(全局分組):這種分組方式將所有的 tuples 路由到唯一一個 task 上。Storm 按照最小的 task ID 來選取接收數據的 task 。註意,當使用全局分組方式時,設置 bolt 的 task 並發度是沒有意義的(spout並發有意義),因為所有 tuple 都轉發到同一個 task 上了。使用全局分組的時候需要註意,因為所有的 tuple 都轉發到一個 JVM 實例上,可能會引起 Storm 集群中某個 JVM 或者服務器出現性能瓶頸或崩潰。 5、None grouping(不分組):在功能上和隨機分組相同,是為將來預留的。 6、Direct grouping(指向型分組):數據源會調用 emitDirect() 方法來判斷一個 tuple 應該由哪個 Storm 組件來接收。只能在聲明了是指向型的數據流上使用。 7、Local or shuffle grouping (本地或隨機分組):和隨機分組類似,但是,會將 tuple 分發給同一個 worker 內的bolt task (如果 worker 內有接收數據的 bolt task )。其他情況下,采用隨機分組的方式。取決於topology 的並發度,本地或隨機分組可以減少網絡傳輸,從而提高 topology 性能。 8、Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.

Resources:

  • TopologyBuilder: use this class to define topologies
  • InputDeclarer: this object is returned whenever setBolt is called onTopologyBuilder and is used for declaring a bolt‘s input streams and how those streams should be grouped
6、Reliability Storm保證每一個spout tuple都將會在拓撲中完整的被處理。處理過程:它會追蹤這個tuple tree被每一個spout tuple所觸發,並且確定tuple tree已經成功完成。每個拓撲都有一個“信息超時”與之相關聯。假如Storm未能檢測到一個spout tuple已經超時完成,它將舍棄並重新執行這個tuple。 為了改善Storm的可靠性能力,你可以告訴Storm什麽時候需要在元組樹種創建一個新的邊界,告訴Storm無論在什麽時候都可以完成處理一個獨立的tuple。Bolt們都使用了OutputCollector對象去發射tuple。“錨定”(實際上就是mark)的完成於這個emit方法,你可以聲明一個元組使用了ack方法而被完成。 以上詳細的解釋了可靠消息處理。 7、Tasks 每個噴口spout或者門閂bolt都有許多任務在集群中執行。每一個任務對應一個執行線程,流分組定義了如何從一個任務集到另外一個任務集發送元組。你可以使用TopologyBuilder 類的setSpout和setBolt方法,為每一個spout或bolt是設置並行度和並發度。 Ps:Tasks可以理解為每個節點上的任務實例,運行在對應executor線程上。 8、Workers 拓撲執行要通過一個或多個worker進程。每一個worker進程都是一個物理的JVM和這個拓撲中執行了一個所有這個任務的子集。 例子:如果拓撲的聯合並發數為300,分配了50個worker,因此每一個worker將會執行6個task(task將作為worker的線程)。Storm將會均勻的分配任務到所有worker上。

Resources:

  • Config.TOPOLOGY_WORKERS: this config sets the number of workers to allocate for executing the topology
Worker結構: 技術分享 技術分享 Topology的並發機制: storm的Worker、Executor、Task默認配置都是1 1、增加worker(本地模式無效,只有一個JVM) Config對象的setNumWorkers()方法: Config config = new Config(); config.setNumWorkers(2): 2、配置executor 和 task 默認都為1,setXXX指定一個Worker中有幾個線程,而後面的setNumXXX指定總共需要執行的tasks數量,因此,一個Thread--Executor中需要跑tasks/threads個任務。 topologyBuilder.setSpout(SENTENCE_SPOUT_ID, spout, 2); // StormBaseSpout -> StormBaseBolt topologyBuilder.setBolt(SPLIT_BOLT_ID, bolt).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID); // StormBaseBolt -> StormBaseBoltSecond topologyBuilder.setBolt(COUNT_BOLT_ID, boltSecond, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); // StormBaseBoltSecond -> StormBaseBoltThird topologyBuilder.setBolt(REPORT_BOLT_ID, boltThird).globalGrouping(COUNT_BOLT_ID); 技術分享 storm的處理保障機制: 技術分享 1、spout的可靠性 spout會記錄它所發射出去的tuple,當下遊任意一個bolt處理失敗時spout能夠重新發射該tuple。在spout的nextTuple()發送一個tuple時,為實現可靠消息處理需要給每個spout發出的tuple帶上唯一ID,並將該ID作為參數傳遞給SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID); 實際上Values extends ArrayList<Object> 保障過程中,每個bolt每收到一個tuple,都要向上遊應答或報錯,在tuple樹上的所有bolt都確認應答,spout才會隱式調用ack()方法表明這條消息(一條完整的流)已經處理完畢,將會對編號ID的消息應答確認;處理報錯、超時則會調用fail()方法。 2、bolt的可靠性 bolt的可靠消息處理機制包含兩個步驟: a、當發射衍生的tuple,需要錨定讀入的tuple b、當處理消息時,需要應答或報錯 可以通過OutputCollector中emit()的一個重載函數錨定或tuple:collector.emit(tuple, new Values(word)); 並且需要調用一次this.collector.ack(tuple)應答。 以上就是storm的基礎概念,閱讀完後並不能滿足你去實現代碼的需求,因為需要一個可demo代碼,作為模仿的基礎。這裏就不做提供了,畢竟網上一大堆。 最近在研究Storm源代碼,不想與“源碼分析”一樣只告訴該類代碼:結構、方式、用到了什麽技術,而希望寫一些“特殊”的內容;當然有可能也不能免俗,但盡力寫點不同的東西。 內容有不妥的地方,希望大家指正,希望能一起進步,文筆欠佳,見諒。 此處配置的原理,會在接下來會講到worker和並發解釋。

Storm概念、原理詳解及其應用(一)BaseStorm