1. 程式人生 > >kafka:過期資料清理

kafka:過期資料清理

Kafka將資料持久化到了硬碟上,允許你配置一定的策略對資料清理,清理的策略有兩個,刪除和壓縮。

資料清理的方式

刪除

log.cleanup.policy=delete啟用刪除策略 直接刪除,刪除後的訊息不可恢復。可配置以下兩個策略: 清理超過指定時間清理:  log.retention.hours=16 超過指定大小後,刪除舊的訊息:log.retention.bytes=1073741824 為了避免在刪除時阻塞讀操作,採用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查詢功能實際是在一個靜態的快照副本上進行的,這類似於Java的CopyOnWriteArrayList。

壓縮

將資料壓縮,只保留每個key最後一個版本的資料。 首先在broker的配置中設定log.cleaner.enable=true啟用cleaner,這個預設是關閉的。 在topic的配置中設定log.cleanup.policy=compact啟用壓縮策略。

如上圖,在整個資料流中,每個Key都有可能出現多次,壓縮時將根據Key將訊息聚合,只保留最後一次出現時的資料。這樣,無論什麼時候消費訊息,都能拿到每個Key的最新版本的資料。 壓縮後的offset可能是不連續的,比如上圖中沒有5和7,因為這些offset的訊息被merge了,當從這些offset消費訊息時,將會拿到比這個offset大的offset對應的訊息,比如,當試圖獲取offset為5的訊息時,實際上會拿到offset為6的訊息,並從這個位置開始消費。 這種策略只適合特殊場景,比如訊息的key是使用者ID,訊息體是使用者的資料,通過這種壓縮策略,整個訊息集裡就儲存了所有使用者最新的資料。 壓縮策略支援刪除,當某個Key的最新版本的訊息沒有內容時,這個Key將被刪除,這也符合以上邏輯。