1. 程式人生 > >流式計算--storm3(Storm單詞技術案例)

流式計算--storm3(Storm單詞技術案例)

    功能說明:設計一個topology,來實現對文件裡面的單詞出現的頻率進行統計。本篇部落格是在storm概念講解storm叢集搭建的基礎上來的

  1.建立一個maven專案:

        新增以來如下: 

 <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>1.0.6</version>
                <!--storm叢集裡面已經有jar包,所以新增下面的provided表示已提供jar包,所以打包的時候就要剔除相應的jar包,現在跑本地需要註釋掉-->
                <!--<scope>provided</scope>-->
            </dependency>

   建立一個MySpout類繼承BaseRichSpout類,用於接收外部資料來源的資料

package com.wx.storm1;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class MySpout extends BaseRichSpout {

    SpoutOutputCollector outputCollector;
    //初始化的方法,該方法呼叫一次,主要由Storm框架傳入SpoutOutputCollector
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
             this.outputCollector=spoutOutputCollector;
    }
    //storm 框架在 while(true) 呼叫nextTuple方法
    public void nextTuple() {
        outputCollector.emit(new Values("I love you,wang zu xian,my names Mr.Wang"));
    }
    //訊息源可以傳送多條訊息流stream,多條訊息流可以理解為多種型別的資料
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
建立一個MyBolt繼承於BaseRichBolt,主要是分割單詞,得到的單詞計數輸出
package com.wx.storm1;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class MyBolt extends BaseRichBolt{
    OutputCollector outputCollector;
    //初始化方法
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector=outputCollector;
    }
    //被storm框架 while(true) 迴圈呼叫  傳入引數tuple
    public void execute(Tuple tuple) {
        //這裡將單詞分割,得到的單詞計數輸出
        String string = tuple.getString(0);
        String[] split = string.split(" ");
        for (String val:split)
        {
            outputCollector.emit(new Values(val,1));
        }
    }
    //輸出到下一個bolt
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word","num"));
    }
}

  建立一個MyBoltCount繼承於BaseRichBolt ,主要是對相同的單詞做統計

package com.wx.storm1;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class MyBoltCount extends BaseRichBolt {
    OutputCollector outputCollector;
    Map<String,Integer> map=new HashMap<String, Integer>();
    //初始化方法
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector=outputCollector;
    }
    //被storm框架 while(true) 迴圈呼叫  傳入引數tuple
    public void execute(Tuple tuple) {
          //這裡就將相同的單詞統計出來,然後輸出
        String word = tuple.getString(0);
        Integer num = tuple.getInteger(1);
        if(!map.containsKey(word))
        {
            map.put(word,num);
        }
        else
        {
            Integer count=map.get(word);
            map.put(word,num+count);
        }
        System.out.println("count:"+map);
    }

    //這個bolt就是盡頭了,不輸出
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

  建立一個拓撲來分配任務:
  

package com.wx.storm1;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountStrom {
     public static  void  main(String[] args) throws  Exception
     {
         //Storm計算wordcount
         //1.建立一個TopologyBuilder
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         //指定Spout的類和需要task的數量,task的數量即執行緒的數量
         topologyBuilder.setSpout("myspout",new MySpout(),2);
         //指定需要的第一級bolt的類和task的數量
         topologyBuilder.setBolt("mybolt1",new MyBolt(),2).shuffleGrouping("myspout"); //隨機函式分組
         topologyBuilder.setBolt("mybolt2",new MyBoltCount(),4).fieldsGrouping("mybolt1",new Fields("word"));//hash取模分組

         //2.建立一個configuration,用來指定當前topology 需要的worker的數量
         Config config=new Config();
         config.setNumWorkers(3);

         //3.提交任務  -----兩種模式 本地模式和叢集模式
         //StormSubmitter.submitTopology("wordcount",config,topologyBuilder.createTopology());
         LocalCluster localCluster = new LocalCluster();
         localCluster.submitTopology("wordcount",config,topologyBuilder.createTopology());

     }
}

   測試結果:
 

如果要提交到叢集執行:
 首先需要修改pom檔案,表示storm叢集中已經提供jar包

   

其次提交拓撲的任務修改為叢集模式

 

在idea中進行打包:雙擊package打包成功:

打的jar包在target目錄下:ok ,上傳至叢集,準備執行

執行執行

 

叢集的拓撲途中就有了:

使用命令把他kill 掉:storm kill wordcount  

  

可以檢視日誌:cd /export/servers/storm/logs/

  

2.Storm常用操作命令

有許多簡單且有用的命令可以用來管理拓撲,它們可以提交、殺死、禁用、再平衡拓撲。

1.提交任務命令格式:storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】

bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

2.殺死任務命令格式:storm kill 【拓撲名稱】 -w 10(執行kill命令時可以通過-w [等待秒數]指定拓撲停用以後的等待時間)

storm kill topology-name -w 10

3.停用任務命令格式:storm deactivte  【拓撲名稱】

storm deactivte topology-name

我們能夠掛起或停用執行中的拓撲。當停用拓撲時,所有已分發的元組都會得到處理,但是spouts的nextTuple方法不會被呼叫。銷燬一個拓撲,可以使用kill命令。它會以一種安全的方式銷燬一個拓撲,首先停用拓撲,在等待拓撲訊息的時間段內允許拓撲完成當前的資料流。

4.啟用任務命令格式:storm activate【拓撲名稱】

        storm activate topology-name

5.重新部署任務命令格式:storm rebalance  【拓撲名稱】

        storm rebalance topology-name

        再平衡使你重分配叢集任務。這是個很強大的命令。比如,你向一個執行中的叢集增加了節點。再平衡命令將會停用拓撲,然後在相應超時時間之後重分配工人,並重啟拓撲。

 3.執行原理圖(Mr.Mao的圖)

     這裡我們stream grouping的分組方式是按欄位進行分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts裡的一個task,而不同的userid則會被分配到不同的bolts裡的task。

  

4.Stream Grouping詳解

Storm裡面有7種類型的stream grouping

  1. Shuffle Grouping: 隨機分組, 隨機派發stream裡面的tuple,保證每個bolt接收到的tuple數目大致相同。
  2. Fields Grouping:按欄位分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts裡的一個task,而不同的userid則會被分配到不同的bolts裡的task。
  3. All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
  4. Global Grouping:全域性分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
  5. Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個執行緒裡面去執行。
  6. Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味著訊息的傳送者指定由訊息接收者的哪個task處理這個訊息。只有被宣告為Direct Stream的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用emitDirect方法來發射。訊息處理者可以通過TopologyContext來獲取處理它的訊息的task的id (OutputCollector.emit方法也會返回task的id)。
  7. Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作程序中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。

  5.問題(Mr.Mao的圖)

      1.叢集如何啟動,任務如何執行?

         

     2、叢集如何通訊:叢集架構中的各個模組是如何通訊的?拓撲程式中的各個Task是如何通訊的?

         nimbus與supervisor之間是通過zk進行通訊的

         不同機器上的worker是基於網路io的socket通訊(netty)

          worker內部是基於縣城的通訊,執行緒於執行緒通訊

   3.如何保證訊息的不丟失   ack-fail機制如何實現的?

    

  4.storm 啟動流程分析

  

流式計算一般架構圖

下面會學習kafka....