Apache Storm 的安裝、配置及入門基礎(二)
本文翻譯自:http://storm.apache.org/releases/current/Tutorial.html
借鑑了:http://www.aboutyun.com/thread-7394-1-1.html
過去十幾年資料處理取得了飛速的發展,特別是 MapReduce,Hadoop及其相關技術以不可思議的程度來進行大規模儲存和處理,但是這些處理技術不夠實時,Hadoop 系統也無法變成一個實時系統,因為實時資料處理系統需求是完全不同於批處理系統的。
但是業務需求卻需要大規模實時資料處理,所以 Hadoop 就成為了資料處理生態系統中最大的一個短板。而Storm就彌補了這一不足。以前是人工建立網路佇列和 worker 實時處理過程,所以存在嚴重的不足,比如資料佇列冗長、系統易崩潰,也不適合大規模處理。現在 Storm極大地擴充了用例集、規模化、容錯、資料不丟失等等特性,處理速度是非常快的。
一、Storm 基本構成部分
如圖1所示。Nimbus 是通過 zookeeper 來傳遞資訊的,也就是 zookeeper 在 nimbus 和 worker 之間建立了協調關係,所以對於 Strom 來說,zookeeper 至關重要。
早在 storm.yaml 配置中,我們定義supervisor.slots.ports: 有4個埠,這就是 worker 的工作埠,一個 supervisor 有4個 worker 埠。
節點有兩個: master 節點、 worker 節點。master節點執行 Nimbus服務,worker 節點執行 Supervisor服務。
Nimbus 的作用:負責叢集上的程式碼分配; 機器上的任務分配;監控失效資料
Supervisor 的作用:監聽分配的工作;啟動、停止worker程序。每一個 worker 程序就是執行一個 topology 的子集;一個 正在執行的topology分佈在叢集上每個機器上執行的如圖2所示。正在執行的topology你可以通過 UI web端檢視到。
圖2
二、Topology 拓撲
Topology是 Storm中很重要的概念,這是處理實時計算的核心概念。一個 Topology就是一個計算的拓撲圖,就是計算的路徑和節點構成,而且這個圖還是封閉的喲!不會自動計算完畢的,你得使用 Storm kill 來殺掉這個執行的Topology。Topology就包含了處理邏輯,節點間的連線,也就是節點間的資料是怎麼傳遞的。
這個是執行的Topology:
$ storm jar storm-starter-1.1.0.jar org.apache.storm.starter.ExclamationTopology ExclamationTopology
三、Streams 資料流
Storm 中核心抽象概念就是 Stream, 資料流就是大量的一系列的tuples(元組)。Storm 用基元(primitives)以一種分散式的、可靠的方式將一個數據流轉換成一個新的資料流。比如,將一個微博資料流變換一個熱門話題資料流。
Storm 中提供的資料流轉換基元就是 spout 和 bolt。spout 和 bolt 提供了對應的介面,實現這個介面就可以執行特定應用的邏輯。
一個 spout就是一個數據流的源,例如,可以從 Kestrel(storm-kestrel 開源)佇列中彈出的元組作為一個 spout,並可以將它們作為一個數據流。還有一個 spout可以呼叫 Twitter API 得到一個微博資料流。下面的一個圖就表明了從一個源頭取tuple元組形成一個 spout !就是從外部資料來源(佇列、資料庫等)中讀取資料,封裝成元組,形成資料流。
bolt 是用來處理資料裡的,可以對一些資料流(不只一個數據流)進行處理,可能會產生新的資料流。對一些複雜的資料流變換,比如從一個微博流中計算一個熱門話題流,就需要很多步驟,因此就有多個 bolt。因此 bolt 中就有很多處理,比如執行函式,做流式聚合,流式連結,訪問資料庫等等。bolts 就如下圖所示。
從以上可以看出, spout 是一個 stream 的源, 而 bolt 卻是處理輸入的 streams,來產生新的 streams。注意,spout 和 bolt 只不過是流發生器而已。
spout 和 bolt組成的網路就是拓撲圖Topology,如下圖所示。Topology就是最高層的邏輯抽象,可以直接送到 Storm 叢集去執行。一個Topology圖就是流式轉換,每個節點是 spout 或者 bolt。圖中的每條邊就是 bolt 訂閱了流,當一個 spout或者 bolt 產生一個元組到一個流時,它就傳送元組到訂閱了流的每個 bolt。
四、資料模型
storm使用元組tuple來作為它的資料模型。每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何型別, 在我的理解裡面一個tuple可以看作一個沒有方法的java物件。總體來看,storm支援所有的基本型別、字串以及位元組陣列作為tuple的值型別。你也可以使用你自己定義的型別來作為值型別, 只要你實現對應的序列化器(serializer)。
一個Tuple代表資料流中的一個基本的處理單元,例如一條cookie日誌,它可以包含多個Field,每個Field表示一個屬性。Tuple本來應該是一個Key-Value的Map,由於各個元件間傳遞的tuple的欄位名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。 一個沒有邊界的、源源不斷的、連續的Tuple序列就組成了Stream。
topology裡面的每個節點必須定義它要產生的tuple的每個欄位。
比如下面這個bolt定義它所產生的tuple包含兩個欄位,型別分別是: double和triple。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields方法定義要輸出的欄位 : ["double", "triple"]。這個bolt的其它部分我們接下來會解釋。