1. 程式人生 > >Apache Storm 的安裝、配置及入門基礎(二)

Apache Storm 的安裝、配置及入門基礎(二)

      本文翻譯自:http://storm.apache.org/releases/current/Tutorial.html

      借鑑了:http://www.aboutyun.com/thread-7394-1-1.html

      過去十幾年資料處理取得了飛速的發展,特別是 MapReduce,Hadoop及其相關技術以不可思議的程度來進行大規模儲存和處理,但是這些處理技術不夠實時,Hadoop 系統也無法變成一個實時系統,因為實時資料處理系統需求是完全不同於批處理系統的。

    但是業務需求卻需要大規模實時資料處理,所以 Hadoop 就成為了資料處理生態系統中最大的一個短板。而Storm就彌補了這一不足。以前是人工建立網路佇列和 worker 實時處理過程,所以存在嚴重的不足,比如資料佇列冗長、系統易崩潰,也不適合大規模處理。現在 Storm極大地擴充了用例集、規模化、容錯、資料不丟失等等特性,處理速度是非常快的。

一、Storm 基本構成部分

        如圖1所示。Nimbus 是通過 zookeeper 來傳遞資訊的,也就是 zookeeper 在 nimbus 和 worker 之間建立了協調關係,所以對於 Strom 來說,zookeeper 至關重要。

       早在 storm.yaml 配置中,我們定義supervisor.slots.ports: 有4個埠,這就是 worker 的工作埠,一個 supervisor 有4個 worker 埠。

    節點有兩個: master 節點、 worker 節點。master節點執行 Nimbus服務,worker 節點執行 Supervisor服務。

    Nimbus 的作用:負責叢集上的程式碼分配; 機器上的任務分配;監控失效資料

    Supervisor 的作用:監聽分配的工作;啟動、停止worker程序。每一個 worker 程序就是執行一個 topology 的子集;一個 正在執行的topology分佈在叢集上每個機器上執行的如圖2所示。正在執行的topology你可以通過 UI web端檢視到。


圖2

    二、Topology 拓撲

        Topology是 Storm中很重要的概念,這是處理實時計算的核心概念。一個 Topology就是一個計算的拓撲圖,就是計算的路徑和節點構成,而且這個圖還是封閉的喲!不會自動計算完畢的,你得使用 Storm kill 來殺掉這個執行的Topology。Topology就包含了處理邏輯,節點間的連線,也就是節點間的資料是怎麼傳遞的。

       這個是執行的Topology:

      $ storm jar storm-starter-1.1.0.jar org.apache.storm.starter.ExclamationTopology ExclamationTopology

   三、Streams 資料流

        Storm 中核心抽象概念就是 Stream, 資料流就是大量的一系列的tuples(元組)。Storm 用基元(primitives)以一種分散式的、可靠的方式將一個數據流轉換成一個新的資料流。比如,將一個微博資料流變換一個熱門話題資料流。

        Storm 中提供的資料流轉換基元就是 spout 和 bolt。spout 和 bolt 提供了對應的介面,實現這個介面就可以執行特定應用的邏輯。

         一個 spout就是一個數據流的源,例如,可以從 Kestrel(storm-kestrel 開源)佇列中彈出的元組作為一個 spout,並可以將它們作為一個數據流。還有一個 spout可以呼叫 Twitter API 得到一個微博資料流。下面的一個圖就表明了從一個源頭取tuple元組形成一個 spout !就是從外部資料來源(佇列、資料庫等)中讀取資料,封裝成元組,形成資料流。


bolt 是用來處理資料裡的,可以對一些資料流(不只一個數據流)進行處理,可能會產生新的資料流。對一些複雜的資料流變換,比如從一個微博流中計算一個熱門話題流,就需要很多步驟,因此就有多個 bolt。因此 bolt 中就有很多處理,比如執行函式,做流式聚合,流式連結,訪問資料庫等等。bolts 就如下圖所示。

     

 從以上可以看出, spout 是一個 stream 的源, 而 bolt 卻是處理輸入的 streams,來產生新的 streams。注意,spout 和 bolt 只不過是流發生器而已。

 spout 和 bolt組成的網路就是拓撲圖Topology,如下圖所示。Topology就是最高層的邏輯抽象,可以直接送到 Storm 叢集去執行。一個Topology圖就是流式轉換,每個節點是 spout 或者 bolt。圖中的每條邊就是 bolt 訂閱了流,當一個 spout或者 bolt 產生一個元組到一個流時,它就傳送元組到訂閱了流的每個 bolt。


topology裡面的每一個節點都是並行執行的。可以指定每個節點的並行度, storm則會在叢集裡面分配大量的執行緒計算。 一個topology會一直執行,直到你kill 掉它。storm能自動重新分配一些執行失敗的任務, 並且storm保證不會有資料丟失, 即使一些機器意外停機並且訊息被丟掉的情況下。

四、資料模型

       storm使用元組tuple來作為它的資料模型。每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何型別, 在我的理解裡面一個tuple可以看作一個沒有方法的java物件。總體來看,storm支援所有的基本型別、字串以及位元組陣列作為tuple的值型別。你也可以使用你自己定義的型別來作為值型別, 只要你實現對應的序列化器(serializer)。

一個Tuple代表資料流中的一個基本的處理單元,例如一條cookie日誌,它可以包含多個Field,每個Field表示一個屬性。
 
Tuple本來應該是一個Key-Value的Map,由於各個元件間傳遞的tuple的欄位名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。 一個沒有邊界的、源源不斷的、連續的Tuple序列就組成了Stream。  

topology裡面的每個節點必須定義它要產生的tuple的每個欄位。 比如下面這個bolt定義它所產生的tuple包含兩個欄位,型別分別是: double和triple。

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

declareOutputFields方法定義要輸出的欄位 : ["double", "triple"]。這個bolt的其它部分我們接下來會解釋。