1. 程式人生 > >Storm和Zookeeper叢集搭建及在java專案中的使用

Storm和Zookeeper叢集搭建及在java專案中的使用

上一篇:

介紹了分散式Zookeeper叢集的搭建和Kafka叢集的搭建,接下來學習一下Storm叢集的搭建。

實驗環境

  1. Cent OS 6.9
  2. apache-storm-1.1.0
  3. 三臺伺服器虛擬機器:192.168.1.129、192.168.1.214、192.168.1.241

這裡寫圖片描述

準備工作

1、修改伺服器hostname和hosts檔案使得可以直接通過hostname訪問

使用hostname name修改hostname,例如:

#hostname xuliugen129

(或者直接修改/etc/hostname檔案,輸入要設定的hostname)

將三臺伺服器分別修改為:xuliugen129、xuliugen214、xuliugen241。

2、然後,修改/etc/hosts使之在內網環境下可以ping 通:

這裡寫圖片描述

每臺機子都設定為上邊的hostname,只需要新增上邊的三行即可。

3、關閉3臺伺服器的防火牆:

service firewalld stop

Zookeeper安裝與配置

在上一篇中介紹了Zookeeper叢集的安裝方式,這裡不再介紹,請檢視:Kafka 單機和分散式環境搭建與案例使用

Storm安裝與配置

1、選擇合適的版本進行下載Storm

使用wget進行下載:

>wget http://mirrors.tuna
.tsinghua.edu.cn/apache/storm/apache-storm-1.1.0/apache-storm-1.1.0.tar.gz

這裡寫圖片描述

分別下載到3臺伺服器上,然後解壓到自己合適的位置,我這裡都是:/home/xuliugen/server

2、配置Storm

配置檔案在:/home/xuliugen/server/apache-storm-1.1.0/conf 目錄下的storm.yaml

預設只需要修改如下部分:

這裡寫圖片描述

3臺伺服器的Storm進行上述相同的配置即可。

Storm啟動與應用

1、Storm命令

啟動命令在:/home/xuliugen/server/apache-storm-1.1.0/bin

資料夾下,

這裡寫圖片描述

以上列出了所有可以執行的命令模式。

2、啟動nimbus

nimbus主機為:xuliugen129

這裡寫圖片描述

另外開啟一個控制介面,使用jps檢視是否正常啟動:

這裡寫圖片描述

nimbus表示正常啟動。

3、啟動Storm UI

Storm UI這可以在啟動nimbus的機器上執行,這裡是:xuliugen129,啟動如下:

這裡寫圖片描述

啟動之後,jps如下:

這裡寫圖片描述

Storm UI 程序為core,訪問xuliugen129:8080(這裡我的PC級訪問的話,就不再是上述設定的hostname,應該是對應的內網IP,預設埠為8080)

這裡寫圖片描述

3、啟動另外兩臺supervisor

主機分別為:xuliugen214、xuliugen241

>./storm supervisor

這裡寫圖片描述

這裡寫圖片描述

檢視Storm UI如下:

這裡寫圖片描述

4、最後檢視任一臺Zookeeper伺服器,檢視節點情況:

這裡寫圖片描述

這裡寫圖片描述

專案程式碼使用

1、專案結構

這裡寫圖片描述

核心jar為:storm-core-1.1.0.jar

2、執行原理:

一個Topology是Spouts和Bolts組成的圖, 通過Stream Groupings將圖中的Spouts和Bolts連線起來,如下圖:

這裡寫圖片描述

3、Spout程式碼

public class RandomNameSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    // 模擬一些資料
    String[] names = { "zhangsan", "lisi", "wangwu", "zhaoliu", "sunqi", "wangba" };

    /**
     * 不斷地往下一個元件傳送tuple訊息 這裡面是該spout元件的核心邏輯
     */
    @Override
    public void nextTuple() {
        Random random = new Random();
        int index = random.nextInt(names.length);

        // 通過隨機數拿到一個姓名
        String lowerName = names[index];

        // 將姓名封裝成tuple,傳送訊息給下一個元件
        collector.emit(new Values(lowerName));

        // 每傳送一個訊息,休眠500ms
        Utils.sleep(500);

    }

    /**
     * 初始化方法,在spout元件例項化時呼叫一次c
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * 宣告本spout元件傳送出去的tuple中的資料的欄位名
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("lowerName"));
    }

}

3、UpperBolt程式碼

public class UpperBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // 先獲取到上一個元件傳遞過來的資料,資料在tuple裡面
        String lowerName = tuple.getString(0);

        // 將姓名轉換成大寫
        String upperName = lowerName.toUpperCase();

        // 將轉換完成的商品名傳送出去
        collector.emit(new Values(upperName));

    }

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * 宣告該bolt元件要發出去的tuple的欄位
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upperName"));
    }

}

4、AppendBolt程式碼

public class AppendBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    FileWriter fileWriter = null;

    @Override
    public void execute(Tuple tuple) {
        // 先拿到上一個元件傳送過來的姓名
        String upperName = tuple.getString(0);
        String suffix_name = upperName + "_csdn";
        try {
            fileWriter.write(suffix_name);
            fileWriter.write("\n");
            fileWriter.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 在bolt元件執行過程中只會被呼叫一次
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        try {
            fileWriter = new FileWriter("/home/xuliugen/server/stormdata/" + UUID.randomUUID());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 本bolt已經不需要傳送tuple訊息到下一個元件,所以不需要再宣告tuple的欄位
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }

}

5、TopologyMain程式碼

public class TopologyMain {

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        // 將我們的spout元件設定到topology中去
        // parallelism_hint :4 表示用4個excutor來執行這個元件
        // setNumTasks(8) 設定的是該元件執行時的併發task數量,也就意味著1個excutor會執行2個task
        builder.setSpout("randomspout", new RandomNameSpout(), 4).setNumTasks(8);

        // 將大寫轉換bolt元件設定到topology,並且指定它接收randomspout元件的訊息
        // .shuffleGrouping("randomspout")包含兩層含義:
        // 1、upperbolt元件接收的tuple訊息一定來自於randomspout元件
        // 2、randomspout元件和upperbolt元件的大量併發
        //    task例項之間收發訊息時採用的分組策略是隨機分組shuffleGrouping
        builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

        // 將新增字尾的bolt元件設定到topology,並且指定它接收upperbolt元件的訊息
        builder.setBolt("suffixbolt", new AppendBolt(), 4).shuffleGrouping("upperbolt");

        // 用builder來建立一個topology
        StormTopology stormTopologyDemo = builder.createTopology();

        // 配置一些topology在叢集中執行時的引數
        Config conf = new Config();
        // 這裡設定的是整個StormTopologyDemo所佔用的槽位數,也就是worker的數量
        conf.setNumWorkers(4);
        conf.setDebug(true);
        conf.setNumAckers(0);

        // 將這個topology提交給storm叢集執行
        StormSubmitter.submitTopology("StormTopologyDemo", conf, stormTopologyDemo);

    }
}

6、提交到Storm叢集執行:

1、將專案匯出jar,步驟如下:

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

2、如果有警告的話,直接忽略即可。
3、將匯出的jar上傳到伺服器上,nimbus主機即可,我這裡上傳到:/home/xuliugen/temp 目錄下,

4、在2臺supervisor主機上建立專案中需要的目錄:

/home/xuliugen/server/stormdata/

5、提交到Storm叢集的命令為:

>./storm jar /home/xuliugen/temp/stormdemo.jar com.xuliugen.demo.topology.TopologyMain
1)/home/xuliugen/temp/stormdemo.jar為打包的jar在伺服器上的位置;
(2com.xuliugen.demo.topology.TopologyMain位主函式全路徑;

這裡寫圖片描述

6、最後檢視Storm UI可以看到:

這裡寫圖片描述

7、執行效果

這裡寫圖片描述