1. 程式人生 > >RocketMQ 拉取消息-文件獲取

RocketMQ 拉取消息-文件獲取

一次 lsi select ice logs sets 數據 public file

看完了上一篇的《RocketMQ 拉取消息-通信模塊》,請求進入PullMessageProcessor中,接著

PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request)方法中調用了:

 final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);

來從硬盤中獲取消息體。

接著來看看DefaultMessageStore是如何從消息隊列中獲取消息的:

    /**
     * 獲取消息結果
     * 所有的參數都是從requestheader中獲取的。也就是說從consumer client端傳遞過來的。
     * @param group
     * @param topic
     * @param queueId
     * @param offset
     * @param maxMsgNums
     * @param subscriptionData
     * @return
     */
    public
GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final SubscriptionData subscriptionData) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden");
return null; } if (!this.runningFlags.isReadable()) { log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits()); return null; } long beginTime = this.getSystemClock().now(); // 枚舉變量,取消息結果 GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; // 當被過濾後,返回下一次開始的Offset long nextBeginOffset = offset; // 邏輯隊列中的最小Offset long minOffset = 0; // 邏輯隊列中的最大Offset long maxOffset = 0; GetMessageResult getResult = new GetMessageResult(); final long maxOffsetPy = this.commitLog.getMaxOffset(); //通過topic和queueid查找邏輯隊列對象,相當於字典的目錄,用來指定消息在物理文件commitlog上的位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQuque(); maxOffset = consumeQueue.getMaxOffsetInQuque(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } else { SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int MaxFilterMessageCount = 16000; final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy; // 說明物理文件正在被刪除 if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } // 判斷是否拉磁盤數據 boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); // 此批消息達到上限了 if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } // 消息過濾 if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) { SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (selectResult != null) { this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } else { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); } } else { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } if (log.isDebugEnabled()) { log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode); } } } if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); } nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize); long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TotalPhysicalMemorySize * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { // 必須釋放資源 bufferConsumeQueue.release(); } } else { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed."); } } } // 請求的隊列Id沒有 else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } if (GetMessageStatus.FOUND == status) { this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); } else { this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); } long eclipseTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime); getResult.setStatus(status); getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); return getResult; }

先來看一下獲取消息隊列的方法:

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(//
                    topic, //
                    queueId, //
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                    this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                    this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

RocketMQ 拉取消息-文件獲取