
Big Data with Storm (1): Storm overview, core components, workflow, installation and deployment, call-log case study, running on a cluster, word-count case study, tuning parallelism

I. Storm overview
---------------------------------------------------------
    1.Open-source, distributed, real-time computation

    2.Reliably processes unbounded data streams in real time; topologies can be written in any language

    3.Suited to real-time analytics, online machine learning, distributed RPC, and ETL

    4.Can process millions of records (tuples) per second

    5.Scalable, fault-tolerant, and guarantees each record is processed at least once

    6.Low latency: responses at the second to minute level


II. Core concepts
------------------------------------------------------------
    1.Topology:
        a.The object that encapsulates a real-time computation; analogous to an MR job, except that it never terminates
        b.Spouts and bolts wired together form a topology, a directed graph [vertices -- edges] in which vertices are computation units and edges are data streams

    2.Nimbus:
        a.Resource scheduling and allocation; analogous to the JobTracker
        b.Master node and core component; the leader that manages and assigns tasks
        c.Analyzes the topology, collects the tasks to run, and distributes them to supervisors
        d.Communicates with supervisors through an internal messaging system
        e.Monitors topologies for failure; being stateless, it relies on ZooKeeper to track each topology's running state

    3.Supervisor:
        a.Receives instructions from nimbus; starts and governs all of its workers; analogous to a TaskTracker
        b.Worker node, the overseer; assigns tasks to workers
        c.Each supervisor has n worker processes and manages all of them

    4.Worker: does not execute tasks itself; it spawns the executors that do the actual processing and lets them run the tasks

    5.Executor: a physical thread spawned by a worker; all tasks it runs must belong to the same component [spout/bolt]

    6.Task: the smallest unit of work in Storm, analogous to an MR task; performs the actual processing, as either a spout or a bolt

    7.Spout: the "tap" that pulls data from a source and sends it to bolts via the nextTuple method. The head of a data stream; can be custom code or backed by Kafka

    8.Bolt: the "adapter", a logic-processing unit; receives the tuples emitted by spouts and filters them, aggregates them, writes them to a DB, etc.

    9.Tuple: the basic unit of a message and Storm's main data structure; an ordered list of elements

    10.Stream: an unbounded sequence of tuples

    11.Stream grouping: how tuples are partitioned among tasks -- grouping types include shuffle, fields, all, global, none, direct, and localOrShuffle
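The grouping types in item 11 decide which downstream task receives each tuple. As a rough plain-Java sketch (not Storm's implementation; `FieldsGroupingSketch` and `chooseTask` are hypothetical names introduced here only for illustration), fields grouping can be pictured as hashing the selected field values modulo the task count, which is why equal keys always land on the same task:

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {
    // Simplified picture of fields grouping: tuples with equal values in the
    // grouped fields hash to the same task index.
    static int chooseTask(List<?> keyFields, int numTasks) {
        return Math.abs(keyFields.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int t1 = chooseTask(Arrays.asList("how"), 5);
        int t2 = chooseTask(Arrays.asList("how"), 5);
        System.out.println(t1 == t2); // prints "true": same key, same task
    }
}
```

Shuffle grouping, by contrast, distributes tuples randomly/evenly with no such key affinity.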


III. Storm vs. Hadoop
-----------------------------------------------------------
    storm                               hadoop
    real-time stream processing         batch processing
    stateless daemons                   stateful daemons
    ZooKeeper-coordinated master/slave  master/slave without ZooKeeper
    millions of records per second      MR jobs take minutes to hours
    runs until explicitly killed        runs to completion and stops
    topology -- task                    mapreduce -- mr task
    nimbus -- supervisor                jobTracker -- taskTracker
    spout -- bolt                       map -- reduce


IV. Storm workflow
-----------------------------------------------------------
    1.Nimbus waits for a topology to be submitted

    2.A topology is submitted

    3.Nimbus receives the topology and extracts its tasks

    4.Nimbus distributes the tasks across all available supervisors

    5.Every supervisor periodically sends a heartbeat to nimbus to prove it is still alive; if a supervisor dies, nimbus stops sending it tasks and routes them to other supervisors instead

    6.If nimbus dies, the tasks already assigned to supervisors keep running, unaffected

    7.When its tasks complete, a supervisor waits for new tasks

    8.A dead nimbus can be restarted automatically by a monitoring tool and resumes from where it left off


V. Installing and deploying Storm
-------------------------------------------------------------
    1.Use 4 machines: 1 nimbus, 3 supervisors

    2.Download apache-storm-1.1.0.tar.gz, untar it, create a symlink, and distribute it to every node

    3.Configure the environment variables and distribute the file
         [/etc/environment]
         ...
         STORM_HOME="/soft/storm"
         PATH="... :/soft/storm/bin"

    4.Verify the installation
        $> storm version

    5.Deploy Storm
        a.Edit the config file and distribute it [/soft/storm/conf/storm.yaml]
            storm.local.dir: "/home/ubuntu/storm"
            storm.zookeeper.servers:
                - "s200"
                - "s300"

            storm.zookeeper.port: 2181

            ### nimbus.* configs are for the master
            nimbus.seeds: ["s100"]

            ### ui.* configs are for the master
            ui.host: 0.0.0.0
            ui.port: 8080

            supervisor.slots.ports:
                - 6700
                - 6701
                - 6702
                - 6703

    6.Start Storm
        a.Start the nimbus daemon on s100
            $> storm nimbus

        b.Start a supervisor daemon on each of s200, s300, and s400
            $> storm supervisor

        c.Start the UI daemon on s100
            $> cd /soft/storm/bin
            $bin> ./storm ui &

        d.Open the web UI
            http://s100:8080


VI. Call-log case study
--------------------------------------------------------------
    1.The spout class
    ----------------------------------------------------------

    package test.storm;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;

    /**
     * Storm spout class.
     * Responsible for producing the data stream.
     */
    public class CallLogSpout implements IRichSpout {

        //spout output collector: passes tuples on to the bolts
        private SpoutOutputCollector collector;
        //whether emission has finished
        private boolean completed = false;
        //topology context handed to this component
        private TopologyContext context;
        //random generator
        private Random randomGenerator = new Random();
        //index
        private Integer idx = 0;

        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.context = topologyContext;
            this.collector = spoutOutputCollector;
        }

        public void close() {

        }

        public void activate() {

        }

        public void deactivate() {

        }

        /**
         * Emit the next tuple (record).
         */
        public void nextTuple() {

            if(this.idx <= 1000) {

                List<String> mobileNumbers = new ArrayList<String>();
                mobileNumbers.add("1234123401");
                mobileNumbers.add("1234123402");
                mobileNumbers.add("1234123403");
                mobileNumbers.add("1234123404");
                Integer localIdx = 0;
                while(localIdx++ < 100 && this.idx++ < 1000) {
                    //random caller
                    String caller = mobileNumbers.get(randomGenerator.nextInt(4));
                    //random callee
                    String callered = mobileNumbers.get(randomGenerator.nextInt(4));
                    //compare with equals(), not ==: re-pick until the callee differs from the caller
                    while(caller.equals(callered)) {
                        callered = mobileNumbers.get(randomGenerator.nextInt(4));
                    }
                    //random call duration
                    Integer duration = randomGenerator.nextInt(60);
                    //emit the tuple to the bolts through the collector
                    this.collector.emit(new Values(caller, callered, duration));
                }
            }

        }

        public void ack(Object o) {

        }

        public void fail(Object o) {

        }

        /**
         * Declare the names of the output fields.
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("caller", "callered", "time"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
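The tuple-building logic inside nextTuple can be exercised outside Storm. Below is a standalone sketch (the `CallLogGenerator` class and `randomCall` helper are hypothetical, introduced only for illustration) that mirrors the caller/callee/duration generation above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class CallLogGenerator {
    // Mirrors the spout's loop body: pick a random caller, then a random
    // callee that is guaranteed to differ, then a duration under 60s.
    static String[] randomCall(List<String> numbers, Random rnd) {
        String caller = numbers.get(rnd.nextInt(numbers.size()));
        String callee = numbers.get(rnd.nextInt(numbers.size()));
        while (caller.equals(callee)) {
            callee = numbers.get(rnd.nextInt(numbers.size()));
        }
        return new String[]{caller, callee, String.valueOf(rnd.nextInt(60))};
    }

    public static void main(String[] args) {
        List<String> nums = Arrays.asList(
                "1234123401", "1234123402", "1234123403", "1234123404");
        String[] call = randomCall(nums, new Random());
        System.out.println(call[0] + " -> " + call[1] + " (" + call[2] + "s)");
    }
}
```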


    2.The BoltA class -- CallLogBolt
    -----------------------------------------------------------------------

    package test.storm;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    import java.util.Map;

    /**
     * Call-log "adapter" bolt -- performs the first assembly step on each call record.
     */
    public class CallLogBolt implements IRichBolt {

        //collector: receives tuples emitted by the spout and forwards new ones downstream
        private OutputCollector collector;

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }

        /**
         * Execute method -- the computation step.
         * @param tuple
         */
        public void execute(Tuple tuple) {
            //extract the caller from the tuple
            String from = tuple.getString(0);
            //extract the callee from the tuple
            String to = tuple.getString(1);
            //extract the call duration from the tuple
            Integer duration = tuple.getInteger(2);
            //assemble the call record into a new tuple and emit it downstream
            collector.emit(new Values(from + " - " + to, duration));
        }

        public void cleanup() {

        }

        /**
         * Declare the names of the tuple's output fields.
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("calllog", "duration"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }


    3.The BoltB class -- CounterBolt
    -------------------------------------------------------------

    package test.storm;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Call-log counting bolt -- aggregates the call count per caller/callee pair.
     */
    public class CounterBolt implements IRichBolt {

        Map<String, Integer> counterMap;
        private OutputCollector collector;

        /**
         * Initialization.
         */
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

            this.counterMap = new HashMap<String, Integer>();
            collector = outputCollector;
        }

        /**
         * Count the number of calls per call-log key.
         */
        public void execute(Tuple tuple) {

            String calllog = tuple.getString(0);
            Integer duration = tuple.getInteger(1);
            if(!counterMap.containsKey(calllog)){
                counterMap.put(calllog, 1);
            }else{
                Integer c = counterMap.get(calllog) + 1;
                counterMap.put(calllog, c);
            }
            //this is the last bolt in the chain, so ack the tuple to mark it fully processed
            collector.ack(tuple);
        }

        public void cleanup() {
            for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
                System.out.println(entry.getKey()+" : " + entry.getValue());
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("call"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
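The counting branch in execute can be condensed. A plain-Java sketch (`CallCounter` is a hypothetical illustration class, not part of the topology) showing that `Map.merge` performs the same containsKey/put update in a single call:

```java
import java.util.HashMap;
import java.util.Map;

public class CallCounter {
    // Equivalent of the bolt's if/else: insert 1 on first sight,
    // otherwise add 1 to the existing count.
    static void count(Map<String, Integer> counters, String calllog) {
        counters.merge(calllog, 1, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> counters = new HashMap<>();
        count(counters, "1234123401 - 1234123402");
        count(counters, "1234123401 - 1234123402");
        count(counters, "1234123403 - 1234123404");
        System.out.println(counters.get("1234123401 - 1234123402")); // prints 2
    }
}
```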


    4.The App class
    ----------------------------------------------------------------------

    package test.storm;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    /**
     * Application entry point -- builds and submits the topology.
     */
    public class App_Toplogy {

        public static void main(String [] args)
        {
            //storm cluster configuration
            Config config = new Config();
            config.setDebug(true);

            //build the topology
            TopologyBuilder builder = new TopologyBuilder();
            //set the spout
            builder.setSpout("spoutA", new CallLogSpout());
            //wire boltA to spoutA with shuffle grouping
            builder.setBolt("boltA", new CallLogBolt()).shuffleGrouping("spoutA");
            //wire boltB to boltA with fields grouping on boltA's "calllog" field
            builder.setBolt("boltB", new CounterBolt()).fieldsGrouping("boltA", new Fields("calllog"));

            //local mode
            //LocalCluster cluster = new LocalCluster();
            //submit the topology
            //cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
            //try {
            //    Thread.sleep(10000);
            //} catch (InterruptedException e) {
            //    e.printStackTrace();
            //}
            //stop the topology
            //cluster.shutdown();

            //cluster mode
            try {
                StormSubmitter.submitTopology("mytop", config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }



VII. Running on a Storm cluster
-------------------------------------------------------------------------------
    1.Modify the App class -- switch to cluster-submit mode
        //cluster mode
        try {
            StormSubmitter.submitTopology("mytop", config, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }

    2.Export the jar

    3.Run it on ubuntu
        $bin> ./storm jar /share/storm/TestStrom-1.0-SNAPSHOT.jar test.storm.App_Toplogy

    4.Start the log viewer on s200
        $bin> ./storm logviewer &

    5.Check the results [call-record statistics]
        $s400> cat /soft/storm/logs/workers-artifacts/mytop-1-1538085858/6700/worker.log | grep 1234


VIII. Word-count case study
---------------------------------------------------------------------------
    1.WordSpout -- produces the words
    ---------------------------------------------

    package test.storm.wc;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import util.Util;

    import java.util.Map;
    import java.util.Random;

    /**
     * Word-source spout -- the "tap" of the topology.
     */
    public class WordSpout implements IRichSpout {

        private TopologyContext context;
        private SpoutOutputCollector collector;

        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            Util.sendToClient(this,"WordSpout.open()",7777);
            context = topologyContext;
            collector = spoutOutputCollector;
        }

        public void close() {

        }

        public void activate() {

        }

        public void deactivate() {

        }

        /**
         * Emit the next tuple.
         */
        public void nextTuple() {

            //Util.sendToClient(this,"WordSpout.nextTuple()",7777);
            String line = "how are you" + " tom" + new Random().nextInt(100);
            collector.emit(new Values(line));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void ack(Object o) {

        }

        public void fail(Object o) {

        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("line"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }


    2.SplitBolt -- splits each line into words
    ----------------------------------------------

    package test.storm.wc;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import util.Util;

    import java.util.Map;

    public class SpiltBolt implements IRichBolt {

        private TopologyContext context;
        private OutputCollector collector;

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            Util.sendToClient(this,"SpiltBolt.prepare()",8888);

            this.context = topologyContext;
            this.collector = outputCollector;
        }

        /**
         * The computation: split the incoming line into words and emit each one.
         */
        public void execute(Tuple tuple) {
            //Util.sendToClient(this,"SpiltBolt.execute()" + tuple.toString(), 8888);
            String str = tuple.getString(0);
            String [] strs = str.split(" ");
            for(String s : strs)
            {
                collector.emit(new Values(s));
            }
        }

        public void cleanup() {

        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
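The split step above is ordinary String.split. A minimal standalone check (`SplitSketch` is a hypothetical illustration class) of what the bolt emits for one of the spout's lines:

```java
public class SplitSketch {
    // Mirrors SpiltBolt.execute: one emitted word per space-separated token.
    static String[] splitLine(String line) {
        return line.split(" ");
    }

    public static void main(String[] args) {
        // Same shape as WordSpout's line: "how are you" + " tom" + a number
        String[] words = splitLine("how are you tom42");
        for (String w : words) {
            System.out.println(w);
        }
        System.out.println(words.length + " tokens"); // prints "4 tokens"
    }
}
```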

    3.CounterBolt -- counts the words
    ----------------------------------------------

    package test.storm.wc;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;
    import util.Util;

    import java.util.HashMap;
    import java.util.Map;

    public class CounterBolt implements IRichBolt {

        private TopologyContext context;
        private OutputCollector collector;
        private Map<String, Integer> map = new HashMap<String, Integer>();

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

            Util.sendToClient(this,"CounterBolt.prepare()" , 9999);
            context = topologyContext;
            collector = outputCollector;
            //qualify with this: the parameter "map" shadows the field of the same name
            this.map = new HashMap<String, Integer>();
        }

        public void execute(Tuple tuple) {

            //Util.sendToClient(this,"CounterBolt.execute()" + tuple.toString(),9999);
            String word = tuple.getString(0);
            if (map.containsKey(word)) {
                int count = map.get(word) + 1;
                map.put(word, count);
            }
            else {
                map.put(word, 1);
            }
            collector.ack(tuple);
        }

        public void cleanup() {
            for(Map.Entry<String, Integer> entry:map.entrySet()){
                System.out.println(entry.getKey()+" : " + entry.getValue());
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }


    4.The App main class
    ----------------------------------------------

    package test.storm.wc;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class App {

        public static void main(String [] args)
        {
            //storm cluster configuration
            Config config = new Config();
            config.setDebug(true);
            //use 5 workers
            config.setNumWorkers(5);

            //build the topology
            TopologyBuilder builder = new TopologyBuilder();
            //set the spout with a parallelism hint of 3
            builder.setSpout("wordSpout", new WordSpout(),3).setNumTasks(3);
            //wire spiltBolt to wordSpout with shuffle grouping and a parallelism hint of 4
            builder.setBolt("spiltBolt", new SpiltBolt(),4).shuffleGrouping("wordSpout");
            //wire counterBolt to spiltBolt with fields grouping on the "word" field and a parallelism hint of 5
            builder.setBolt("counterBolt", new CounterBolt(),5).fieldsGrouping("spiltBolt", new Fields("word"));

            //local mode
    //        LocalCluster cluster = new LocalCluster();
    //        //submit the topology
    //        cluster.submitTopology("wc", config, builder.createTopology());
    //        try {
    //            Thread.sleep(10000);
    //        } catch (InterruptedException e) {
    //            e.printStackTrace();
    //        }
    //        //stop the topology
    //        cluster.shutdown();

            //cluster mode
            try {
                StormSubmitter.submitTopology("wc",config,builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }



IX. Setting topology parallelism -- configuring worker, executor, and task counts
---------------------------------------------------------------------
    1.Set the number of workers
        conf.setNumWorkers(3);      //default is 1

    2.Set the number of executor threads
        //set the spout with a parallelism hint of 3 [3 executor threads, spread across the workers, produce the word source]
        builder.setSpout("wordSpout", new WordSpout(),3);

        //wire spiltBolt to wordSpout with shuffle grouping and a parallelism hint of 4 [4 executor threads do the splitting]
        builder.setBolt("spiltBolt", new SpiltBolt(), 4).shuffleGrouping("wordSpout");

    3.Set the number of tasks
        //3 executor threads and 6 tasks -- on average 2 tasks per thread
        builder.setSpout("wordSpout", new WordSpout(),3).setNumTasks(6);

    4.With the settings above, the result is
        -- taking the spout as an example
        -- 1 nimbus -- 3 workers running on 3 hosts -- the spout's 3 executor threads are spread across the workers, and its 6 tasks are divided evenly, 2 per executor
        -- a host can run at most 4 workers [determined by the slots in the config file], and resources are divided evenly -- here 3 hosts run 3 workers
        -- a host runs worker processes, a worker runs executor threads, and an executor runs its share of tasks; the task is the smallest unit and does the actual computation

    5.Parallelism == the total number of tasks across all components
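Point 5 is simple arithmetic over the word-count settings above (spout hint 3 overridden to 6 tasks; a component's task count defaults to its parallelism hint otherwise). A small sketch (`ParallelismSketch` and `totalTasks` are hypothetical illustration names) sums the tasks:

```java
public class ParallelismSketch {
    // Total topology parallelism = sum of the task counts of all components.
    static int totalTasks(int[] taskCounts) {
        int total = 0;
        for (int t : taskCounts) {
            total += t;
        }
        return total;
    }

    public static void main(String[] args) {
        // wordSpout: hint 3, setNumTasks(6) -> 6 tasks
        // spiltBolt: hint 4, no override   -> 4 tasks
        // counterBolt: hint 5, no override -> 5 tasks
        int total = totalTasks(new int[]{6, 4, 5});
        System.out.println("total parallelism = " + total); // prints 15
    }
}
```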