/*
 * 程式人生 > storm自帶例子詳解 (一)——WordCountTopologyNode
 *
 * storm自帶例子詳解 (一)——WordCountTopologyNode
 * (Storm bundled examples explained, part 1: WordCountTopologyNode)
 */

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * A word-count topology that demonstrates Storm's stream groupings and its multilang
 * (multi-language) capabilities.
 *
 * <p>The spout and the sentence-splitting bolt are implemented in JavaScript: they extend
 * {@link ShellSpout} / {@link ShellBolt} and are constructed with the command
 * {@code node <script>.js}. The counting bolt is plain Java.
 *
 * <p>Data flow: {@code spout} (RandomSentence) --shuffle--&gt; {@code split} (SplitSentence)
 * --fields("word")--&gt; {@code count} (WordCount).
 *
 * <p>Usage: pass a topology name as the first argument to submit to a cluster. With no
 * arguments, the topology runs for a fixed time in an in-process {@link LocalCluster}.
 */
public class WordCountTopologyNode {

  /** Name under which the topology is submitted to the in-process local cluster. */
  private static final String LOCAL_TOPOLOGY_NAME = "word-count";

  /** How long (in milliseconds) the local-mode run processes tuples before shutting down. */
  private static final long LOCAL_RUN_MILLIS = 10000L;

  /**
   * Bolt whose logic is implemented in {@code splitsentence.js}, launched through
   * {@link ShellBolt} with the {@code node} command. Judging by its name and its declared output,
   * it presumably splits each incoming sentence into words (TODO confirm against the script).
   */
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    /** Delegates to the JavaScript implementation {@code splitsentence.js} run by {@code node}. */
    public SplitSentence() {
      super("node", "splitsentence.js");
    }

    /** Declares a single output field, {@code word}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    /** Returns {@code null}: this component supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  /**
   * Spout whose logic is implemented in {@code randomsentence.js}, launched through
   * {@link ShellSpout} with the {@code node} command.
   */
  public static class RandomSentence extends ShellSpout implements IRichSpout {

    /** Delegates to the JavaScript implementation {@code randomsentence.js} run by {@code node}. */
    public RandomSentence() {
      super("node", "randomsentence.js");
    }

    /**
     * Declares a single output field, {@code word}.
     *
     * <p>NOTE(review): the field is named {@code word} although this spout presumably emits whole
     * sentences that {@code split} breaks up. The name is kept as-is to leave the declared stream
     * schema unchanged. The only consumer visible here subscribes with a shuffle grouping, which
     * does not depend on field names.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    /** Returns {@code null}: this component supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  /**
   * Bolt that keeps a running count per word and emits {@code (word, count)} after every update.
   *
   * <p>Counts are held only in this task's in-memory map and are not persisted. {@link #main}
   * subscribes this bolt with a fields grouping on {@code word}, so all occurrences of a given
   * word reach the same task.
   */
  public static class WordCount extends BaseBasicBolt {
    /** Running count per word seen by this task. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Increments the count for the word in field 0 of {@code tuple} and emits the word with its
     * new count.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer previous = counts.get(word);
      int count = (previous == null) ? 1 : previous + 1;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    /** Declares the output fields {@code word} and {@code count}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

  /**
   * Builds the word-count topology and runs it.
   *
   * @param args if non-empty, {@code args[0]} is the topology name and the topology is submitted
   *     to the cluster with 3 workers; otherwise it runs in a {@link LocalCluster} for
   *     {@link #LOCAL_RUN_MILLIS} milliseconds
   * @throws Exception if submission fails or the local run is interrupted
   */
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // The third argument of each call is the component's parallelism hint.
    builder.setSpout("spout", new RandomSentence(), 5);
    // Any splitter may handle any sentence.
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    // Route by word so each word is always counted by the same WordCount task.
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      // Cluster mode: submit under the caller-supplied topology name.
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
      // Local mode: cap each component's parallelism at 3 for the in-process run.
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      try {
        cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
        Thread.sleep(LOCAL_RUN_MILLIS);
      } finally {
        // Shut the local cluster down even if submission throws or the sleep is interrupted;
        // previously either case skipped shutdown() and left the cluster running.
        cluster.shutdown();
      }
    }
  }
}