1. 程式人生 > >【RocketMQ原始碼學習】- 5. 訊息儲存機制

【RocketMQ原始碼學習】- 5. 訊息儲存機制

前言

面試官:你瞭解RocketMQ是如何儲存訊息的嗎?
我:額,,,你等下,我看下這篇文字, (逃

由於這部分內容優點多,所以請哥哥姐姐們自備茶水,歡迎留言!

 

RocketMQ儲存設計是高可用和高效能的保證, 利用磁碟儲存來滿足海量堆積能力。Kafka單機在topic數量在100+的時候,效能會下降很多,而RocketMQ能夠在多個topic存在時,依然保持高效能

下面主要從儲存結構、儲存流程、儲存優化的技術來形成文字

基於的版本是RocketMQ4.5.2

 

儲存架構圖

  1. 要傳送的訊息,會按順序寫入commitlog中,這裡所有topic和queue共享一個檔案
  2. 存入commitlog後,由於訊息會按照topic緯度來消費,會非同步構建consumeQueue(邏輯佇列)和index(索引檔案),consumeQueue儲存訊息的commitlogOffset/messageSize/tagHashCode, 方便定位commitlog中的訊息實體。每個 Topic下的每個Message Queue都有一個對應的ConsumeQueue檔案。索引檔案(Index)提供訊息檢索的能力,主要在問題排查和資料統計等場景應用
  3. 消費者會從consumeQueue取到msgOffset,方便快速取出訊息 

好處

  1. CommitLog 順序寫 ,可以大大提高寫人效率,提高堆積能力
  2. 雖然是隨機讀,但是利用作業系統的pagecache機制,可以批量地從磁碟讀取,作為cache存到記憶體中,加速後續的讀取速度
  3. 在實際情況中,大部分的 ConsumeQueue能夠被全部讀人記憶體,所以這個中間結構的操作速度很快, 可以認為是記憶體讀取的速度

訊息檔案儲存的結構設計

儲存的檔案主要分為:

  • commitlog: 儲存訊息實體
  • consumequeue: 按Topic和佇列儲存訊息的offset
  • index: index按key、tag、時間等儲存

commitlog(物理佇列)

檔案地址:${user.home} \store${commitlog}${fileName}

commitlog特點:

  • 存放該broke所有topic的訊息
  • 預設1G大小
  • 以偏移量為檔名,當一個檔案寫滿時則建立新檔案,這樣的設計主要是方便根據訊息的物理偏移量,快速定位到訊息所在的物理檔案
  • 一個訊息儲存單元是不定長的
  • 順序寫但是隨機讀

訊息單元的儲存結構

下面的表格說明了,每個訊息體不是定長的,會儲存訊息的哪些內容,包括物理偏移量、consumeQueue的偏移量、訊息體等資訊

順序欄位名說明
1 totalSize(4Byte) 訊息大小
2 magicCode(4) 設定為daa320a7 (這個不太明白)
3 bodyCRC(4) 當broker重啟recover時會校驗
4 queueId(4) 訊息對應的consumeQueueId
5 flag(4) rocketmq不做處理,只儲存後透傳
6 queueOffset(8) 訊息在consumeQueue中的偏移量
7 physicalOffset(8) 訊息在commitlog中的偏移量
8 sysFlg(4) 事務標示,NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE
9 bronTimestamp(8) 訊息產生端(producer)的時間戳
10 bronHost(8) 訊息產生端(producer)地址(address:port)
11 storeTimestamp(8) 訊息在broker儲存時間
12 storeHostAddress(8) 訊息儲存到broker的地址(address:port)
13 reconsumeTimes(4) 訊息重試次數
14 preparedTransactionOffset(8) 事務訊息的物理偏移量
15 bodyLength(4) 訊息長度,最長不超過4MB
16 body(body length Bytes) 訊息體內容
17 topicLength(1) 主題長度,最長不超過255Byte
18 topic(topic length Bytes) 主題內容
19 propertiesLength(2) 訊息屬性長度,最長不超過65535Bytes
20 properties(properties length Bytes) 訊息屬性內容

 

consumequeue檔案(邏輯佇列)

檔案地址:${user.home}\store\consumeQueue${topic}${queueId}${fileName}

consumequeue檔案特點:

  • 按topic和queueId緯度分別儲存訊息commitLogOffset、size、tagHashCode
  • 以偏移量為檔名
  • 一個儲存單元是20個位元組的定長的
  • 順序讀順序寫
  • 每個ConsumeQueue檔案大小約5.72M

每個Topic下的每個MessageQueue都有一個對應的ConsumeQueue檔案
該結構對應於消費者邏輯佇列,為什麼要將一個topic抽象出很多的queue呢?這樣的話,對叢集模式更有好處,可以使多個消費者共同消費,而不用上鎖;

訊息單元的儲存結構

順序欄位名說明
1 offset(8) commitlog的偏移量
2 size(4) commitlog訊息大小
3 tagHashCode tag的雜湊值

 

index索引檔案

檔案地址:${user.home}\store\index${fileName}

index檔案特點:

  • 以時間作為檔名
  • 一個儲存單元是20個位元組定長的

索引檔案(Index)提供訊息檢索的能力,主要在問題排查和資料統計等場景應用

儲存單元的結構

順序欄位名說明
1 keyHash(4) key的結構是
2 phyOffset(8) commitLog真實的物理位移
3 timeOffset(4) 時間偏移量
4 slotValue(4) 下一個記錄的slot值

 

訊息儲存流程

RocketMQ檔案儲存模型層次結構

層次從上到下依次為:

  1. 業務層
    • QueueMessageProcessor類
    • PullMessageProcessor類
    • SendMessageProcessor類
    • DefaultMessageStore類
  2. 儲存邏輯層
    • IndexService類
    • ConsumeQueue類
    • CommitLog類
    • IndexFile類
    • MappedFileQueue類
  3. 磁碟互動IO層
    • MappedFile類
    • MappedByteBuffer類
業務層 QueueMessageProcessor PullMessageProcessor
SendMessageProcessor
DefaultMessageStore
儲存邏輯層 IndexService ConsumeQueue CommitLog
IndexFile MappedFileQueue
磁碟互動IO層 MappedFile
MappedByteBuffer
Disk

 

寫commoitlog流程

1. DefaultMessageStore,入口方法是putMessage方法

RocketMQ 的儲存核心類為 DefaultMessageStore,入口方法是putMessage方法

 1 // DefaultMessageStore#putMessage
 2 public PutMessageResult putMessage(MessageExtBrokerInner msg) {
 3     // 判斷該服務是否shutdown,不可用直接返回【程式碼省略】
 4     // 判斷broke的角色,如果是從節點直接返回【程式碼省略】
 5     // 判斷runningFlags是否是可寫狀態,不可寫直接返回,可寫把printTimes設為0【程式碼省略】
 6     // 判斷topic名字是否大於byte位元組127, 大於則直接返回【程式碼省略】
 7     // 判斷msg中properties屬性長度是否大於short最大長度32767,大於則直接返回【程式碼省略】
 8 
 9     if (this.isOSPageCacheBusy()) { // 判斷作業系統頁寫入是否繁忙
10         return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
11     }
12 
13     long beginTime = this.getSystemClock().now();
14     PutMessageResult result = this.commitLog.putMessage(msg);   // $2 檢視下方程式碼,寫msg核心
15 
16     long elapsedTime = this.getSystemClock().now() - beginTime;
17     if (elapsedTime > 500) {
18         log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
19     }
20     // 記錄寫commitlog時間,大於最大時間則設定為這個最新的時間
21     this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
22 
23     if (null == result || !result.isOk()) {
24         // 記錄寫commitlog 失敗次數
25         this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
26     }
27 
28     return result;
29 }

$2 CommitLog#putMessage 將日誌寫入CommitLog 檔案

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());  // $1
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();   // $3

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config // $4
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {    // $5 
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
        switch (result.getStatus()) {   // $7
            case PUT_OK:
                break;
            case END_OF_FILE:   
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {   
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics 
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    handleDiskFlush(result, putMessageResult, msg); // $8
    handleHA(result, putMessageResult, msg);        // $9

    return putMessageResult;
}
  1. $1 獲取訊息的事務型別
  2. $2 對於事務訊息中UNKNOW、COMMIT訊息,處理topic和queueId, 同時備份real_topic,real_queueId
  3. $3 獲取最新的mappedFile檔案,有可能為空
  4. $4 給寫mappedFile加鎖(預設自旋鎖)
  5. $5 mappedFile為空時建立mappedFile檔案, 建立的mappedFile檔案offset為0
  6. $6 在mappedFile中append訊息,下面具體說明
  7. $7 根據mappedFile寫訊息的結果
    • ok, 直接break
    • 檔案剩下的空間不夠寫了,重新建立一個mappedFile檔案, 重新寫訊息
    • msg大小,properties大小,未知錯誤,返回錯誤型別
  8. $8 執行刷盤
  9. $9 執行主從同步

3. $6 在mappedFile中append訊息

mappedFile.appendMessage方法會呼叫this.appendMessagesInner方法

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();  // $1

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // $2
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {  // $3
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); // $4
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());   // $5
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
  1. $1 獲取當前寫入位置
  2. $2 建立寫快取,放入檔案的寫入位置
  3. $3 判斷是單條訊息還是批量訊息
  4. $4 同步寫訊息, fileSize-currentPos即為該檔案還剩下的空白大小
  5. $5 寫完訊息,累加檔案當前位置

4. $4 同步寫訊息

程式碼在CommitLog內部類 DefaultAppendMessageCallback中 

// CommitLog$DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    long wroteOffset = fileFromOffset + byteBuffer.position();  // $1
    this.resetByteBuffer(hostHolder, 8);    // $2
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key); // $3
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queuec
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:  // $4
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    // Serialize message        // $5
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;
    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {  // $6
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        
        this.msgStoreItemMemory.putInt(maxBlank);   // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 2 MAGICCODE
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // $7 【程式碼省略】
    
    if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData);
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);     // $8

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,    // $9
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
  1. $1 計算訊息的物理偏移量=檔案初始偏移量+byteBuffer開始的偏移量,檔案初始偏移量跟commitlog檔名相同
  2. $2 在讀buffer之前,呼叫flip方法翻轉buffer(設定position為0,limit設定為8)
  3. $3 在topicQueueTable中快取msg對應的offset
  4. $4 針對事務訊息的prepare、rollback訊息,由於這個訊息不需要對消費這可見,所以queueOffset=0,不記到consumerQueue
  5. $5 序列化properties,topic,計算訊息最大值
  6. $6 如果訊息長度+8大於MapperFile剩餘檔案空間,則返回END_OF_FILE, 拋給上層,由CommitLog#putMessage這層重新建立檔案,重新寫訊息
  7. $7 根據commitlog的資料結構,構建commitlog資料,如TOTALSIZE,MAGICCODE 。。等等
  8. $8 把構建的this.msgStoreItemMemory寫到byteBuffer中(記憶體中)
  9. $9 生成返回值
  10. $10 針對提交事務訊息,重新放入topicQueueTable ??? 

非同步構建ConsumeQueue和Index檔案流程

  1. ConsumeQueue和IndexFile什麼時候建立的呢?
    – 在Broker啟動的時候,會啟動一個ReputMessageService執行緒服務, 會去設定consumeQueueTable記憶體中最大的偏移量
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    for (ConsumeQueue logic : maps.values()) {
        if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
            maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
        }
    }
}
if (maxPhysicalPosInLogicQueue < 0) {
    maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
    maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
this.reputMessageService.start();
  1. ReputMessageService執行緒每隔1ms執行doReput操作->根據CommitLog最新追加到的訊息不斷生成:
  • 訊息的offset到CommitQueue
  • 訊息索引到IndexFile
  1. 下面檢視下doReput方法具體執行
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { // $1
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { // $2
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {   
            break;
        }
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);  // $3
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset(); // $4

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // $5 構建dispatchRequest
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            DefaultMessageStore.this.doDispatch(dispatchRequest);   // $6 

                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() // 如果該broker是主broker,可以推送訊息到達conusmerQueue的訊息,這裡使用者也客戶自定定推送的監聽
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }

                            this.reputFromOffset += size;   // $7
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) { // &8
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}
  • doReput流程:
    1. $1 如果reputFromOffset小於檔案起始偏移量,則把reputFromOffset設定為檔案起始偏移量,出現的可能原因:磁碟損壞,認為人為了檔案等
    2. $2 因為reputFromOffset是consumeQueue中的偏移量,所以只要reputFromOffset小於commitlog最大偏移量,就會不斷的迴圈
    3. $3 根據offset獲取byteBuffer
    4. $4 更新reputFromOffset成byteBuffer中的offset
    5. $5 構建dispatchRequest
    6. $6 分別呼叫CommitLogDispatcherBuildConsumeQueue(構建訊息消費佇列)和CommitLogDispatcherBuildIndex(構建索引檔案)
    7. $7 讀完這條訊息,更新reputFromOffset+=size,更新readSize+=size
    8. $8 不成功,如果這個訊息的size不為0,嘗試下一條
  1. 根據訊息更新ConsumeQueue
    在doReput方法中$6中會更新consumeQueue, 訊息消費佇列轉發的任務實現類為:CommitLogDispatcherBuildConsumeQueue,內部實際呼叫的是putMessagePositionInfo方法

  Step1: 根據topicId和queueId獲取ConsumeQueue
  Step2: 將訊息偏移量、訊息size、tagHashCode(檢視ConsumeQueue的資料結構)),把訊息追加到ConsumeQueue的記憶體對映檔案(mappedFile)中(不刷盤),consumeQueue預設非同步刷盤

1 return mappedFile.appendMessage(this.byteBufferIndex.array());
  1. 根據訊息更新Index索引檔案
    Hash索引檔案轉發任務實現類:CommitLogDispatcherBuildIndex

    如果messageIndexEnable設定為true, 則轉發此任務,否則不轉發
    step1: 獲取indexFile, 如果indexFileList的記憶體中沒有indexFile,則根據路徑重新構建indexFile
    step2: 如果訊息的唯一鍵不存在,則條件到放到indexFile中

說說儲存的類與檔案

 DefaultMessageStore類核心屬性

上面說到DefaultMessageStore是儲存的業務層,putMessage是入口方法

  • messageStoreConfig
    • 儲存相關的配置,例如儲存路徑、commitLog檔案大小,刷盤頻次等等。
  • CommitLog commitLog
    • comitLog 的核心處理類,訊息儲存在 commitlog 檔案中。
  • ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable
    • topic 的佇列資訊。
  • FlushConsumeQueueService flushConsumeQueueService
    • ConsumeQueue 刷盤服務執行緒。
  • CleanCommitLogService cleanCommitLogService
    commitLog 過期檔案刪除執行緒。
  • CleanConsumeQueueService cleanConsumeQueueService
    • consumeQueue 過期檔案刪除執行緒。、
  • IndexService indexService
    • 索引服務。
  • AllocateMappedFileService allocateMappedFileService
    • MappedFile 分配執行緒,RocketMQ 使用記憶體對映處理 commitlog、consumeQueue檔案。
  • ReputMessageService reputMessageService
    • reput 轉發執行緒(負責 Commitlog 轉發到 Consumequeue、Index檔案)。
  • HAService haService
    • 主從同步實現服務。
  • ScheduleMessageService scheduleMessageService
    • 定時任務排程器,執行定時任務。
  • StoreStatsService storeStatsService
    • 儲存統計服務。
  • TransientStorePool transientStorePool
    • ByteBuffer 池
  • RunningFlags runningFlags
    • 儲存服務狀態。
  • BrokerStatsManager brokerStatsManager
    • Broker 統計服務。
  • MessageArrivingListener messageArrivingListener
    • 訊息達到監聽器。
  • StoreCheckpoint storeCheckpoint
    • 刷盤檢測點。
  • LinkedList dispatcherList
    • 轉發 comitlog 日誌,主要是從 commitlog 轉發到 consumeQueue、index 檔案。

從上面的屬性可以觀察到有幾類屬性:

  • 服務類:如刷盤服務執行緒、刪除檔案執行緒、索引服務、mappedFile分配執行緒、reput轉發執行緒、主從同步執行緒、定時任務服務、broker統計服務
  • 配置類:儲存設定類
  • 儲存資訊類:commitLog、consumeQueueTable topic佇列資訊、transientStorePool ByteBuffer池、刷盤檢測點、dispatcherList
  • 監聽器:訊息達到監聽器

刷盤

這裡會另起一篇文字來說明

 

執行主從同步

這裡會另起一篇文字來說明

PageCache(頁快取)與Mmap記憶體對映 

pageCache定義

Page cache 也叫頁緩衝或檔案緩衝,是由好幾個磁碟塊構成,大小通常為4k,在64位系統上為8k,構成的幾個磁碟塊在物理磁碟上不一定連續,檔案的組織單位為一頁, 也就是一個page cache大小,檔案讀取是由外存上不連續的幾個磁碟塊,到buffer cache,然後組成page cache,然後供給應用程式。

pageCache載入

作業系統操作I/O時,會先在pageCache中查詢,如果未命中,則啟動磁碟I/O,並把磁碟檔案中的資料載入到pageCache的一個空閒快中,然後在copy到使用者緩衝區 

pageCache預讀

對於每個檔案的第一個讀請求操作,系統在讀入所請求頁面的同時會順序讀入後面少數幾個頁面 

pageCache與RocketMQ的關聯

MQ讀取訊息依賴系統PageCache,PageCache命中率越高,讀效能越高

ConsumeQueue邏輯消費佇列是順序讀取,在pageCache機制的預讀取作用下,ConsumeQueue的讀效能會比較高近乎記憶體,即使在有訊息堆積情況下也不會影響效能。

Mmap記憶體對映技術—MappedByteBuffer

另外,RocketMQ主要通過MappedByteBuffer對檔案進行讀寫操作。其中,利用了NIO中的FileChannel模型直接將磁碟上的物理檔案直接對映到使用者態的記憶體地址中(這種Mmap的方式減少了傳統IO將磁碟檔案資料在作業系統核心地址空間的緩衝區和使用者應用程式地址空間的緩衝區之間來回進行拷貝的效能開銷),將對檔案的操作轉化為直接對記憶體地址進行操作,從而極大地提高了檔案的讀寫效率 

使用mmap記憶體對映的限制

  • 每次只能對映1.5左右的檔案至使用者態的虛擬記憶體,這也是為何RocketMQ預設設定單個CommitLog日誌資料檔案為1G的原因
  • MMAP 使用的是虛擬記憶體,和 PageCache 一樣是由作業系統來控制刷盤的,雖然可以通過 force() 來手動控制,但這個時間把握不好,在小記憶體場景下會很令人頭疼。
  • 會存在記憶體佔用率較高和檔案關閉不確定性的問題

結語

參考:

  • https://www.jianshu.com/p/b73fdd893f98
  • https://blog.csdn.net/prestigeding/category_9273001.html

歡迎關注我的公眾號