理解 Storm 拓撲的並行度
1. 什麼讓拓撲執行
Storm 區分以下 3 個主要的實體,用來執行 Storm 叢集中拓撲:
-
Worker 程序
-
Executors 執行緒
-
Tasks
這是一個簡單的例子, 以說明他們之間的關係
一個 Worker 程序用來執行一個拓撲的子集。屬於一個指定拓撲的 Worker 程序, 為該拓撲的一個或多個元件(spouts 或 bolts)執行一個或多個 Executors。一個正在執行的拓撲由多個這樣的程序組成, 它們執行在 Storm 叢集的多個機器上。
Executor 是一個執行緒,由 Worker 程序產生。一個 Executor 可以為同一個元件(spout 或 blot)執行一個或多個 Tasks。
Task 執行實際的資料處理 - 在你程式碼中實現的 spout 或 bolt 在叢集上執行儘可能多的 Task。一個元件的 Task 數目在整個拓撲生命週期中總是相同的,但是一個元件的 Executors 數目會隨時間變化。這意味著以下條件成立: #threads ≤ #tasks
。預設情況下,Tasks 的數目與 Executors 的數目設定成一樣,即,Storm 在每個執行緒上執行一個 Task。
2. 配置拓撲的並行度
請注意,在 Storm 的術語中, parallelism
專門用來描述所謂的 parallelism hint
,表示一個元件的 Executor 的初始化數量。在這篇文章中, 儘管我們一般使用 parallelism
術語來描述如何配置 Executor 的數目,但同時也可以配置 Worker 程序的數目和 Storm 拓撲的 Tasks 數目。
以下部分概述了各種配置引數以及如何在程式碼中進行設定。儘管可以有多種方法去設定這些引數,但下面只列出了其中的一些。Storm 目前配置的優先順序為: defaults.yaml < storm.yaml < 特定拓撲的配置 < 特定內部元件的配置 < 特定外部元件的配置
。
2.1 Worker 程序的數量
-
描述: 在叢集的機器上為拓撲建立多少個 Worker 程序。
-
配置引數:
TOPOLOGY_WORKERS
。 -
如何在程式碼中設定:
conf.setNumWorkers(4)
。
2.2 Executors的數量
-
描述: 為每個元件建立多少個 Executors。
-
配置引數: None (傳遞
parallelism_hint
引數到 setSpout 或 setBolt)。 -
如何在程式碼中設定:
TopologyBuilder#setSpout()
或TopologyBuilder#setBolt()
。
引數現在指定了 Bolt 的 Executors 的初始化數量(不是 Tasks)。
2.3 Tasks的數量
-
描述: 為每個元件建立多少個 Tasks。
-
配置引數:
TOPOLOGY_TASKS
。 -
如何在程式碼中設定:
ComponentConfigurationDeclarer#setNumTasks()
3. 執行拓撲示例
Config conf = new Config();// use two worker processesconf.setNumWorkers(2);// set parallelism hint to 2topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout"); topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6) .shuffleGrouping("green-bolt");StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology() );
下圖顯示了上述簡單拓撲是如何執行的。該拓撲由 3 個元件構成: 一個為 BlueSpout
的 spout 和兩個為 GreenBolt
和 YellowBolt
的 bolts。 BlueSpout
將其輸出傳送到 GreenBolt
, GreenBolt
將其輸出傳送到 YellowBolt
。
Storm 還提供了額外的配置來設定拓撲的並行度:
-
TOPOLOGY_MAX_TASK_PARALLELISM
: 此引數設定單個元件 Executor 數量的上限。通常在測試期間使用它來限制在本地模式下執行拓撲時產生的執行緒數。你可以通過Config#setMaxTaskParallelism()
來設定此選項。
4. 如何改變正在執行中的拓撲的並行度
Storm 的一個很好的特性是可以增加或減少 Worker 程序 或 Executor 的數量,不需要重新啟動叢集拓撲。這樣的行為稱之為 rebalance
。
你有兩個選項來 rebalance 一個拓撲:
-
使用
Storm web UI
來 rebalance 指定的拓撲。 -
使用 CLI 工具
storm rebalance
, 如下所示。 以下是一個使用 CLI 工具的示例:
## 重新配置拓撲 "mytopology" 使用5 個 Worker 程序 ## 重新配置spout "blue-spout" 使用 3 個 Executors ## 重新配置bolt "yellow-bolt" 使用 10 個 Executors $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
版本:1.2.2