1. 程式人生 > >Storm實時大資料處理(三)

Storm實時大資料處理(三)

本文主題:構建和執行Storm拓撲(Build and Run the Storm Topology)

一、構建Storm拓撲

實現了Spout和Bolt後,就可以構建Storm拓撲了,使用TopologyBuilder構建Topology。

TopologyBuilder builder = new TopologyBuilder();

之後就可以使用builder物件構建拓撲了,使用setSpout方法配置Spout。

public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException;
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException;
setSpout方法有2個過載的版本(基於IRichSpout實現Spout時),第一個傳入2個引數:Spout的ID和IRichSpout例項,即自己實現的Spout例項;第二個傳入3個引數:Spout的ID、IRichSpout例項和Spout的並行數。

setSpout方法返回一個SpoutDeclarer物件,可以使用該物件進一步對Spout進行配置,具體請參考SpoutDeclarer API。

繼續使用builder物件的setBolt方法配置Bolt。

public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException;
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException;
setBolt方法也有2個個過載的版本(基於IRichBolt實現Bolt時),第一個傳入2個引數:Bolt的ID和IRichBolt例項,即自己實現的Bolt例項;第二個傳入3個引數:Bolt的ID、IRichBolt例項和Bolt的並行數。

setBolt方法返回一個BoltDeclarer物件,可以使用該物件進一步對Bolt進行配置,最常見的就是配置Tuple的分流(grouping)方式。

二、執行Storm拓撲

Storm拓撲有2種執行模式,叢集模式和本地模式。

叢集模式執行Storm拓撲,首先使用StormSubmitter類的靜態方法submitTopology來提交拓撲,方法宣告如下:

public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException;
第一個引數是拓撲的名稱,第二個是拓撲更詳細的一些配置資訊,第三個StormTopology使用前面的builder物件來建立,舉例:
Config config = new Config();
StormSubmitter.submitTopology("wordCount",config ,builder.createTopology());
然後將專案打包為jar,到Nimbus機器上執行Python指令碼storm提交到叢集執行。
Storm jar yourJar.jar MainClass args...
本地模式執行Storm拓撲,使用LocalCluster,舉例:
LocalCluster cluster = new LocalCluster();
Config config = new Config();
cluster.submitTopology(topologyID,stormConf,builder.createTopology());
之後,像普通Java Application一樣執行即可。