Storm概念、原理詳解及其應用(一)BaseStorm
阿新 • • 發佈:2017-06-16
when 結構 tails 並發數 vm 虛擬機 cif 異步 優勢 name
第二問題:
很多場景下,我們希望系統能夠實時的處理一條數據、甚至是事務。也就是說,在處理數據、事務的過程中,到達系統,並能馬上得到結果。其次,在成萬上億條數據大量湧入系統時,也要求“實時”的到事務處理的結果。此時,單個節點已經是杯水車薪了,而Storm的關鍵一項是因為它支持分布式並行計算!如果說,你遇到了以上相似的場景,那Storm可以當仁不讓的扛起實時處理的大旗!
第三個問題:
這個問題其實很難界定,因為Spark在RDD粒度上,可以滿足實時計算的要求,當然,使用RDD還有其他優勢;但總的來說,Storm 的實時性更強。其次,Storm的框架完全按照流式處理的思想構建,和項目場景結合性更強一些。(Spark 用的不是很多,歡迎吐槽。)
進入正題,
在看Storm之前,很多人都對Hadoop有一定了解,為了能更快入戲,我們以Hadoop為參照,以下是它使用yarn之前的架構,對照Storm Server框架理解。
Hadoop、Storm系統和組件接口對比表:
Storm框架:
上面這幅圖是Stom框架圖,和很多分布式系統一樣,基於zk作為集群配置運行的元數據基礎平臺。
nimbus和supervisor是服務器端守護進程,守護進程的文章會在Storm概念、原理詳解及其應用(二)Storm Cluster。
以下是對啟動一個應用所需要的集群上JVM進程線程的簡單介紹,建議記憶後再繼續閱讀。
· Nodes (服務器):指配置在一個 Storm 集群中的服務器,會執行 topology 的一部分
運算。一個 Storm 集群可以包括一個或者多個工作 node 。
· Workers (JVM 虛擬機):指一個 node 上相互獨立運行的 JVM 進程。每個 node 可
以配置運行一個或者多個 worker 。一個topology 會分配到一個或者多個 worker 上
運行。
· Executor (線程):指一個 worker 的jvm 進程中運行的 Java 線程。多個 task 可以
指派給同一個 executer 來執行。除非是明確指定, Storm 默認會給每個 executor 分
配一個 task。
· Task (bolt/spout 實例): task 是 spout 和bolt 的實例, 它們的 nextTuple() 和
execute() 方法會被executors 線程調用執行。
例如:
本文借鑒官文,添加了一些解釋和看法,其中有些理解,寫的比較粗糙,有問題的地方希望大家指出。寫這篇文章,是想把一些官文和資料中基礎、重點拿出來,能總結出便於大家理解的話語。與大多數“wordcount”代碼不同的是,並不會有如何運行第一storm代碼等內容,只有在運行完代碼後,發現需要明白:“知其然,並知其所以然”。
Storm是什麽?為什麽要用Storm?為什麽不用Spark? 第一個問題,以下概念足以解釋: Storm是基於數據流的實時處理系統,提供了大吞吐量的實時計算能力。通過數據入口獲取每條到來的數據,在一條數據到達系統的時候,立即會在內存中進行相應的計算;Storm適合要求實時性較高的數據分析場景。builder.setSpout(spoutName, spout, spoutParallelism).setNumTasks(2)
這裏可以定義spoutParallelism = 2,即對應兩個executor線程,tasks為兩個實例。
(此處配置的原理,會在接下來會講到worker和並發中解釋。)
可以看出,雖然在這設置了多個task實例,但是並行度並沒有提高(而executor在不同的worker上執行,存在並行),因為只有兩個線程去運行這些實例,只有設置足夠多的線程和實例才可以真正的提高並行度;在這設置多個實例主要是為了下面執行rebalance的時候用到。
為什麽要用rebalance?
這裏一直在啟動、操作的是“線程”,真正的process需要在配置中設置worker數量,也就是說topology啟動時已經決定了worker數量(即並行數量)。因為rebalance不需要修改代碼,就可以動態修改topology的並行度,這樣的話就必須提前配置好多個實例,在rebalance的時候主要是對之前設置多余的任務實例分配線程去執行。
在命令行動態修改並行度:
除了使用代碼進行調整,還可以在shell命令行下對並行度進行調整。
storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2
表示 10秒之後對mytopology進行並行度調整。把spout調整為2個executor,把bolt調整為2個executor
註意:並行度主要就是調整executor的數量,但是調整之後的executor的數量必須小於等於task的數量,如果分配的executor的線程數比task數量多的話也只能分配和task數量相等的executor。
概念:
官方對於Storm下名詞概念的解釋如下:
1、Topologies
2、Streams
3、Spouts
4、Bolts
5、Stream groupings
6、Reliability
7、Tasks
8、Workers
1、Topologies(拓撲)
Topology是Storm中實時應用的一種封裝。其功能 analogous to a MapReducejob,但唯一不同的是它是循環執行的——無數據流等待,有數據流執行,直到被kill progress。
一個Topology是spouts和bolts組成並被Stream groupings連接的一副流程圖,相關概念如下:
Resources:
- TopologyBuilder: use this class to construct topologies in Java:在java中,該類構建了topologies。
- Running topologies on a production cluster:在生產集群中,運行多個topologies。
- Local mode: Read this to learn how to develop and test topologies in local mode. 在本地模型中開發和測試topologies。
- Tuple: streams are composed of tuples:Tuple是一個interface,對應的實現類 TupleImpl。
- OutputFieldsDeclarer: used to declare streams and their schemas
- Serialization: Information about Storm‘s dynamic typing of tuples and declaring custom serializations
declareStream
方法聲明多個流,並且當使用SpoutOutputCollector(實現2,接口模式)的emit方法可以指定這個流去發送Tuple。
Spouts的主要方法之一是:nextTuple() 發送tuple,nextTuple可以發送一個新的Tuple到Topology,或者當沒有新的Tuple被發送的時候,就簡單的返回。對於任何spout的實現,nextTuple都不能阻塞,因為Storm調用的所有spout都是基於同一個線程!
其次是 ack 和 fail 方法,它們都會被調用,當Storm發現一個tuple被從spout發射後,要麽成功地完成的通過topology,要麽錯誤的完成。ack 和 fail 方法只有在可靠的spouts下才能被調用。spout可靠性,請搜本頁下面內容,或移至代碼。
Resources:
- IRichSpout: this is the interface that spouts must implement.
- Guaranteeing message processing
declareStream 方法
聲明多個流,並且需要指定這個流去使用OutputCollectorl類的emit方法去
發射。
當你聲明一個bolt的輸入流時,你需要訂閱一個指定的其他組件的流。每一個流的訂閱都是一個個添加。InputDeclarer類可以聲明一個流在默認的流id上。 declarer.shuffleGrouping("1") 說明在組件“1”上訂閱了這個默認流,等價於
declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。
Bolts的主要方法是execute
方法,它會吸收作為輸入的一個新Tuple。Bolts使用 OutputCollector 對象發射新的Tuples。Bolts必須對每一個tuple調用OutputCollector
的ack
方法,以便於Storm知道什麽時候元組們被處理完成(可以最終確定它的安全對於包裝這個初始化spout tuples)。 共同處理一個輸入元組的情況下,發射0或多個元組們基於元組,然後包裝輸入元組,Storm提供一個IBasicBolt接口的自動包裝。
在Bolts異步處理的時候,完全可以啟動新線程;同時OutputCollector是線程安全的,可以在任何時候被調用。
Resources:
- IRichBolt: this is general interface for bolts.
- IBasicBolt: this is a convenience interface for defining bolts that do filtering or simple functions.
- OutputCollector: bolts emit tuples to their output streams using an instance of this class
- Guaranteeing message processing
Storm 定義了八種內置數據流分組的方式: 1、Shuffle grouping(隨機分組):這種方式會隨機分發 tuple 給bolt 的各個 task,每個bolt 實例接收到的相同數量的 tuple 。 2、Fields grouping(按字段分組):根據指定字段的值進行分組。比如說,一個數據流根據“ word”字段進行分組,所有具有相同“ word ”字段值的 tuple 會路由到同一個 bolt 的 task 中。 3、All grouping(全復制分組):將所有的 tuple 復制後分發給所有 bolt task。每個訂閱數據流的 task 都會接收到 tuple 的拷貝。 4、Globle grouping(全局分組):這種分組方式將所有的 tuples 路由到唯一一個 task 上。Storm 按照最小的 task ID 來選取接收數據的 task 。註意,當使用全局分組方式時,設置 bolt 的 task 並發度是沒有意義的(spout並發有意義),因為所有 tuple 都轉發到同一個 task 上了。使用全局分組的時候需要註意,因為所有的 tuple 都轉發到一個 JVM 實例上,可能會引起 Storm 集群中某個 JVM 或者服務器出現性能瓶頸或崩潰。 5、None grouping(不分組):在功能上和隨機分組相同,是為將來預留的。 6、Direct grouping(指向型分組):數據源會調用 emitDirect() 方法來判斷一個 tuple 應該由哪個 Storm 組件來接收。只能在聲明了是指向型的數據流上使用。 7、Local or shuffle grouping (本地或隨機分組):和隨機分組類似,但是,會將 tuple 分發給同一個 worker 內的bolt task (如果 worker 內有接收數據的 bolt task )。其他情況下,采用隨機分組的方式。取決於topology 的並發度,本地或隨機分組可以減少網絡傳輸,從而提高 topology 性能。 8、Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
Resources:
- TopologyBuilder: use this class to define topologies
- InputDeclarer: this object is returned whenever
setBolt
is called onTopologyBuilder
and is used for declaring a bolt‘s input streams and how those streams should be grouped
Resources:
- Config.TOPOLOGY_WORKERS: this config sets the number of workers to allocate for executing the topology
Storm概念、原理詳解及其應用(一)BaseStorm