1. 程式人生 > >Flink 專題 -2 Checkpoint、Savepoint 機制

Flink 專題 -2 Checkpoint、Savepoint 機制

CheckPoint

1. checkpoint 保留策略

預設情況下,checkpoint 不會被保留,取消程式時即會刪除他們,但是可以通過配置保留定期檢查點,根據配置 當作業失敗或者取消的時候 ,不會自動清除這些保留的檢查點 。
java :

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 可選項如下:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業時保留檢查點。請注意,在這種情況下,您必須在取消後手動清理檢查點狀態。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業時刪除檢查點。只有在作業失敗時,檢查點狀態才可用。

2. Checkpoint 配置

與SavePoint 類似 ,checkpoint 保留的是元資料檔案和一些資料檔案
預設情況下checkpoint 只保留 一份最新資料,如果需要進行checkpoint資料恢復 ,可以通過全域性設定的方式設定該叢集預設的checkpoint 保留數,以保證後期可以從checkpoint 點進行恢復 。 同時為了 及時儲存checkpoint狀態 還需要在服務級別設定 checkpoint 檢查點的 備份速度 。
全域性配置:
flink-conf.yaml

// 設定 checkpoint全域性設定儲存點  
state.checkpoints.dir: hdfs:///checkpoints/
// 設定checkpoint 預設保留 數量  
state.checkpoints.num-retained: 20

注意 如果將 checkpoint儲存在hdfs 系統中 , 需要設定 hdfs 元資料資訊 : fs.default-scheme:
服務級別設定:
java:

// 設定 checkpoint 儲存目錄  
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");
// 設定checkpoint 檢查點間隔時間  
env.enableCheckpointing(5000);

提交任務之後 job 介面 和 hdfs 介面

  • 通過頁面可以看出 checkpoint 備份方式是每5秒執行一次 ,儲存當前所有task 狀態元資訊 和狀態資訊 。
  • hdfs 資訊 儲存 jobId 為 0171897fa809692093b4a9b223cb35e4 最新的 20次 checkpoint 資訊


3. Checkpoint 狀態點恢復

因為 flink checkpoint 目錄 分別對應的是 jobId , 每通過 flink run 方式 / 頁面提交方式 都會重新生成 jobId ,那麼如何通過checkpoint 恢復 失敗任務或者重新執行保留時間點的 任務?

flink 提供了 在啟動 之時 通過設定 -s 引數指定checkpoint 目錄 , 讓新的jobId 讀取該checkpoint 元檔案資訊和狀態資訊 ,從而達到指定時間節點啟動 job 。啟動方式如下 :

./bin/flink -s /flink/checkpoints/0171897fa809692093b4a9b223cb35e4/chk-50/_metadata  -p  @Parallelism -c @Mainclass @jar  

Savepoint

Savepoint 介紹

Savepoint是通過Flink的檢查點機制建立的流作業執行狀態的一致影象。您可以使用Savepoints來停止和恢復,分叉或更新Flink作業。儲存點由兩部分組成:穩定儲存(例如HDFS,S3,...)上的(通常是大的)二進位制檔案和(相對較小的)元資料檔案的目錄。穩定儲存上的檔案表示作業執行狀態影象的淨資料。Savepoint的元資料檔案以(絕對路徑)的形式包含(主要)指向作為Savepoint一部分的穩定儲存上的所有檔案的指標。

savepoint 和 checkpoint 區別

從概念上講,Flink的Savepoints與Checkpoints的不同之處在於備份與傳統資料庫系統中的恢復日誌不同。檢查點的主要目的是在意外的作業失敗時提供恢復機制。Checkpoint的生命週期由Flink管理,即Flink建立,擁有和釋出Checkpoint - 無需使用者互動。作為一種恢復和定期觸發的方法,Checkpoint實現的兩個主要設計目標是:i)being as lightweight to create (輕量級),ii)fast restore (快速恢復) 。針對這些目標的優化可以利用某些屬性,例如,JobCode在執行嘗試之間不會改變。

與此相反,Savepoints由使用者建立,擁有和刪除。他們的用例是planned (計劃) 的,manual backup( 手動備份 ) 和 resume(恢復) 。例如,這可能是您的Flink版本的更新,更改您的Job graph ,更改 parallelism ,分配第二個作業,如紅色/藍色部署,等等。當然,Savepoints必須在終止工作後繼續存在。從概念上講,儲存點的生成和恢復成本可能更高,並且更多地關注可移植性和對前面提到的作業更改的支援。

Assigning Operator IDs ( 分配 operator ids)

為了能夠在將來升級你的程式在本節中描述。主要的必要更改是通過該uid(String)方法手動指定操作員ID 。這些ID用於確定每個運算子的狀態。
java:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

如果您未手動指定ID,則會自動生成這些ID。只要這些ID不變,您就可以從儲存點自動恢復。生成的ID取決於程式的結構,並且對程式更改很敏感。因此,強烈建議手動分配這些ID。

Savepoint State

觸發儲存點時,會建立一個新的儲存點目錄,其中將儲存資料和元資料。可以通過配置預設目標目錄或使用觸發器命令指定自定義目標目錄來控制此目錄的位置

儲存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]
這將觸發具有ID的作業的儲存點:jobId,並返回建立的儲存點的路徑。您需要此路徑來還原和部署儲存點。

在yarn 叢集中儲存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
這將觸發具有ID :jobId和YARN應用程式ID 的作業的儲存點:yarnAppId,並返回建立的儲存點的路徑。

使用 Savepoint 取消job

$ bin/flink cancel -s [:targetDirectory] :jobId
這將以原子方式觸發具有ID的作業的儲存點:jobid並取消作業。此外,您可以指定目標檔案系統目錄以儲存儲存點。該目錄需要可由JobManager和TaskManager訪問。

Resuming Savepoint

$ bin/flink run -s :savepointPath [:runArgs]
這將提交作業並指定要從中恢復的儲存點。您可以指定儲存點目錄或_metadata檔案的路徑。

允許未恢復狀態啟動

$ bin/flink run -s :savepointPath -n [:runArgs]
預設情況下,resume操作將嘗試將儲存點的所有狀態映射回要恢復的程式。如果刪除了運算子,則可以通過--allowNonRestoredState(short -n:)選項跳過無法對映到新程式的狀態:

全域性配置

您可以通過state.savepoints.dir 配置檔案設定預設savepoint 位置 。觸發儲存點時,此目錄將用於儲存儲存點。您可以通過使用觸發器命令指定自定義目標目錄來覆蓋預設值(請參閱:targetDirectory引數)。
flink-conf.yaml

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

參考地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
http://ju.outofmemory.cn/entry/370841