1. 程式人生 > >Storm學習記錄(一、簡介)

Storm學習記錄(一、簡介)

一、簡介

Storm是一個免費並開源的分散式實時計算系統。利用Storm可以很容易做到可靠地處理無限的資料流,像Hadoop批量處理大資料一樣,Storm可以實時處理資料。Storm簡單,可以使用任何程式語言。

Storm有如下特點:

  1. 程式設計簡單:開發人員只需要關注應用邏輯,而且跟Hadoop類似,Storm提供的程式設計原語也很簡單
  2. 高效能,低延遲:可以應用於廣告搜尋引擎這種要求對廣告主的操作進行實時響應的場景。
  3. 分散式:可以輕鬆應對資料量大,單機搞不定的場景
  4. 可擴充套件: 隨著業務發展,資料量和計算量越來越大,系統可水平擴充套件
  5. 容錯:單個節點掛了不影響應用
  6. 訊息不丟失:保證訊息處理

Storm計算模型:

Topology – DAG有向無環圖的實現:

對於Storm實時計算邏輯的封裝即,由一系列通過資料流相互關聯的SpoutBolt所組成的拓撲結構

生命週期:此拓撲只要啟動就會一直在叢集中執行,直到手動將其kill,否則不會終止

(區別於MapReduce當中的JobMR當中的Job在計算執行完成就會終止)

Tuple – 元組:Stream中最小資料組成單元

Stream – 資料流

Spout中源源不斷傳遞資料給Bolt、以及上一個Bolt傳遞資料給下一個Bolt,所形成的這些資料通道即叫做

Stream

Stream宣告時需給其指定一個Id(預設為Default),實際開發場景中,多使用單一資料流,此時不需要單獨指定StreamId

二、樣例 

求sum= 1+2+3+....

新增依賴:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>
public class Test {
    /**
     * 建立拓撲結構,放入叢集執行
     * @param args
     */
    public static void main(String[] args) {
        //構建拓撲結構
        TopologyBuilder tb = new TopologyBuilder();

        tb.setSpout("wsspout",new WordSumSpout());

        tb.setBolt("wsbolt",new WordSumBolt()).shuffleGrouping("wsspout");

//        建立本地叢集
        LocalCluster lc = new LocalCluster();
//        將任務佈置到叢集上
        lc.submitTopology("wordsum",new Config(),tb.createTopology());
    }
}
public class WordSumBolt extends BaseRichBolt {
    Map map;
    TopologyContext context;
    OutputCollector collector;

    int sum = 0;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.map = map;
        this.collector = collector;
        this.context = context;
    }

    /**
     * 獲取資料(有必要的話,向後繼續傳送資料)
     */
    @Override
    public void execute(Tuple tuple) {
//        tuple.getInteger(0);
        int num = tuple.getIntegerByField("num");
        sum += num;

        System.out.println("sum: ------" + sum);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
public class WordSumSpout extends BaseRichSpout {
    Map map;
    TopologyContext context;
    SpoutOutputCollector collector;
    int i =0;

    /**
     * 配置初始化spout類
     */
    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.map = map;
        this.context = context;
        this.collector = collector;
    }

    /**
     * 採集並向後推送資料
     */
    @Override
    public void nextTuple() {
        i++;
        List num = new Values(i);
        this.collector.emit(num);

        System.err.println("Spout:-------- "+i);
        Utils.sleep(1000);
    }

    /**
     * 向接收資料的邏輯單元傳送資料的欄位名稱
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("num"));
    }
}

統計單詞出現個數 :

public class Test {
    public static void main(String[] args) {
        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("wcspout",new WcSpout());
        tb.setBolt("wspiltbolt",new WspiltBolt()).shuffleGrouping("wcspout");
//        fieldsGrouping: 只傳到同一個bolt處理
        tb.setBolt("wcountbolt",new WcountBolt(),3).fieldsGrouping("wspiltbolt",new Fields("word"));

        LocalCluster lc =new LocalCluster();
        lc.submitTopology("wordcount",new Config(),tb.createTopology());

    }
}
public class WcountBolt extends BaseRichBolt {

    //用來統計單詞及次數
    Map<String, Integer> map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
    }

    /**
     * 獲取tuple中的每個單詞,並按照單詞統計出現的次數
     */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");

        if (map.containsKey(word)) {
            map.put(word, map.get(word) + 1);
        } else {
            map.put(word, 1);
        }

        System.out.println(word +"--------"+map.get(word));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
public class WcSpout extends BaseRichSpout {

    SpoutOutputCollector collector;
    //模擬資料
    String[] text = {
            "hello Sam", "hello Tom", "hello Jetty"
    };
    Random r = new Random();

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //    隨機向後傳送字串
    @Override
    public void nextTuple() {
        List line = new Values(text[r.nextInt(text.length)]);
        this.collector.emit(line);
        System.out.println("spout emit: -------" + line);
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
public class WspiltBolt extends BaseRichBolt{

    OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * 獲取每一行並切割
     */
    @Override
    public void execute(Tuple tuple) {
        String line = tuple.getString(0);
        String[] words = line.split(" ");

        for (String word:words){
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}