1. 程式人生 > >原始碼分析RocketMQ檔案清除機制

原始碼分析RocketMQ檔案清除機制

1、由於RocketMQ操作CommitLog、ConsumeQueue檔案,都是基於記憶體對映方法並在啟動的時候,會載入commitlog、ConsumeQueue目錄下的所有檔案,為了避免記憶體與磁碟的浪費,不可能將訊息永久儲存在訊息伺服器上,所以需要一種機制來刪除已過期的檔案。RocketMQ順序寫Commitlog、ConsumeQueue檔案,所有寫操作全部落在最後一個CommitLog或ConsumeQueue檔案上,之前的檔案在下一個檔案建立後,將不會再被更新,RocketMQ清除過期檔案的方法是:如果非當前寫檔案在一定時間間隔內沒有再次被更新,則認為是過期檔案,可以被刪除,RocketMQ不會管這個這個檔案上的訊息是否被全部消費。預設每個檔案的過期時間為72小時。通過在Broker配置檔案中設定fileReservedTime來改變過期時間,單位為小時。接下來詳細分析RocketMQ是如何設計與實現上述機制的。
DefaultMessageStore#addScheduleTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

RocketMQ會每隔10s排程一次cleanFilesPeriodically,已檢測是否需要清除過期檔案。執行頻率可以通過設定cleanResourceInterval,預設為10s。
DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
    }

主要清除CommitLog、ConsumeQueue的過期檔案。
CommitLog與ConsumeQueue對於過期檔案的刪除演算法、邏輯大同小異,本文將以CommitLog過期檔案為例來詳細分析其實現原理。

DefaultMessageStore$CleanCommitLogService#run

public
void run() { try { this.deleteExpiredFiles(); this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }

整個執行過程分為兩個大的步驟,第一個步驟:嘗試刪除過期檔案;第二個步驟:重試刪除被hange(由於被其他執行緒引用在第一階段未刪除的檔案),在這裡再重試一次。
DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles

long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

Step1:解釋一下這個三個配置屬性的含義。
fileReservedTime:檔案保留時間,也就是從最後一次更新時間到現在,如果超過了該時間,則認為是過期檔案,可以被刪除。
deletePhysicFilesInterval:刪除物理檔案的間隔,因為在一次清除過程中,可能需要刪除的檔案不止一個,該值指定兩次刪除檔案的間隔時間。
destroyMapedFileIntervalForcibly:在清除過期檔案時,如果該檔案被其他執行緒所佔用(引用次數大於0,比如讀取訊息),此時會阻止此次刪除任務,
同時在第一次試圖刪除該檔案時記錄當前時間戳,destroyMapedFileIntervalForcibly表示第一次拒絕刪除之後能保留的最大時間,在此時間內,同樣可以被拒絕刪除,同時會將引用減少1000個,超過該時間間隔後,檔案將被強制刪除。
DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles:

boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
    //繼續執行刪除邏輯
   return;
} else {
   // 本次刪除任務無作為。
}

Step2:RocketMQ在如下三種情況任意滿足之一的情況下將繼續執行刪除檔案操作。
1)到了刪除檔案的時間點,RocketMQ通過deleteWhen設定一天的固定時間執行一次刪除過期檔案操作,預設為凌晨4點。
2)判斷磁碟空間是否充足,如果不充足,則返回true,表示應該觸發過期檔案刪除操作。
3)預留,手工觸發,可以通過呼叫excuteDeleteFilesManualy方法手工觸發過期檔案刪除,目前RocketMQ暫未封裝手工觸發檔案刪除的命令。
重點分析一下磁碟不足的判斷依據。
DefaultMessageStore$CleanCommitLogService#isSpaceToDelete

double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;   // @1
cleanImmediately = false;
{
    String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);   // @2
    if (physicRatio > diskSpaceWarningLevelRatio) {  // @3
           boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
           if (diskok) {
                 DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
           }
             cleanImmediately = true;
     } else if (physicRatio > diskSpaceCleanForciblyRatio) { 
           cleanImmediately = true;
     } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                   DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
             }
      }
      if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
      }
}

程式碼@1:獲取maxUsedSpaceRatio,表示commitlog、consumequeue檔案所在磁碟分割槽的最大使用量,如果超過該值,則需要立即清除過期檔案。
程式碼@2:通過File#getTotalSpace()獲取commitlog所在磁碟分割槽總的儲存容量,通過File#getFreeSpace()獲取commitlog目錄所在磁碟檔案剩餘容量並得出當前該分割槽的物理磁碟使用率physicRatio 。
程式碼@3:RocketMQ另外提供了兩個與磁碟空間使用率相關的系統級引數:
-Drocketmq.broker.diskSpaceWarningLevelRatio=0.90:如果磁碟分割槽使用率超過該闊值,將設定磁碟不可寫,此時會拒絕新訊息的寫入。
-Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85:如果磁碟分割槽使用超過該闊值,建議立即執行過期檔案清除,但不會拒絕新訊息的寫入。
判斷磁碟是否可用,用當前已使用物理磁碟率maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio,如果當前磁碟使用率達到上述闊值,將返回true表示磁碟已滿,需要進行過期檔案刪除操作。

MappedFile#destroy
Step3:然後根據檔案的最後一次更新時間與當前時間做比較,判斷是否過期,如果已過期,呼叫MappedFile的destory。
MappedFile#shutdown

public void shutdown(final long intervalForcibly) {
        if (this.available) {
            this.available = false;
            this.firstShutdownTimestamp = System.currentTimeMillis();
            this.release();
        } else if (this.getRefCount() > 0) {
            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                this.refCount.set(-1000 - this.getRefCount());
                this.release();
            }
        }
    }

如果available為true,表示第一次執行shutdown方法,首先設定available為false,並記錄firstShutdownTimestamp時間戳,如果當前該檔案被其他執行緒引用,則本次不強制刪除,如果沒有其他執行緒在使用該檔案,則清除MappedFile相關資源,並最終執行File#delete()方法清除檔案。在拒絕被刪除保護期內(destroyMapedFileIntervalForcibly)每執行一次清理任務,將引用次數減去1000,引用數小於1後,該檔案最終將被刪除。
關於ConsumeQueue的過期檔案刪除機制與Commitlog檔案機制類似,本文就不重複講解。
本文重點是理解如下引數的含義:
fileReservedTime、deletePhysicFilesInterval、destroyMapedFileIntervalForcibly、-Drocketmq.broker.diskSpaceWarningLevelRatio
-Drocketmq.broker.diskSpaceCleanForciblyRatio與獲取磁碟分割槽總容量與剩餘容量的方法。