1. 程式人生 > >Storm 使用經驗與效能優化(一)

Storm 使用經驗與效能優化(一)

提交任務:storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.WordCountTopology word-count
查詢任務:storm list
Kill任務:storm kill word-count

1) 使用rebalance命令動態調整併發度

     Storm提供了rebalance命令,可以動態調整Topology的Worker數量和Component的併發度,而不用修改Topology的程式碼。如果想動態增加某個Component的併發度,需要設定Component的NumTask數量或者MaxTaskParallelism的引數值,並且大於併發度引數值(parallelism_hint)的值。如果不設定NumTask數量,預設值等同於併發度,重新平衡的時候就只能減少併發度,而不能再增加了。

	SentenceSpout spout=new SentenceSpout();
	SplitSentenceBolt splitbolt=new SplitSentenceBolt();
	WordCountBolt countbolt=new WordCountBolt();
	ReportBolt reportbolt=new ReportBolt();
		
	TopologyBuilder builder=new TopologyBuilder();
	// 設定併發為2個executor,每個Task指派各自的executor執行緒
	builder.setSpout(SENTENCE_SPOUT_ID,spout,5).setMaxTaskParallelism(10);
	// 設定併發為2個executor,每個executor執行2個task
	builder.setBolt(SPLIT_BOLT_ID,splitbolt,8).shuffleGrouping(SENTENCE_SPOUT_ID).setNumTasks(16);
	// 有時候我們需要將特定資料的tuple路由到特殊的bolt例項中,在此我們使用fieldsGrouping
	// 來保證所有"word"欄位值相同的tuple會被路由到同一個WordCountBolt例項中
	builder.setBolt(COUNT_BOLT_ID,countbolt,12).fieldsGrouping(SPLIT_BOLT_ID,new Fields("words"));
	builder.setBolt(REPORT_BOLT_ID,reportbolt).globalGrouping(COUNT_BOLT_ID);
		
	/*Map conf=new HashMap();
	conf.put(Config.TOPOLOGY_WORKERS,4);
	conf.put(Config.TOPOLOGY_DEBUG,true);*/
		
	Config conf = new Config();
	conf.setDebug(true);
	conf.setNumWorkers(3);
	LocalCluster cluster=new LocalCluster();
	cluster.submitTopology(TOPOLOGY_NAME,conf,builder.createTopology());
		
//	Thread.sleep(1000);
//	cluster.shutdown();
		

   在構建Topology的過程中,指定了Spout的初始併發度為5,也就是會啟動5個Executor執行緒來執行,然後設定最大併發度為10,上限為10個Executor執行緒;同時也指定了Split初始併發度為8,最大的Task數量為16個。在執行過程中,我們發現Spout或者Split的效能是瓶頸,可以通過調整併發度來提供效能。Spout併發度上限為10個,Split的最大併發度為16個。

   storm rebalance word-count -w 10 -n 4 -e spout=8 -e split=12

  Storm通過自己內部的重新部署和分配,來完成併發度的調整。等待10s後,Topology進入重新分配狀態,等重新分配完成後,就可以在Storm UI上看見新的結果了。