1. 程式人生 > >Apache Storm 官方文件 —— Trident 教程

Apache Storm 官方文件 —— Trident 教程

原文連結    譯者:魏勇

Trident 是 Storm 的一種高度抽象的實時計算模型,它可以將高吞吐量(每秒百萬級)資料輸入、有狀態的流式處理與低延時的分散式查詢無縫結合起來。如果你瞭解 Pig 或者 Cascading 這樣的高階批處理工具,你就會發現他們和 Trident 的概念非常相似。Trident 同樣有聯結(join)、聚合(aggregation)、分組(grouping)、函式(function)以及過濾器(filter)這些功能。Trident 為資料庫或者其他持久化儲存上層的狀態化、增量式處理提供了基礎原語。由於 Trident 有著一致的、恰好一次的語義,因此推斷出 Trident 拓撲的狀態也是一件很容易的事。

使用範例

讓我們先從一個使用 Trident 的例子開始。這個例子中做了兩件事情:

  1. 從一個句子的輸入資料流中計算出單詞流的數量
  2. 實現對一個單詞列表中每個單詞總數的查詢

為了實現這個目的,這個例子將會從下面的資料來源中無限迴圈地讀取語句資料流:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"
), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true);

這個 Spout 會迴圈地訪問語句集來生成語句資料流。下面的程式碼就是用來實現計算過程中的單詞資料流統計部分:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.
newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6);

讓我們一行行地來分析上面的程式碼。首先我們建立了一個 TridentTopology 物件,這個物件提供了構造 Trident 計算過程的介面。TridentTopology 有一個叫做 newStream 的方法,這個方法可以從一個輸入資料來源中讀取資料建立一個新的資料流。在這個例子中,輸入的資料來源就是前面定義的 FixedBatchSpout。輸入資料來源也可以是像 Kestrel 和 Kafka 這樣的訊息系統。Trident 會通過 ZooKeeper 一直跟蹤每個輸入資料來源的一小部分狀態(Trident 具體消費物件的相關元資料)。例如這裡的 “spout1” 就對應著 ZooKeeper 中的一個節點,而 Trident 就會在該節點中存放資料來源的元資料(metadata)。

Trident 會將資料流處理為很多個小塊 tuple 的集合,例如,輸入的句子流就會像下面這樣被分割成很多個小塊:

batches

這些小塊的大小主要取決於你的輸入吞吐量,一般可能會在數萬甚至數百萬元組的級別。

Trident 為這些小塊提供了一個完全成熟的批處理 API。這個 API 和你見到過的 Pig 或者 Cascading 這樣的 Hadoop 的高階抽象語言很相似:你可以處理分組(group by)、聯結(join)、聚合(aggregation)、函式(function)、過濾器(filter)等各種操作。當然,分別處理每個小塊並不是件好事,所以,Trident 提供了適用於處理各個小塊之間的聚合操作的函式,並且可以在聚合後將結果儲存到持久化儲存中,而且無論是記憶體、Memcached、Cassandra 還是其他型別的儲存都可以支援。最後,Trident 還提供了用於查詢實時狀態結果的一級介面。而這個結果狀態既可以像這個例子中演示的那樣由 Trident 負責更新,也可以作為一個獨立的狀態資料來源而存在。

再回到這個例子中,輸入資料來源 spout 傳送出了一個名為 “sentence” 的資料流。接下來拓撲中定義了一個 Split 方法用於處理流中的每個 tuple,這個方法接收 “sentence” 域並將其分割成若干個單詞。每個 sentence tuple 都會建立很多個單詞 tuple —— 例如 “the cow jumped over the moon” 這個句子就會建立 6 個 “word” tuple,下面是 Split 的定義:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

從上面的程式碼中你會發現這個過程真的很簡單。這個方法中的所有操作僅僅是抓取句子、以空格分隔句子並且為每個單詞發射一個 tuple。

拓撲的剩餘部分負責統計單詞的數量並將結果儲存到持久化儲存中。首先,資料流根據 “word” 域分組,然後使用 Count聚合器持續聚合每個小組。persistentAggregate 方法用於儲存並更新 state 源中的聚合結果。在這個例子中,單詞的數量結果是儲存在記憶體中的,不過可以根據需要切換到 Memcached、Cassandra 或者其他持久化儲存中。切換儲存模型也非常簡單,只需要像下面這樣(使用 trident-memcached 修改 persistentAggregate 行中的一個引數(其中,“serverLocations” 是 Memcached 叢集的地址/埠列表)即可:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))

persistentAggregate 方法所儲存的值就表示所有從資料流中傳送出來的塊的聚合結果。

Trident 的另一個很酷的特性就是它支援完全容錯性和恰好一次處理的語義。如果處理過程中出現錯誤需要重新執行處理操作,Trident 不會向資料庫中提交多次來自相同的源資料的更新操作,這就是 Trident 持久化 state 的方式。

persistentAggregate 方法也可以將資料流結果傳入一個 TridentState 物件中。這種情況下,這個 TridentState 就表示所有的單詞統計資訊。這樣我們就可以使用 TridentState 物件來實現整個計算過程中的分散式查詢部分。

接下來我們就可以在拓撲中實現 word count 的一個低延時分散式查詢。這個查詢接收一個由空格分隔的單詞列表作為引數,然後返回這些單詞的數量統計結果。這個查詢看上去與普通的 RPC 呼叫並沒有什麼分別,不過在後臺他們是併發執行的。下面是一個實現這種查詢的例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

如你所見,這個查詢看上去只是一個普通的遠端過程呼叫(RPC),不過在後臺他是在一個 Storm 叢集中併發執行的。這種查詢的端到端延時一般在 10 ms 左右。當然,更大量的查詢會花費更長的時間,儘管這些查詢還是取決於你為這個計算過程分配了多少時間。

拓撲中的分散式查詢的實現是這樣的:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

這裡還需要使用前面的 TridentTopology 物件來建立一個 DRPC 資料流,這個建立資料流的方法叫做 “words”。前面使用DRPCClient 進行 RPC 呼叫的第一個引數必須與這個方法名完全相同。

在這段程式碼裡,首先是使用 Split 方法來將請求的引數分割成若干個單詞。這些單詞構成的單詞流是通過 “word” 域來分組的,而 stateQuery 運算子就是用來查詢拓撲中第一個部分中生成的 TridentState 物件的。stateQuery 接收一個 state(在這個例子中就是拓撲前面計算得到的單詞數結果)和查詢這個 state 的方法作為引數。在這個例子裡,stateQuery 呼叫了 MapGet 方法,用於獲取每個單詞的個數。由於 DRPC 資料流是和 TridentState 採用的完全相同的方式進行分組的(通過 “word” 域),每個單詞查詢都可以精確地定位到 TridentState 物件中的指定部分,同時 TridentState 物件中維護著對應的單詞的更新狀態。

接下來,個數為 0 的單詞會被 FilterNull 過濾器過濾掉,然後就可以使用 Sum 聚合器來獲取其他的單詞統計個數。接著 Trident 就會自動將結果返回給等待的客戶端。

Trident 很聰明,它知道怎麼以最好的效能執行拓撲。在這個拓撲中還有兩個會自動發生的有趣的事:

  1. 從 state 中讀取或寫入的操作(例如 persistentAggregate 和 stateQuery)會自動批處理化。因此,如果當前的批處理過程需要對資料庫執行 20 個更新操作,Trident 就會自動將讀取或寫入操作當作批處理過程,僅僅會對資料庫傳送一次讀請求和一次寫請求,而不是傳送 20 次讀請求和 20 次寫請求(而且一般你還可以在你的 state 裡使用快取來消除讀請求)。這樣做就有兩個方面的好處:可以按照你指定的方式來執行你的計算過程,同時還可以維持較好的效能。
  2. Trident 的聚合器是高度優化的。在向網路中傳送 tuple 之前,Trident 有時候會做部分聚合操作,而不是將一個分組的所有的 tuple 一股腦地傳送到同一臺機器中來執行聚合。例如,Count 聚合器就是這樣先計算每個小塊的個數,然後向網路中傳送很多個部分計數的結果,接著再將所有的部分計數結果彙總來得到最終的統計結果。這個技術與 MapReduce 的 combiner 模型很相似。

我們再來看看 Trident 的另一個例子。

Reach

這個例子是一個純粹的 DRPC 拓撲,計算了一個指定 URL 的 Reach 數。Reach 指的是 Twitter 上能夠看到一個指定的 URL 的獨立使用者數。要想計算 Reach,你需要先提取所有轉發了該 URL 的使用者,提取這些使用者的關注者,將關注者放入一個 set 集合中來去除重複的關注者,然後再統計這個 set 中的數量。對於單一的一臺機器來說,計算 reach 太耗時了,這個過程大概需要數千次資料庫呼叫並生成數千萬 tuple。而使用 Storm 和 Trident 就可以通過一個叢集來將計算過程的每個步驟進行並行化處理。

這個拓撲會從兩個 state 源中讀取資料。其中一個數據庫建立了 URL 和轉發了該 URL 的使用者列表的關聯表。另一個數據庫中建立了使用者和使用者的關注者列表的關聯表。拓撲的定義是這樣的:

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
       .groupBy(new Fields("follower"))
       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

這個拓撲使用 newStaticState 方法建立了兩個分別對應外部於兩個外部資料庫的 TridentState 物件。在拓撲的後續部分就可以對這兩個 TridentState 物件執行查詢操作。和 state 的所有資料來源一樣,為了最大程度地提升效率,對這些資料庫的查詢將會自動地批處理化。

拓撲的定義很直接 —— 就是一個簡單的批處理 job。首先,會通過查詢 urlToTweeters 資料庫來獲取轉發了 URL 的使用者列表,然後就可以呼叫 ExpandList 方法來為每個 tweeter 建立一個 tuple。

接下來必須要獲取每個 tweeter 的關注者。由於需要呼叫 shuffle 方法將所有的 tweeter 均衡分配到拓撲的所有 worker 中,所以這個步驟必須併發進行,這一點非常重要。然後就可以查詢關注者資料庫來獲取每個 tweeter 的關注者列表。你可能注意到了這個過程的並行度非常高,因為這是整個計算過程中複雜度最高的部分。

再接下來,關注者就會被放入一個單獨的 set 集合中用於計數。這裡包含兩個步驟。首先,會根據 “follower” 域來執行 “group by” 分組操作,並在每個組上執行 One 聚合器。“One”聚合器的作用僅僅是為每個組傳送一個包含數字 1 的 tuple。然後,就可以通過統計這些 one 結果來得到關注者 set 的大小,也就是真正的關注者數量。下面是 “One” 聚合器的定義:

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }

   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }

   public Integer zero() {
       return 1;
   }        
}

這是一個“組合聚合器”,它知道怎樣在向網路中傳送 tuple 之前以最好的效率進行部分聚合操作。同樣,Sum 也是一個組合聚合器,所以在拓撲結尾的全域性統計操作也會有很高的效率。

下面讓我們再來看看 Trident 中的一些細節。

域(Fields)與元組(tuples)

Trident 的資料模型 TridentTuple 是一個指定的值列表。在一個拓撲中,tuple 是在一系列操作中不斷生成的。這些操作一般會輸入一個“輸入域”(input fields)集合,然後傳送出一個“方法域”(function fields)的集合。輸入域主要用於選取一個 tuple 的子集作為操作的輸入,而“方法域”主要用於為該操作的輸出結果域命名。

我們來看看這樣一個場景。假設你有一個名為 “stream” 的資料流,其中包含域 “x”、“y” 和 “z”。如果要執行一個接收 “y” 作為輸入的過濾器 MyFilter,你可以這樣寫:

stream.each(new Fields("y"), new MyFilter())

再假設 MyFilter 的實現是這樣的:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}

這樣就會保留所有 “y” 域的值小於 10 的 tuple。MyFilter 輸入的 TridentTuple 將會僅包含有 “y” 域。值得注意的是,Trident 可以在選取輸入域時以一種非常高效的方式來投射 tuple 的子集:這個投射過程非常靈活。

我們再來看看 “function fields” 是怎麼工作的。假設你有這樣一個函式:

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}

這個函式接收兩個數字作為輸入,然後傳送出兩個新值:分別是兩個數字的和和乘積。再假定你有一個包含 “x”、“y” 和 “z” 域的資料流,你可以這樣使用這個函式:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

這個函式的輸出增加了兩個新的域。因此,這個 each 呼叫的輸出 tuple 會包含 5 個域:“x”、“y” 、“z”、“added” 和 “multiplied”。其中 “added” 與 AddAndMultiply 的第一個輸出值相對應,“multiplied” 和 AddAndMultiply 的第二個輸出值相對應。

另一方面,通過聚合器,函式域也可以替換輸入 tuple 的域。假如你有一個包含域 “val1” 和域 “val2” 的資料流,通過這樣的操作:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

就會使得輸出資料流中只包含一個只帶有 “sum” 的域的 tuple,這個 “sum” 域就代表了在哪個批處理塊中所有的 “val2” 域的總和值。

通過資料流分組,輸出就可以同時包含用於分組的域以及由聚合器傳送的域。舉個例子:

stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

這個操作就會使得輸出同時包含域 “val1” 以及域 “sum”。

State

實時計算的一個關鍵問題就在於如何管理狀態(state),使得在失敗與重試操作之後的更新過程仍然是冪等的。錯誤是不可消除的,所以在出現節點故障或者其他問題發生時批處理操作還需要進行重試。不過這裡最大的問題就在於怎樣執行一種合適的狀態更新操作(不管是針對外部資料庫還是拓撲內部的狀態),來使得每個訊息都能夠被執行且僅僅被執行一次。

這個問題很麻煩,接下來的例子裡面就有這樣的問題。假如你正在對你的資料流做一個計數聚合操作,並且打算將計數結果儲存到一個數據庫中。如果你僅僅把計數結果存到資料庫裡就完事了的話,那麼在你繼續準備更新某個塊的狀態的時候,你沒法知道到底這個狀態有沒有被更新過。這個資料塊有可能在更新資料庫的步驟上成功了,但在後續的步驟中失敗了,也有可能先失敗了,沒有進行更新資料庫的操作。你完全不知道到底發生了什麼。

Trident 通過下面兩件事情解決了這個問題:

  1. 在 Trident 中為每個資料塊標記了一個唯一的 id,這個 id 就叫做“事務 id”(transaction id)。如果資料塊由於失敗回滾了,那麼它持有的事務 id 不會改變。
  2. State 的更新操作是按照資料塊的順序進行的。也就是說,在成功執行完塊 2 的更新操作之前,不會執行塊 3 的更新操作。

基於這兩個基礎特性,你的 state 更新就可以實現恰好一次(exactly-once)的語義。與僅僅向資料庫中儲存計數不同,這裡你可以以一個原子操作的形式把事務 id 和計數值一起存入資料庫。在後續更新這個計數值的時候你就可以先比對這個資料塊的事務 id。如果比對結果是相同的,那麼就可以跳過更新操作 —— 由於 state 的強有序性,可以確定資料庫中已經包含有當前資料庫的額值。而如果比對結果不同,就可以放心地更新計數值了。

當然,你不需要在拓撲中手動進行這個操作,操作邏輯已經在 State 中封裝好了,這個過程會自動進行。同樣的,你的 State 物件也不一定要實現事務 id 標記:如果你不想在資料庫裡耗費空間儲存事務 id,你就不用那麼做。在這樣的情況下,State 會在出現失敗的情形下保持“至少處理一次”的操作語義(這樣對你的應用也是一件好事)。在這篇文章裡你可以瞭解到更多關於如何實現 State 以及各種容錯性權衡技術。

你可以使用任何一種你想要的方法來實現 state 的儲存操作。你可以把 state 存入外部資料庫,也可以儲存在記憶體中然後在存入 HDFS 中(有點像 HBase 的工作機制)。State 也並不需要一直儲存某個狀態值。比如,你可以實現一個只儲存過去幾個小時資料並將其餘的資料刪除的 State。這是一個實現 State 的例子:Memcached integration

Trident 拓撲的執行

Trident 拓撲會被編譯成一種儘可能和普通拓撲有著同樣的執行效率的形式。只有在請求資料的重新分配(比如 groupBy 或者 shuffle 操作)時 tuple 才會被髮送到網路中。因此,像下面這樣的 Trident 拓撲:

trident-topology

就會被編譯成若干個 spout/bolt:

trident-to-spout-and-bolt

總結

Trident 讓實時計算變得非常簡單。從上面的描述中,你已經看到了高吞吐量的資料流處理、狀態操作以及低延時查詢處理是怎樣通過 Trident 的 API 來實現無縫結合的。總而言之,Trident 可以讓你以一種更加自然,同時仍然保持著良好的效能的方式來實現實時計算。