1. 程式人生 > >Storm筆記整理(五):可靠性分析、定時任務與Storm UI參數詳解

Storm筆記整理(五):可靠性分析、定時任務與Storm UI參數詳解

大數據 實時計算 Storm

[TOC]


特別說明:前面的四篇Storm筆記中,關於計算總和的例子中的spout,使用了死循環的邏輯,實際上這樣做是不正確的,原因很簡單,Storm提供給我們的API中,nextTuple方法就是循環執行了,這相當於是做了雙層循環。因為後面在做可靠性acker案例分析時發現,加入死循環邏輯後,該nextTuple所屬於的那個task根本就沒有辦法跳出這個nextTuple方法,也就沒有辦法執行後面的ack或者是fail方法,這點尤其需要註意。

Storm可靠性分析

基本原理

  • worker進程死掉

    worker進程掛掉,storm集群會在重新啟動一個worker進程。

  • supervisor進程死掉

    supervisor進程掛掉,不會影響之前已經提交的topology,只是後期不能向這個節點分配任務,因為這個節點已經不是集群的一員了。

  • nimbus進程死掉(存在HA的問題)快速失敗

    nimbus進程掛掉,也不會影響之前已經提交的topology,只是後期不能向集群再提交新的topology了。1.0以下的版本存在HA的問題,1.0之後已經修復了這個問題,可以有多個備選nimbus。

  • 節點宕機

  • ack/fail消息確認機制(確保一個tuple被完全處理)

    • 在spout中發射tuple的時候需要同時發送messageid,這樣才相當於開啟了消息確認機制
    • 如果你的topology裏面的tuple比較多的話, 那麽把acker的數量設置多一點,效率會高一點。
    • 通過config.setNumAckers(num)來設置一個topology裏面的acker的數量,默認值是1。
    • 註意: acker用了特殊的算法,使得對於追蹤每個spout tuple的狀態所需要的內存量是恒定的(20 bytes) (可以了解一下其算法,目前暫時不做這個算法的深入理解,百度storm acker就能找到相關的分析文章)
    • 註意:如果一個tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默認值為30秒)時間內沒有被成功處理,那麽這個tuple會被認為處理失敗了。
  • 完全處理tuple

    在storm裏面一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所衍生的所有的tuple都被成功處理。

可靠性acker案例

前面也提到了,如果希望使用qck/fail確認機制,則需要做下面的事情:

1.在我們的spout中重寫ack和fail方法
2.spout發送tuple時需要攜帶messageId
3.bolt成功或失敗處理後要主動進行回調

根據上面的說明,程序代碼如下,註意其中體現的這幾點:

package cn.xpleaf.bigdata.storm.acker;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;
import java.util.UUID;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 * <p>
 * storm消息確認機制---可靠性分析
 * acker
 * fail
 */
public class AckerSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

         private long num = 0;
        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            String messageId = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
//            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num), messageId);
//            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }

        @Override
        public void ack(Object msgId) {
            System.out.println(msgId + "對應的消息被處理成功了");
        }

        @Override
        public void fail(Object msgId) {
            System.out.println(msgId + "---->對應的消息被處理失敗了");
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;
            if (orderCost % 10 == 1) {   // 每10次模擬消息失敗一次
                collector.fail(input);
            } else {
                System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
                collector.ack(input);
            }
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最後一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當於在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 1)
                .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上遊組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = AckerSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

運行後(本地運行或上傳到集群上提交作業),輸出如下:

當前時間20180413215706產生的訂單金額:1
當前時間20180413215707產生的訂單金額:2
7a4ce596fd3a40659f2d7f80a7738f55---->對應的消息被處理失敗了
線程ID:133 ,商城網站到目前20180413215707的商品總交易額3
當前時間20180413215708產生的訂單金額:3
0555a933a49f413e94480be201a55615對應的消息被處理成功了
線程ID:133 ,商城網站到目前20180413215708的商品總交易額6
當前時間20180413215709產生的訂單金額:4
4b923132e4034e939c875aca368a8897對應的消息被處理成功了
線程ID:133 ,商城網站到目前20180413215709的商品總交易額10
當前時間20180413215710產生的訂單金額:5
51f159472e854ba282ab84a2218459b8對應的消息被處理成功了
線程ID:133 ,商城網站到目前20180413215710的商品總交易額15
......

Storm定時任務

一般的業務數據存儲,最終還是要落地,存儲到RDBMS,但是RDBMS無法達到高訪問量,能力達不到實時處理,或者說處理能力是有限的,會造成連接中斷等問題,為了數據落地,我們可以采取迂回方式,可以采用比如說先緩存到高速內存數據庫(如redis),然後再將內存數據庫中的數據定時同步到rdbms中,而且可以定期定時來做。

  • 可以每隔指定的時間將數據整合一次存入數據庫。
  • 或者每隔指定的時間執行一些

可以在storm中使用定時任務來實現這些定時數據落地的功能,不過需要先了解storm定時任務。

全局定時任務

在main中設置

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);  // 設置多久發送一個系統的tuple定時發射數據

但是我們一般都會對特定的bolt設置定時任務,而沒有必要對全局每一個bolt都發送系統的tuple,這樣非常的耗費資源,所以就有了局部定時任務,也是我們常用的。

註意:storm會按照用戶設置的時間間隔給拓撲中的所有bolt發送系統級別的tuple。在main函數中設置定時器,storm會定時給拓撲中的所有bolt都發送系統級別的tuple,如果只需要給某一個bolt設置定時功能的話,只需要在這個bolt中覆蓋getComponentConfiguration方法,裏面設置定時間隔即可。

測試代碼如下:

package cn.xpleaf.bigdata.storm.quartz;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 2°、單詞計數:監控一個目錄下的文件,當發現有新文件的時候,
        把文件讀取過來,解析文件中的內容,統計單詞出現的總次數
        E:\data\storm

        研究storm的定時任務
        有兩種方式:
            1.main中設置,全局有效
            2.在特定bolt中設置,bolt中有效
 */
public class QuartzWordCountTopology {

    /**
     * Spout,獲取數據源,這裏是持續讀取某一目錄下的文件,並將每一行輸出到下一個Bolt中
     */
    static class FileSpout extends BaseRichSpout {
        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            File directory = new File("D:/data/storm");
            // 第二個參數extensions的意思就是,只采集某些後綴名的文件
            Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file, "utf-8");
                    for(String line : lines) {
                        this.collector.emit(new Values(line));
                    }
                    // 當前文件被消費之後,需要重命名,同時為了防止相同文件的加入,重命名後的文件加了一個隨機的UUID,或者加入時間戳也可以的
                    File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed");
                    FileUtils.moveFile(file, destFile);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    /**
     * Bolt節點,將接收到的每一行數據切割為一個個單詞並發送到下一個節點
     */
    static class SplitBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 確保不是系統發送的tuple,才使用我們的業務邏輯
                String line = input.getStringByField("line");
                String[] words = line.split(" ");
                for (String word : words) {
                    this.collector.emit(new Values(word, 1));
                }
            } else {
                System.out.println("splitBolt: " + input.getSourceComponent().toString());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt節點,執行單詞統計計算
     */
    static class WCBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple input) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 確保不是系統發送的tuple,才使用我們的業務邏輯
                String word = input.getStringByField("word");
                Integer count = input.getIntegerByField("count");
            /*if (map.containsKey(word)) {
                map.put(word, map.get(word) + 1);
            } else {
                map.put(word, 1);
            }*/
                map.put(word, map.getOrDefault(word, 0) + 1);

                System.out.println("====================================");
                map.forEach((k, v) -> {
                    System.out.println(k + ":::" + v);
                });
            } else {
                System.out.println("sumBolt: " + input.getSourceComponent().toString());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,組裝Spout和Bolt節點,相當於在MapReduce中構建Job
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // dag
        builder.setSpout("id_file_spout", new FileSpout());
        builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
        builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = QuartzWordCountTopology.class.getSimpleName();
        Config config = new Config();
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        cluster.submitTopology(topologyName, config, stormTopology);
    }
}

輸出:

splitBolt: __system
sumBolt: __system
splitBolt: __system
sumBolt: __system
......

局部定時任務

在bolt中使用下面代碼判斷是否是觸發用的bolt

tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)

如果為true,則執行定時任務需要執行的代碼,最後return,如果為false,則執行正常的tuple處理的業務邏輯。

即對於需要進行數據落地的bolt,我們可以只給該bolt設置定時任務,這樣系統會定時給該bolt發送系統級別的tuple,在我們該bolt的代碼中進行判斷,如果接收到的是系統級別的bolt,則進行數據落地的操作,比如將數據寫入數據庫或其它操作等,否則就按照正常的邏輯來執行我們的業務代碼。

工作中常用這一種方式進行操作。

測試程序如下:

package cn.xpleaf.bigdata.storm.quartz;

import clojure.lang.Obj;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 2°、單詞計數:監控一個目錄下的文件,當發現有新文件的時候,
        把文件讀取過來,解析文件中的內容,統計單詞出現的總次數
        E:\data\storm

        研究storm的定時任務
        有兩種方式:
            1.main中設置,全局有效
            2.在特定bolt中設置,bolt中有效
 */
public class QuartzPartWCTopology {

    /**
     * Spout,獲取數據源,這裏是持續讀取某一目錄下的文件,並將每一行輸出到下一個Bolt中
     */
    static class FileSpout extends BaseRichSpout {
        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            File directory = new File("D:/data/storm");
            // 第二個參數extensions的意思就是,只采集某些後綴名的文件
            Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file, "utf-8");
                    for(String line : lines) {
                        this.collector.emit(new Values(line));
                    }
                    // 當前文件被消費之後,需要重命名,同時為了防止相同文件的加入,重命名後的文件加了一個隨機的UUID,或者加入時間戳也可以的
                    File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed");
                    FileUtils.moveFile(file, destFile);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    /**
     * Bolt節點,將接收到的每一行數據切割為一個個單詞並發送到下一個節點
     */
    static class SplitBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word, 1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt節點,執行單詞統計計算
     */
    static class WCBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple input) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 確保不是系統發送的tuple,才使用我們的業務邏輯
                String word = input.getStringByField("word");
                Integer count = input.getIntegerByField("count");
            /*if (map.containsKey(word)) {
                map.put(word, map.get(word) + 1);
            } else {
                map.put(word, 1);
            }*/
                map.put(word, map.getOrDefault(word, 0) + 1);

                System.out.println("====================================");
                map.forEach((k, v) -> {
                    System.out.println(k + ":::" + v);
                });
            } else {
                System.out.println("sumBolt: " + input.getSourceComponent().toString() + "---" + System.currentTimeMillis());
            }
        }

        @Override
        public Map<String, Object> getComponentConfiguration() { // 修改局部bolt的配置信息
            Map<String, Object> config = new HashMap<>();
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return config;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,組裝Spout和Bolt節點,相當於在MapReduce中構建Job
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // dag
        builder.setSpout("id_file_spout", new FileSpout());
        builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
        builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = QuartzPartWCTopology.class.getSimpleName();
        Config config = new Config();
        cluster.submitTopology(topologyName, config, stormTopology);
    }
}

輸出如下:

sumBolt: __system---1523631954330
sumBolt: __system---1523631964330
sumBolt: __system---1523631974329
sumBolt: __system---1523631984329
sumBolt: __system---1523631994330
sumBolt: __system---1523632004330
sumBolt: __system---1523632014329
sumBolt: __system---1523632024330
......

Storm UI參數介紹

技術分享圖片

  • deactive:未激活(暫停)

  • emitted: emitted tuple數

  • transferred: transferred tuple數

    emitted的區別:如果一個task,emitted一個tuple到2個task中,則 transferred tuple數是emitted tuple數的兩倍

  • complete latency: spout emitting 一個tuple到spout ack這個tuple的平均時間(可以認為是tuple以及該tuple樹的整個處理時間)

  • process latency: bolt收到一個tuple到bolt ack這個tuple的平均時間,如果沒有啟動acker機制,那麽值為0

  • execute latency:bolt處理一個tuple的平均時間,不包含acker操作,單位是毫秒(也就是bolt 執行 execute 方法的平均時間)

  • capacity:這個值越接近1,說明bolt或者spout基本一直在調用execute方法,說明並行度不夠,需要擴展這個組件的executor數量。

總結:execute latency和proces latnecy是處理消息的時效性,而capacity則表示處理能力是否已經飽和,從這3個參數可以知道topology的瓶頸所在。

Storm筆記整理(五):可靠性分析、定時任務與Storm UI參數詳解