1. 程式人生 > >Apache Storm 官方文件 —— Trident State

Apache Storm 官方文件 —— Trident State

Trident 中含有對狀態化(stateful)的資料來源進行讀取和寫入操作的一級抽象封裝工具。這個所謂的狀態(state)既可以儲存在拓撲內部(儲存在記憶體中並通過 HDFS 來實現備份),也可以存入像 Memcached 或者 Cassandra 這樣的外部資料庫中。而對於 Trident API 而言,這兩種機制並沒有任何區別。

 

Trident 使用一種容錯性的方式實現對 state 的管理,這樣,即使在發生操作失敗或者重試的情況下狀態的更新操作仍然是冪等的。基於這個機制,每條訊息都可以看作被恰好處理了一次,然後你就可以很容易地推斷出 Trident 拓撲的狀態。

State 的更新過程支援多級容錯性保證機制。在討論這一點之前,我們先來看一個例子,這個例子展示瞭如何實現恰好一次的語義的技術。假如你正在對資料流進行一個計數聚合操作,並打算將計數結果存入資料庫中。在這個例子裡,你存入資料庫的就是一個對應計數結果的值,每次處理新 tuple 的時候就會增加這個值。

考慮到可能存在的處理失敗情況,tuple 有可能需要重新處理。這樣就給 state 的更新操作帶來了一個問題(或者其他的副作用)—— 你無法知道當前的這個 tuple 的更新操作是否已經處理過了。也許你之前沒有處理過這個 tuple,那麼你現在就需要增加計數結果;也許你之前已經處理過 tuple 了並且成功地增加了計數結果,但是在後續操作過程中 tuple 的處理失敗了,並由此引發了 tuple 的重新處理操作,這時你就不能再增加計數結果了;還有可能你之前在使用這個 tuple 更新資料庫的時候出錯了,也就是說計數值的更新操作並未成功,此時在 tuple 的重新處理過程中你仍然需要更新資料庫。

所以說,如果只是向資料庫中簡單地存入計數值,你確實無法知道 tuple 是否已經被處理過。因此,你需要一些更多的資訊來做決定。Trident 提供了一種支援恰好一次處理的語義,如下所述:

  1. 通過小資料塊(batch)的方式來處理 tuple(可以參考Trident 教程一文)
  2. 為每個 batch 提供一個唯一的 id,這個 id 稱為 “事務 id”(transaction id,txid)。如果需要對 batch 重新處理,這個 batch 上仍然會賦上相同的 txid。
  3. State 的更新操作是按照 batch 的順序進行的。也就是說,在 batch 2 完成處理之前,batch 3 的狀態更新操作不會進行。

基於這幾個基本性質,你的 State 的實現就可以檢測到 tuple 的 batch 是否已經被處理過,並根據檢測結果選擇合適的 state 更新操作。你具體採用的操作取決於你的輸入 spout 提供的語義,這個語義對每個 batch 都是有效的。有三類支援容錯性的 spout:“非事務型”(non-transactional)、“事務型”(transactional)以及“模糊事務型”(opaque transactional)。接下來我們來分析下每種 spout 型別的容錯性語義。

事務型 spout(Transactional spouts)

記住一點,Trident 是通過小資料塊(batch)的方式來處理 tuple 的,而且每個 batch 都會有一個唯一的 txid。spout 的特性是由他們所提供的容錯性保證機制決定的,而且這種機制也會對每個 batch 發生作用。事務型 spout 包含以下特性:

  1. 每個 batch 的 txid 永遠不會改變。對於某個特定的 txid,batch 在執行重新處理操作時所處理的 tuple 集和它的第一次處理操作完全相同。
  2. 不同 batch 中的 tuple 不會出現重複的情況(某個 tuple 只會出現在一個 batch 中,而不會同時出現在多個 batch 中)。
  3. 每個 tuple 都會放入一個 batch 中(處理操作不會遺漏任何的 tuple)。

這是一種很容易理解的 spout,其中的資料流會被分解到固定的 batches 中。Storm-contrib 專案中提供了一種基於 Kafka 的事務型 spout 實現

看到這裡,你可能會有這樣的疑問:為什麼不在拓撲中完全使用事務型 spout 呢?這個原因很好理解。一方面,有些時候事務型 spout 並不能提供足夠可靠的容錯性保障,所以不需要使用事務型 spout。比如,TransactionalTridentKafkaSpout的工作方式就是使得帶有某個 txid 的 batch 中包含有來自一個 Kafka topic 的所有 partition 的 tuple。一旦一個 batch 被髮送出去,在將來無論重新發送這個 batch 多少次,batch 中都會包含有完全相同的 tuple 集,這是由事務型 spout 的語義決定的。現在假設 TransactionalTridentKafkaSpout 傳送出的某個 batch 處理失敗了,而與此同時,Kafka 的某個節點因為故障下線了。這時你就無法重新處理之前的 batch 了(因為 Kafka 的節點故障,Kafka topic 必然有一部分 partition 無法獲取到),這個處理過程也會因此終止。

這就是要有“模糊事務型” spout 的原因了 —— 模糊事務型 spout 支援在資料來源節點丟失的情況下仍然可以實現恰好一次的處理語義。我們會在下一節討論這類 spout。

順便提一點,如果 Kafka 支援資料複製,那麼就可以放心地使用事務型 spout 提供的容錯性機制了,因為這種情況下某個節點的故障不會導致資料丟失,不過 Kafka 暫時還不支援該特性。(本文的寫作時間應該較早,Kakfa 早就已經可以支援複製的機制了 —— 譯者注)。

在討論“模糊事務型” spout 之前,讓我們先來看看如何為事務型 spout 設計一種支援恰好一次語義的 State。這個 State 就稱為 “事務型 state”,它支援對於特定的 txid 永遠只與同一組 tuple 相關聯的特性。

假如你的拓撲需要計算單詞數,而且你準備將計數結果存入一個 K-V 型資料庫中。這裡的 key 就是單詞,value 對應於單詞數。從上面的討論中你應該已經明白了僅僅儲存計數結果是無法確定某個 batch 中的tuple 是否已經被處理過的。所以,現在你應該將 txid 作為一種原子化的值與計數值一起存入資料庫。隨後,在更新計數值的時候,你就可以將資料庫中的 txid 與當前處理的 batch 的 txid 進行比對。如果兩者相同,你就可以跳過更新操作 —— 由於 Trident 的強有序性處理機制,可以確定資料庫中的值是對應於當前的 batch 的。如果兩者不同,你就可以放心地增加計數值。由於一個 batch 的 txid 永遠不會改變,而且 Trident 能夠保證 state 的更新操作完全是按照 batch 的順序進行的,所以,這樣的處理邏輯是完全可行的。

下面來看一個例子。假如你正在處理 txid 3,其中包含有以下幾個 tuple:

["man"]
["man"]
["dog"]

假如資料庫中有以下幾個 key-value 對:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

其中與 “man” 相關聯的 txid 為 1。由於當前處理的 txid 為 3,你就可以確定當前處理的 batch 與資料庫中儲存的值無關,這樣你就可以放心地將 “man” 的計數值加上 2 並更新 txid 為 3。另一方面,由於 “dog” 的 txid 與當前的 txid 相同,所以,“dog” 的計數是之前已經處理過的,現在不能再對資料庫中的計數值進行更新操作。這樣,在結束 txid3 的更新操作之後,資料庫中的結果就會變成這樣:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

現在我們再來討論一下“模糊事務型” spout。

模糊事務型 spout(Opaque transactional spouts)

前面已經提到過,模糊事務型 spout 不能保證一個 txid 對應的 batch 中包含的 tuple 完全一致。模糊事務型 spout 有以下的特性:

  1. 每個 tuple 都會通過某個 batch 處理完成。不過,在 tuple 處理失敗的時候,tuple 有可能繼續在另一個 batch 中完成處理,而不一定是在原先的 batch 中完成處理。

OpaqueTridentKafkaSpout 就具有這樣的特性,同時它對 Kafka 節點的丟失問題具有很好的容錯性。OpaqueTridentKafkaSpout 在傳送一個 batch 的時候總會總上一個 batch 結束的地方開始傳送新 tuple。這一點可以保證 tuple 不會被遺漏,而且也不會被多個 batch 處理。

不過,模糊事務型 spout 的缺點就在於不能通過 txid 來識別資料庫中的 state 是否是已經處理過的。這是因為在 state 的更新的過程中,batch 有可能會發生變化。

在這種情況下,你應該在資料庫中儲存更多的 state 資訊。除了一個結果值和 txid 之外,你還應該存入前一個結果值。我們再以上面的計數值的例子來分析以下這個問題。假如你的 batch 的部分計數值是 “2”,現在你需要應用一個更新操作。假定現在資料庫中的值是這樣的:

{ value = 4,
  prevValue = 1,
  txid = 2
}
  • 情形1:假如當前處理的 txid 為 3,這與資料庫中的 txid 不同。這時可以將 “prevValue” 的值設為 “value” 的值,再為 “value” 的值加上部分計數的結果並更新 txid。執行完這一系列操作之後的資料庫中的值就會變成這樣:
{ value = 6,
  prevValue = 4,
  txid = 3
}
  • 情形2:如果當前處理的 txid 為 2,也就是和資料庫中儲存的 txid 一致,這種情況下的處理邏輯與上面的 txid 不一致的情況又有所不同。因為此時你會知道資料庫中的更新操作是由上一個擁有相同 txid 的batch 做出的。不過那個 batch 有可能與當前的 batch 並不相同,所以你需要忽略它的操作。這個時候,你應該將 “prevValue” 加上 batch 中的部分計數值來計算新的 “value”。在這個操作之後資料庫中的值就會變成這樣:
{ value = 3,
  prevValue = 1,
  txid = 2
}

這種方法之所以可行是因為 Trident 具有強順序性處理的特性。一旦 Trident 開始處理一個新的 batch 的狀態更新操作,它永遠不會回到過去的 batch 的處理上。同時,由於模糊事務型 spout 會保證 batch 之間不會存在重複 —— 每個 tuple 只會被某一個 batch 完成處理 —— 所以你可以放心地使用 prevValue 來更新 value。

非事務型 spout(Non-transactional spouts)

非事務型 spout 不能為 batch 提供任何的安全性保證。非事務型 spout 有可能提供一種“至多一次”的處理模型,在這種情況下 batch 處理失敗後 tuple 並不會重新處理;也有可能提供一種“至少一次”的處理模型,在這種情況下可能會有多個 batch 分別處理某個 tuple。總之,此類 spout 不能提供“恰好一次”的語義。

不同型別的 Spout 與 State 的總結

下圖顯示了不同的 spout/state 的組合是否支援恰好一次的訊息處理語義:

spout-state

模糊事務型 state 具有最好的容錯性特徵,不過這是以在資料庫中儲存更多的內容為代價的(一個 txid 和兩個 value)。事務型 state 要求的儲存空間相對較小,但是它的缺點是隻對事務型 spout 有效。相對的,非事務型要求的儲存空間最少,但是它也不能提供任何的恰好一次的訊息執行語義。

你選擇 state 與 spout 的時候必須在容錯性與儲存空間佔用之間權衡。可以根據你的應用的需求來確定哪種組合最適合你。

State API

從上文的描述中你已經瞭解到了恰好一次的訊息執行語義的原理是多麼的複雜。不過作為使用者你並不需要處理這些複雜的 txid 比對、多值儲存等操作,Trident 已經在 State 中封裝了所有的容錯性處理邏輯,你只需要像下面這樣寫程式碼即可:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))                
        .parallelismHint(6);

所有處理模糊事務型 state 的邏輯已經封裝在 MemcachedState.opaque 的呼叫中了。另外,狀態更新都會自動調整為批處理操作,這樣可以減小與資料庫的反覆互動的資源損耗。

基本的 State 介面只有兩個方法:

public interface State {
    void beginCommit(Long txid); // 對於類似於在 DRPC 流上進行 partitionPersist 的操作,此方法可以為空
    void commit(Long txid);
}

前面已經說過,state 更新操作的開始時和結束時都會獲取一個 txid。對於你的 state 怎麼工作,你在其中使用什麼樣的方法執行更新操作,或者使用什麼樣的方法從 state 中讀取資料,Trident 並不關心。

假如你有一個包含有使用者的地址資訊的定製資料庫,你需要使用 Trident 與該資料庫互動。你的 State 的實現就會包含有用於獲取與設定使用者資訊的方法,比如下面這樣:

public class LocationDB implements State {
    public void beginCommit(Long txid) {    
    }

    public void commit(Long txid) {    
    }

    public void setLocation(long userId, String location) {
      // code to access database and set location
    }

    public String getLocation(long userId) {
      // code to get location from database
    }
}

接著你就可以為 Trident 提供一個 StateFactory 來建立 Trident 任務內部的 State 物件的例項。對應於你的資料庫(LocationDB)的 StateFactory 大概是這樣的:

public class LocationDBFactory implements StateFactory {
   public State makeState(Map conf, int partitionIndex, int numPartitions) {
      return new LocationDB();
   } 
}

Trident 提供了一個用於查詢 state 資料來源的 QueryFunction 介面,以及一個用於更新 state 資料來源的 StateUpdater 介面。例如,我們可以寫一個查詢 LocationDB 中的使用者地址資訊的 “QueryLocation”。讓我們從你在拓撲中使用這個操作的方式開始。假如在拓撲中需要讀取輸入流中的 userid 資訊:

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))

這裡的 QueryLocation 的實現可能是這樣的:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for(TridentTuple input: inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }    
}

QueryFunction 的執行包含兩個步驟。首先,Trident 會將讀取的一些資料中彙總為一個 batch 傳入 batchRetrieve 方法中。在這個例子中,batchRetrieve 方法會收到一些使用者 id。然後 batchRetrieve 會返回一個與輸入 tuple 列表大小相同的佇列。結果佇列的第一個元素與第一個輸入 tuple 對應,第二個元素與第二個輸入 tuple 相對應,以此類推。

你會發現這段程式碼並沒有發揮出 Trident 批處理的優勢,因為這段程式碼僅僅一次查詢一下 LocationDB。所以,實現 LocationDB 的更好的方式應該是這樣的:

public class LocationDB implements State {
    public void beginCommit(Long txid) {    
    }

    public void commit(Long txid) {    
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
      // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
      // get locations in bulk
    }
}

然後,你可以這樣實現 QueryLocation 方法:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for(TridentTuple input: inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }    
}

這段程式碼大幅減少了域資料庫的IO,具有更高的執行效率。

你需要使用 StateUpdater 介面來更新 state。下面是一個更新 LocationDB 的地址資訊的 StateUpdater 實現:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for(TridentTuple t: tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

然後你就可以在 Trident 拓撲中這樣使用這個操作:

TridentTopology topology = new TridentTopology();
TridentState locations = 
    topology.newStream("locations", locationsSpout)
        .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())

partitionPersist 操作會更新 state 資料來源。StateUpdater 接收 State 和一批 tuple 作為輸入,然後更新這個 State。上面的程式碼僅僅從輸入 tuple 中抓取 userid 和 location 資訊,然後對 State 執行一個批處理更新操作。

在 Trident 拓撲更新 LocationDB 之後,partitionPersist 會返回一個表示更新後狀態的 TridentState 物件。隨後你就可以在拓撲的其他地方使用 stateQuery 方法對這個 state 執行查詢操作。

你也許注意到了 StateUpdater 中有一個 TridentCollector 引數。傳送到這個 collector 的 tuple 會進入一個“新的數值流”中。在這個例子裡向這個新的流傳送 tuple 並沒有意義,不過如果你需要處理類似於更新資料庫中的計數值這樣的操作,你可以考慮將更新後的技術結果傳送到這個流中。可以通過 TridentState.newValuesStream 方法來獲取新的流的資料。

persistentAggregate

Trident 使用一個稱為 persistentAggregate 的方法來更新 State。你已經在前面的資料流單詞統計的例子裡見過了這個方法,這裡再寫一遍:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

partitionPersist 是一個接收 Trident 聚合器作為引數並對 state 資料來源進行更新的方法,persistentAggregate 就是構建於 partitionPersist 上層的一個程式設計抽象。在這個例子裡,由於是一個分組資料流(grouped stream),Trident 需要你提供一個實現 MapState 介面的 state。被分組的域就是 state 中的 key,而聚合的結果就是 state 中的 value。MapState 介面是這樣的:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

而當你在非分組資料流上執行聚合操作時(全域性聚合操作),Trident 需要你提供一個實現了 Snapshottable 介面的物件:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

MemoryMapState 與 MemcachedState 都實現了上面兩個介面。

實現 Map State 介面

實現 MapState 介面非常簡單,Trident 幾乎已經為你做好了所有的準備工作。OpaqueMapTransactionalMap、與NonTransactionalMap 類都分別實現了各自的容錯性語義。你只需要為這些類提供一個用於對不同的 key/value 進行 multiGets 與 multiPuts 處理的 IBackingMap 實現類。IBackingMap 介面是這樣的:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys); 
    void multiPut(List<List<Object>> keys, List<T> vals); 
}

OpaqueMap 會使用 OpaqueValue 作為 vals 引數來呼叫 multiPut 方法,TransactionalMap 會使用 TransactionalValue 作為引數,而 NonTransactionalMap 則直接將拓撲中的物件傳入。

Trident 也提供了一個 CachedMap 用於實現 K-V map 的自動 LRU 快取功能。

最後,Trident 還提供了一個 SnapshottableMap 類,該類通過將全域性聚合結果存入一個固定的 key 中的方法將 MapState 物件轉化為一個 Snapshottable 物件。

可以參考 MemcachedState 的實現來了解如何將這些工具結合到一起來提供一個高效能的 MapState。MemcachedState 支援選擇模糊事務型、事務型或者非事務型語義。

原創文章,轉載請註明: 轉載自併發程式設計網 – ifeve.com本文連結地址: Apache Storm 官方文件 —— Trident State