1. 程式人生 > >Flink 1.8.0中的狀態生存時間特性:如何自動清理應用程式的狀態

Flink 1.8.0中的狀態生存時間特性:如何自動清理應用程式的狀態

對於許多狀態流式計算程式來說,一個常見的需求是自動清理應用程式的狀態(state),以便有效地控制狀態大小,或者控制程式訪問狀態的有效時間(例如受限於諸如GDPR等法律條規)。Apache Flink自1.6.0版本引入了狀態的生存時間(time-to-live,TTL)功能,使得應用程式的狀態清理和有效的狀態大小管理成為可能。

在本文中,我們將討論引入狀態生存時間特性的動機並討論其相關用例。此外,我們還將演示如何使用和配置該特性。同時,我們將會解釋Flink如何借用狀態生存時間特性在內部管理狀態,並對Flink 1.8.0中該功能引入的相關新特性進行一些展示。本文章最後對未來的改進和擴充套件作了展望。

狀態的暫時性

有兩個主要原因可以解釋為什麼狀態只應該維持有限的時間。讓我們先設想一個Flink應用程式,它接收使用者登入事件流,併為每個使用者儲存上一次登入時的相關事件資訊和時間戳,以改善高頻訪問使用者的體驗。

  • 控制狀態的大小。 狀態生存時間特性的主要使用場景,就是能夠有效地管理不斷增長的狀態大小。通常情況下,資料只需要暫時儲存,例如使用者處在一次網路連線會話中。當用戶訪問事件結束時,我們實際上就沒有必要儲存該使用者的狀態,來減少無謂的狀態儲存空間佔用。Flink 1.8.0引入的基於生存時間的後臺狀態清理機制,使得我們能夠自動地對無用資料進行清理。此前,應用程式開發人員必須採取額外的操作並顯式地刪除無用狀態以釋放儲存空間。這種手動清理過程不僅容易出錯,而且效率低下。以我們上述使用者登入的案例為例,因為這些不活躍使用者的相關資訊會被自動過期清理掉,我們就不再需要額外儲存上次登入的時間戳。

  • 符合(敏感)資料保護的要求。 隨著資料隱私法規的發展(例如歐盟頒佈的通用資料保護法規GDPR),遵守此類法規的相關要求,或將資料進行敏感處理已經成為許多應用程式的首要任務。此類使用場景的的一個典型案例,就需要僅在特定時間段內儲存資料並防止其後可以再次訪問該資料。這對於為客戶提供短期服務的公司來說是一個常見的挑戰。狀態生存時間這一特性,就保證了應用程式僅可以在有限時間內進行訪問,有助於遵守資料保護法規。

這兩個需求都可以通過狀態生存時間來解決,這個功能可以週期性地、持續地刪除狀態中的鍵值,一旦它變得不必要或不重要,並且不再需要儲存在儲存中時。

對應用狀態的持續清理

Apache Flink的1.6.0版本引入了狀態生存時間特性。它使流處理應用程式的開發人員能夠配置運算元的狀態,使其在定義的超時(生存時間)後過期並被清除。在Flink 1.8.0中,該功能得到了進一步擴充套件,對RocksDB和堆記憶體狀態後端(FsStateBackendMemoryStateBackend)的舊資料進行連續性的清理。
在Flink的DataStream API中,應用程式狀態是由狀態描述符(state descriptor)來定義的。狀態生存時間是通過將StateTtlConfiguration物件傳遞給狀態描述符來配置的。下面的Java示例演示瞭如何建立狀態生存時間的配置,並將其提供給狀態描述符,該狀態描述符將使用者的上次登入時間儲存為Long值:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Long> lastUserLogin =
    new ValueStateDescriptor<>("lastUserLogin", Long.class);

lastUserLogin.enableTimeToLive(ttlConfig);

Flink提供了多個選項來配置狀態生存時間的行為:

  • 什麼時候重置生存時間? 預設情況下,當狀態被修改時,生存時間就會被更新。我們也可以在讀操作訪問狀態時更新相關項的生存時間,但這樣要花費額外的寫操作來更新時間戳。
  • 已經過期的資料是否可以訪問? 狀態生存時間機制使用的是惰性策略來清除過期狀態。這可能導致應用程式會嘗試讀取過期但尚未刪除的狀態。使用者可以配置對這樣的讀取請求是否返回過期狀態。無論哪種情況,過期狀態都會在之後立即被刪除。雖然返回已經過期的狀態有利於資料可用性,但不返回過期狀態更符合相關資料保護法規的要求。
  • 哪種時間語義被用於定義生存時間? 在Apache Flink 1.8.0中,使用者只能根據處理時間(Processing Time)定義狀態生存時間。未來的Flink版本中計劃支援事件時間(Event Time)。

關於狀態生存時間的更多資訊,可以參考Flink官方文件

在實現上,狀態生存時間特性會額外儲存上一次相關狀態訪問的時間戳。雖然這種方法增加了一些儲存開銷,但它允許Flink在訪問狀態、建立檢查點、恢復或儲存清理過程時可以檢查過期狀態。

“取走垃圾資料”

在訪問狀態物件時,Flink將檢查其時間戳,並在狀態過期時清除狀態(是否返回過期狀態,則取決於配置的過期資料可見性)。由於這種訪問時才刪除的特性,除非被垃圾回收,否則那些永遠不被訪問過期資料將仍然佔用儲存空間。
那麼,在沒有顯示處理過期狀態的情況下,如何刪除這些資料呢?通常,我們可以配置不同的策略進行後臺刪除。

保證完整快照中不包含過期資料

Flink 1.6.0已經支援在建立檢查點(checkpoint)或儲存點(savepoint)的完整快照時不包含過期狀態。需要注意的是,建立增量快照時並不支援剔除過期狀態。完整快照時的過期狀態剔除必須如下例所示進行顯示啟用:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupFullSnapshot()
    .build();

上述配置並不會影響本地狀態儲存的大小,但是整個作業的完整快照的大小將會減小。只有當用戶從快照重新載入其狀態到本地時,才會清除使用者的本地狀態。

由於上述這些限制,在Flink 1.6.0中程式仍需要過期後主動刪除狀態。為了改善使用者體驗,Flink1.8.0引入了兩種自主清理策略,分別針對兩種狀態後端型別:

堆記憶體狀態後端的增量清理

此方法只適用於堆記憶體狀態後端(FsStateBackendMemoryStateBackend)。其基本思路是在儲存後端的所有狀態條目上維護一個全域性的惰性迭代器。某些事件(例如狀態訪問)會觸發增量清理,而每次觸發增量清理時,迭代器都會向前遍歷刪除已遍歷的過期資料。以下程式碼示例展示瞭如何啟用增量清理:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    // check 10 keys for every state access
    .cleanupIncrementally(10, false)
    .build();

如果啟用該功能,則每次狀態訪問都會觸發清除。而每次清理時,都會檢查一定數量的狀態條目是否過期。其中有兩個調整引數。第一個定義了每次清理時要檢查的狀態條目數。第二個引數是一個標誌位,用於表示是否在每條記錄處理(record processed)之後(而不僅僅是訪問狀態,state accessed),都還額外觸發清除邏輯。
關於這種方法有兩個重要的注意事項:首先是增量清理所花費的時間會增加記錄處理的延遲。其次,如果沒有狀態被訪問(state accessed)或者沒有記錄被處理(record processed),過期的狀態也將不會被刪除。

RocksDB狀態後端利用後臺壓縮來清理過期狀態

如果使用RocksDB狀態後端,則可以啟用另一種清理策略,該策略基於Flink定製的RocksDB壓縮過濾器(compaction filter)。RocksDB會定期執行非同步的壓縮流程以合併資料並減少相關儲存的資料量,該定製的壓縮過濾器使用生存時間檢查狀態條目的過期時間戳,並丟棄所有過期值。
使用此功能的第一步,需要設定以下配置選項:state.backend.rocksdb.ttl.compaction.filter.enabled。一旦配置使用RocksDB狀態後端後,如以下程式碼示例將會啟用壓縮清理策略:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .cleanupInRocksdbCompactFilter()
    .build();

需要注意的是啟用Flink的生存時間壓縮過濾機制後,會放緩RocksDB的壓縮速度。

使用定時器進行狀態清理

另一種手動清除狀態的方法是基於Flink的計時器,這也是社群評估的一個想法。使用這種方法,將為每個狀態訪問註冊一個清除計時器。這種方法的清理更加精準,因為狀態一旦過期就會被立刻刪除。但是由於計時器會與原始狀態一起儲存會消耗空間,開銷也更大一些。

未來展望

除了上面提到的基於計時器的清理策略之外,Flink社群還計劃進一步改進狀態生存時間特性。可能的改進包括為事件時間(event time)新增生存時間的支援(目前只支援處理時間)和為可查詢狀態(queryable state)啟用狀態生存時間機制。

總結

狀態可訪問時間的限制和應用程式狀態大小的控制,是狀態流處理領域的常見挑戰,Flink的1.8.0版本通過新增對過期狀態物件連續性後臺清理的支援,顯著改進了狀態生存時間特性。新的清理機制可以不再需要手動實現狀態清理的工作,而且由於惰性清理的機制,執行效率也更高。總得來說,狀態生存時間方便使用者控制應用程式狀態的大小,使得使用者可以將精力集中在應用程式的核心邏輯開發上。

原文連結:https://flink.apache.org/2019/05/19/state-ttl.html

 


本文作者:Ververica

原文連結

本文為雲棲社群原創內容,未經