1. 程式人生 > >Apache Storm 官方文件 —— Trident API 概述

Apache Storm 官方文件 —— Trident API 概述

原文連結    譯者:魏勇

Trident 的核心資料模型是“流”(Stream),不過與普通的拓撲不同的是,這裡的流是作為一連串 batch 來處理的。流是分佈在叢集中的不同節點上執行的,並且對流的操作也是在流的各個 partition 上並行執行的。

Trident 中有 5 類操作:

  1. 針對每個小分割槽(partition)的本地操作,這類操作不會產生網路資料傳輸;
  2. 針對一個數據流的重新分割槽操作,這類操作不會改變資料流中的內容,但是會產生一定的網路傳輸;
  3. 通過網路資料傳輸進行的聚合操作;
  4. 針對資料流的分組操作;
  5. 融合與聯結操作。

本地分割槽操作

本地分割槽操作是在每個分割槽塊上獨立執行的操作,其中不涉及網路資料傳輸。

函式

函式負責接收一個輸入域的集合並選擇輸出或者不輸出 tuple。輸出 tuple 的域會被新增到原始資料流的輸入域中。如果一個函式不輸出 tuple,那麼原始的輸入 tuple 就會被直接過濾掉。否則,每個輸出 tuple 都會複製一份輸入 tuple 。假設你有下面這樣的函式:

public class MyFunction extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.
getInteger(0); i++) { collector.emit(new Values(i)); } } }

再假設你有一個名為 “mystream” 的資料流,這個流中包含下面幾個 tuple,每個 tuple 中包含有 “a”、“b”、“c” 三個域:

[1, 2, 3]
[4, 1, 6]
[3, 0, 8]

如果你執行這段程式碼:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

那麼最終輸出的結果 tuple 就會包含有 “a”、“b”、“c”、“d” 4 個域,就像下面這樣:

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]

過濾器

過濾器負責判斷輸入的 tuple 是否需要保留。以下面的過濾器為例:

public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

通過使用這段程式碼:

mystream.each(new Fields("b", "a"), new MyFilter())

就可以將下面這樣帶有 “a”、“b”、“c” 三個域的 tuple

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

最終轉化成這樣的結果 tuple:

[2, 1, 1]

partitionAggregate

partitionAggregate 會在一批 tuple 的每個分割槽上執行一個指定的功能操作。與上面的函式不同,由 partitionAggregate傳送出的 tuple 會將輸入 tuple 的域替換。以下面這段程式碼為例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假如輸入流中包含有 “a”、“b” 兩個域並且有以下幾個 tuple 塊:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

經過上面的程式碼之後,輸出就會變成帶有一個名為 “sum” 的域的資料流,其中的 tuple 就是這樣的:

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]

Storm 有三個用於定義聚合器的介面:CombinerAggregatorReducerAggregator 以及 Aggregator

這是 CombinerAggregator 介面:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

CombinerAggregator 會將帶有一個域的一個單獨的 tuple 返回作為輸出。CombinerAggregator 會在每個輸入 tuple 上執行初始化函式,然後使用組合函式來組合所有輸入的值。如果在某個分割槽中沒有 tuple, CombinerAggregator 就會輸出zero 方法的結果。例如,下面是 Count 的實現程式碼:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

如果你使用 aggregate 方法來代替 partitionAggregate 方法,你就會發現 CombinerAggregator 的好處了。在這種情況下,Trident 會在傳送 tuple 之前通過分割槽聚合操作來優化計算過程。

ReducerAggregator 的介面實現是這樣的:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator 會使用 init 方法來產生一個初始化的值,然後使用該值對每個輸入 tuple 進行遍歷,並最終生成並輸出一個單獨的 tuple,這個 tuple 中就包含有我們需要的計算結果值。例如,下面是將 Count 定義為 ReducerAggregator 的程式碼:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

ReducerAggregator 同樣可以用於 persistentAggregate,你會在後面看到這一點。

最常用的聚合器介面還是下面的 Aggregator 介面:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregator 聚合器可以生成任意數量的 tuple,這些 tuple 也可以帶有任意數量的域。聚合器可以在執行過程中的任意一點輸出tuple,他們的執行過程是這樣的:

  1. 在處理一批資料之前先呼叫 init 方法。init 方法的返回值是一個代表著聚合狀態的物件,這個物件接下來會被傳入 aggregate 方法和 complete 方法中。
  2. 對於一個區塊中的每個 tuple 都會呼叫 aggregate 方法。這個方法能夠更新狀態並且有選擇地輸出 tuple。
  3. 在區塊中的所有 tuple 都被 aggregate 方法處理之後就會呼叫 complete 方法。

下面是使用 Count 作為聚合器的程式碼:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

有時你可能會需要同時執行多個聚合操作。這個過程叫做鏈式處理,可以使用下面這樣的程式碼來實現:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

這段程式碼會在每個分割槽上分別執行 Count 和 Sum 聚合器,而輸出中只會包含一個帶有 [“count”, “sum”] 域的單獨的 tuple。

stateQuery 與 partitionPersist

stateQuery 與 partitionPersist 會分別查詢、更新 state 資料來源。你可以參考 Trident State 文件 來了解如何使用它們。

projection

projection 方法只會保留操作中指定的域。如果你有一個帶有 [“a”, “b”, “c”, “d”] 域的資料流,通過執行這段程式碼:

mystream.project(new Fields("b", "d"))

就會使得輸出資料流中只包含有 [“b”, “d”] 域。

重分割槽操作

重分割槽操作會執行一個用來改變在不同的任務間分配 tuple 的方式的函式。在重分割槽的過程中分割槽的數量也可能會發生變化(例如,重分割槽之後的並行度就有可能會增大)。重分割槽會產生一定的網路資料傳輸。下面是重分割槽操作的幾個函式:

  1. shuffle:通過隨機輪詢演算法來重新分配目標區塊的所有 tuple。
  2. broadcast:每個 tuple 都會被複制到所有的目標區塊中。這個函式在 DRPC 中很有用 —— 比如,你可以使用這個函式來獲取每個區塊資料的查詢結果。
  3. partitionBy:該函式會接收一組域作為引數,並根據這些域來進行分割槽操作。可以通過對這些域進行雜湊化,並對目標分割槽的數量取模的方法來選取目標區塊。partitionBy 函式能夠保證來自同一組域的結果總會被髮送到相同的目標區間。
  4. global:這種方式下所有的 tuple 都會被髮送到同一個目標分割槽中,而且資料流中的所有的塊都會由這個分割槽處理。
  5. batchGlobal:同一個 batch 塊中的所有 tuple 會被髮送到同一個區塊中。當然,在資料流中的不同區塊仍然會分配到不同的區塊中。
  6. partition:這個函式使用自定義的分割槽方法,該方法會實現 backtype.storm.grouping.CustomStreamGrouping 介面。

聚類操作

Trident 使用 aggregate 方法和 persistentAggregate 方法來對資料流進行聚類操作。其中,aggregate 方法會分別對資料流中的每個 batch 進行處理,而 persistentAggregate 方法則會對資料流中的所有 batch 執行聚類處理,並將結果存入某個 state 中。

在資料流上執行 aggregate 方法會執行一個全域性的聚類操作。在你使用 ReducerAggregator 或者 Aggregator 時,資料流首先會被重新分割槽成一個單獨的分割槽,然後聚類函式就會在該分割槽上執行操作。而在你使用 CombinerAggregator 時,Trident 首先會計算每個分割槽的部分聚類結果,然後將這些結果重分割槽到一個單獨的分割槽中,最後在網路資料傳輸完成之後結束這個聚類過程。CombinerAggregator 比其他的聚合器的執行效率更高,在聚類時應該儘可能使用CombinerAggregator

下面是一個使用 aggregate 來獲取一個 batch 的全域性計數值的例子:

mystream.aggregate(new Count(), new Fields("count"))

與 partitionAggregate 一樣,aggregate 的聚合器也可以進行鏈式處理。然而,如果你在一個處理鏈中同時使用了CombinerAggregator 和非 CombinerAggregator,Trident 就不能對部分聚類操作進行優化了。

想要了解更多使用 persistentAggregate 的方法,可以參考 Trident State 文件 一文。

對分組資料流的操作

通過對指定的域執行 partitionBy 操作,groupBy 操作可以將資料流進行重分割槽,使得相同的域的 tuple 分組可以聚集在一起。例如,下面是一個 groupBy 操作的示例:

groupBy

如果你在分組資料流上執行聚合操作,聚合器會在每個分組(而不是整個區塊)上執行。persistentAggregate 同樣可以在一個分組資料裡上執行,這種情況下聚合結果會儲存在 MapState 中,其中的 key 就是分組的域名。

和其他操作一樣,對分組資料流的聚合操作也可以以鏈式的方式執行。

融合(Merge)與聯結(join)

Trident API 的最後一部分是聯結不同的資料流的操作。聯結資料流最簡單的方式就是將所有的資料流融合到一個流中。你可以使用 TridentTopology 的 merge 方法實現該操作,比如這樣:

topology.merge(stream1, stream2, stream3);

Trident 會將融合後的新資料流的域命名為為第一個資料流的輸出域。

聯結資料流的另外一種方法是使用 join。像 SQL 那樣的標準 join 操作只能用於有限的輸入資料集,對於無限的資料集就沒有用武之地了。Trident 中的 join 只會應用於每個從 spout 中輸出的小 batch。

下面是兩個流的 join 操作的示例,其中一個流含有 [“key”, “val1”, “val2”] 域,另外一個流含有 [“x”, “val1”] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上面的例子會使用 “key” 和 “x” 作為 join 的域來聯結 stream1 和 stream2。Trident 要求先定義好新流的輸出域,因為輸入流的域可能會覆蓋新流的域名。從 join 中輸出的 tuple 中會包含:

  1. join 域的列表。在這個例子裡,輸出的 “key” 域與 stream1 的 “key” 域以及 stream2 的 “x” 域對應。
  2. 來自所有流的非 join 域的列表。這個列表是按照傳入 join 方法的流的順序排列的。在這個例子裡,“ a” 和 “b” 域與 stream1 的 “val1” 和 “val2” 域對應;而 “c” 域則與 stream2 的 “val1” 域相對應。

在對不同的 spout 傳送出的流進行 join 時,這些 spout 上會按照他們傳送 batch 的方式進行同步處理。也就是說,一個處理中的 batch 中含有每個 spout 傳送出的 tuple。

到這裡你大概仍然會對如何進行視窗 join 操作感到困惑。視窗操作(包括平滑視窗、滾動視窗等 —— 譯者注)主要是指將當前的 tuple 與過去若干小時時間段內的 tuple 聯結起來的過程。

你可以使用 partitionPersist 和 stateQuery 來實現這個過程。過去一段時間內的 tuple 會以 join 域為關鍵字被儲存到一個 state 源中。然後就可以使用 stateQuery 查詢 join 域來實現這個“聯結”(join)的過程。