1. 程式人生 > >【2019春招準備:106.storm(1)】

【2019春招準備:106.storm(1)】

0.簡介

a million tuples processed per second per node
單個節點美妙百萬數量級的實時計算
scalable 可擴充套件性
fault-tolerant 容錯性

生於Twitter,收購的BackType,並開源到apache
底層語言clojure,java混合體
api:javadoc

hadoop VS storm
hadoop: map reduce
storm:spout(產生資料來源) bolt(處理)
storm不同的是沒有結束程序,就算沒有資料進來,也不會結束(扶梯和電梯的區別)
使用場景不同(實時流處理-離線批處理)

storm VS sparkStreaming
在這裡插入圖片描述
並不是一個真正意義的實時流處理 微小批處理 可以設定,屬於spark生態圈

1. 核心概念

http://storm.apache.org/releases/1.2.2/Concepts.html

Streams 資料流
Spouts 產生資料流的東西(可能是多個)reliable(ack,fail), unreliable
Bolts 處理資料流的東西(可能是多個)filtering,functions,aggregations.joins,talking todatabases execute 多執行緒,非同步
Tuple 資料流裡的資料,nextTuple方法
Topologies 整個資料處理的生產線(類似於mapreduce)
在這裡插入圖片描述

2. storm程式設計

Idea & Maven 構建storm專案
ISpout介面
IComponent介面
IBolt介面
求和 + 詞頻統計案例

環境:jdk1.8 IDEA2018.2 Maven 3.5.4

【ISpout】:負責將資料傳送到topology中處理
storm會跟蹤每一個spout發出去的tuple的DAG(tuple中含一個messageID 任意型別)
storm在每一個執行緒裡面執行ack.fail.nextTuple方法,這意味著使用ISpout的時候不用擔心併發的問題,因為都是執行緒安全的。但是這些的前提是使用者必須保證,nextTuple是非阻塞的,不然ack和fail會被阻塞掉

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
一個task在worker上面執行的時候的初始化步驟

void close();
spout 在shutdowm的時候呼叫的方法,但是並不保證close會被呼叫

void nextTuple();傳送資料
storm 請求spout傳送tuple的時候呼叫這個方法,該方法不能阻塞

void ack(Object msgId);
tuple處理成功,storm返回給spout成功的訊息

【IComponent】
給topology裡面的所有元件提供公用的方法

void declareOutputFields(OutputFieldsDeclarer declarer);
用於宣告Spout/Bolt傳送的tuple的名稱

【IBolt】
一個ibolt代表一個元件(component),先接受拿出tuple並且處理它
能夠filtering joining functions和aggregation

生命週期:
在客戶端上面建立,序列化,提交到主叢集上面(Nimbus)(有點像yarn中的resourcemanager)。Nimbus啟動一個worker去反序列化,啟動處理tuple的工作。

void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
準備好一個outputCollector

void execute(Tuple input);
tuple含有自己的元資料:來源地,值(getValue可以訪問到)

cleanup
資源清理操作,就算是寫了也不一定被執行
在這裡插入圖片描述

在這裡插入圖片描述

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 使用storm實現累計求和的操作
 */

public class LocalSumStormTopology {


    /**
     * Spout需要繼承BaseRichSpout
     * 資料來源需要產生資料併發射
     */
    public static class DataSourceSpout extends BaseRichSpout{
        private SpoutOutputCollector collector;
        int number=0;
        /**
         * 只會被呼叫一次
         * @param conf 配置引數 暫時不管
         * @param context 上下文
         * @param collector 資料的發射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector=collector;
        }

        /**
         * 會產生資料,在生產上肯定是從訊息佇列中獲取資料
         * 應該設計成一個死迴圈,一直髮送,因為是處理實時流資料
         */
        @Override
        public void nextTuple() {
            this.collector.emit(new Values(++number));//Values是一個ArrayList陣列,所有的構造方法的引數都能加入到陣列中
            System.out.println("Spout:"+number);

            //不要一次性太多;防止資料產生太快
            Utils.sleep(1000);
        }


        /**
         * 宣告輸出欄位
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number_"));//因為前面values裡面只有一個東西,因襲這裡對應上只給一個名稱
        }
    }


    /**
     * 資料的累積求和Bolt:接受資料並處理
     */
    public static class SumBolt extends BaseRichBolt{
        int sum=0;
        /**
         * 初始化方法,只會被執行一次
         * @param stormConf
         * @param context
         * @param collector 因此這一次的業務邏輯很簡單,不需要繼續往下面一個bolt傳送
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        /**
         * 也是一個死迴圈:獲取spout傳送給過來的資料
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            //可以bolt中獲取值,可以根據index獲取,也可以根據field名稱獲取,建議使用名稱獲取
            Integer value = input.getIntegerByField("number_");
            sum+=value;
            System.out.println("Bolt: sum=[ "+sum+"]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {
        /**
         * 如果不是本地,而是真正在storm叢集上面提交,需要用的不是LocalCluster而是StormSubmitter
         */
        LocalCluster cluster=new LocalCluster();//建立一個模擬的本地storm叢集,本地模式執行不需要搭建storm叢集


        //builder中可以設定spout和bolt的執行順序,其中id都是自定義的
        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("DataSourceSpout_",new DataSourceSpout());
        builder.setBolt("SumBolt_",new SumBolt()).shuffleGrouping("DataSourceSpout_");//shuffle指定執行順序
        StormTopology topology = builder.createTopology();
        /**
         * 向叢集提交一個topology,引數在原始碼中並沒有,
         * 第一個是類名稱
         * 第二個是new Config
         * 第三個是一StormTopology,使用TopologyBuilder建立
         */
        cluster.submitTopology("LocalSumStormTopology",new Config(),topology);
    }
}

在這裡插入圖片描述


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 使用storm實現累計求和的操作
 */

public class LocalSumStormTopology {


    /**
     * Spout需要繼承BaseRichSpout
     * 資料來源需要產生資料併發射
     */
    public static class DataSourceSpout extends BaseRichSpout{
        private SpoutOutputCollector collector;
        int number=0;
        /**
         * 只會被呼叫一次
         * @param conf 配置引數 暫時不管
         * @param context 上下文
         * @param collector 資料的發射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector=collector;
        }

        /**
         * 會產生資料,在生產上肯定是從訊息佇列中獲取資料
         * 應該設計成一個死迴圈,一直髮送,因為是處理實時流資料
         */
        @Override
        public void nextTuple() {
            this.collector.emit(new Values(++number));//Values是一個ArrayList陣列,所有的構造方法的引數都能加入到陣列中
            System.out.println("Spout:"+number);

            //不要一次性太多;防止資料產生太快
            Utils.sleep(1000);
        }


        /**
         * 宣告輸出欄位
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number_"));//因為前面values裡面只有一個東西,因襲這裡對應上只給一個名稱
        }
    }


    /**
     * 資料的累積求和Bolt:接受資料並處理
     */
    public static class SumBolt extends BaseRichBolt{
        int sum=0;
        /**
         * 初始化方法,只會被執行一次
         * @param stormConf
         * @param context
         * @param collector 因此這一次的業務邏輯很簡單,不需要繼續往下面一個bolt傳送
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        /**
         * 也是一個死迴圈:獲取spout傳送給過來的資料
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            //可以bolt中獲取值,可以根據index獲取,也可以根據field名稱獲取,建議使用名稱獲取
            Integer value = input.getIntegerByField("number_");
            sum+=value;
            System.out.println("Bolt: sum=[ "+sum+"]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {
        /**
         * 如果不是本地,而是真正在storm叢集上面提交,需要用的不是LocalCluster而是StormSubmitter
         */
        LocalCluster cluster=new LocalCluster();//建立一個模擬的本地storm叢集,本地模式執行不需要搭建storm叢集


        //builder中可以設定spout和bolt的執行順序,其中id都是自定義的
        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("DataSourceSpout_",new DataSourceSpout());
        builder.setBolt("SumBolt_",new SumBolt()).shuffleGrouping("DataSourceSpout_");//shuffle指定執行順序
        StormTopology topology = builder.createTopology();
        /**
         * 向叢集提交一個topology,引數在原始碼中並沒有,
         * 第一個是類名稱
         * 第二個是new Config
         * 第三個是一StormTopology,使用TopologyBuilder建立
         */
        cluster.submitTopology("LocalSumStormTopology",new Config(),topology);
    }
}

但是發現結果hin恐怖:
a=8425
b=16849
c=8425
d=12636
a=8425
b=16850
c=8425
d=12636
a=8425
b=16851
c=8425
d=12636
a=8425
b=16851
c=8425
d=12637
a=8425
b=16851
c=8425
d=12638
。。。
原因是:storm中,不管是Spout的nextTuple還是Bolt中的execute都是死迴圈(因為是處理實時流資料的),所以在讀取和傳送資料的時候是永動的!!!!
(解決方法,讀完檔案之後將檔名稱改掉,因為設定的時候讀取的是該資料夾下面的字尾txt檔案)

import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 使用storm完成詞頻統計功能
 */
public class LocalWordCountStormTopology {

    public static class DataSourceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /**
         * 處理的業務邏輯
         * 1.讀取指定目錄的資料夾下的資料D:\ZZBfiles\StormFile\wordCount
         * 2.每一行資料發射出去
         */
        @Override
        public void nextTuple() {
            //採用的common.io裡面的工具類方便讀取操作,不要老是inputstream
            //獲取所有檔案
            Collection<File> files = FileUtils.listFiles(new File("D:\\ZZBfiles\\StormFile\\wordCount"),
                    new String[]{"txt"}, true);
            for (File file : files) {//獲取一個檔案
                try {
                    List<String> lines = FileUtils.readLines(file);
                    for (String line : lines) {//獲取檔案中的一行
                        this.collector.emit(new Values(line));//傳送這一行
                    }
                    //在這裡修改檔名稱就不會重複讀取了
                    FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line_"));
        }
    }

    /**
     * 對資料進行分割,併發送分隔好的單詞出去
     */
    public static class SplitBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }


        /**
         * 業務邏輯:
         * line對其分割,按照“,”
         *
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line_");
            String[] words = line.split(",");
            for(String word:words){
                this.collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word_"));
        }
    }


    /**
     * 詞頻彙總Bolt
     */
    public static class CountBolt extends BaseRichBolt{
        Map<String,Integer> map=new HashMap<>();

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        /**
         * 業務邏輯:
         * 1.獲取每一個單詞
         * 2.對所有單詞進行彙總
         * 3.輸出
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String word=input.getStringByField("word_");
            Integer count=map.get(word);
            if(count==null){
                count=1;
            }else{
                count++;
            }
            map.put(word,count);//新增的時候hashmap會自動覆蓋相同的key的entry

            System.out.println("======================");
            Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
            for(Map.Entry<String, Integer> entry:entrySet){
                System.out.println(entry);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {
        //topo建立
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout_",new DataSourceSpout());
        builder.setBolt("SplitBolt_",new SplitBolt()).shuffleGrouping("DataSourceSpout_");
        builder.setBolt("CountBolt_",new CountBolt()).shuffleGrouping("SplitBolt_");

        //建立本地叢集
        LocalCluster cluster=new LocalCluster();
        cluster.submitTopology("LocalWordCountStormTopology",new Config(),builder.createTopology());
    }
}

【注意事項】

  • Spout還是Bolt的名稱不要重複
  • 名稱不要以下劃線開頭
  • topology名稱不能重複(就是說不能執行相同名稱的topology),但是local的時候似乎可以成功