rocketmq之原始碼分析broker之核心MessageStore訊息接受(十八)
這章我們從broker接受到訊息後的處理,從原始碼加註解的角度解析整體處理及技術,整體的處理步驟如下:
sendMessage
1,訊息協議轉換,主要是相容協議版本
2,驗證訊息的正確性,主要是當前broker是否具備寫許可權,topic,queue是否正確
3,獲得topic的配置資訊
4,構造訊息的內部傳輸物件MessageExtBrokerInner
5,判斷是否事物訊息,如果是執行事物的支援判斷,然後進行前置事物處理,如果不是事物訊息執行常規的操作
呼叫MessageStore的putMessage方法
1,驗證服務的可用,可寫等安全機制
2,判斷當前服務的osPageCache是否可用,涉及到高效能的io操作
3,訊息內容儲存,核心在CommitLog.putMessage
1,擴充套件內部物件屬性,增加時間及驗證屬性
2,獲得儲存的統計服務,備用,用於請求的前置資料
3,事物訊息的處理
4,通過mappedFileQueue來獲得最近的可寫檔案MappedFile操作,內部是升序的集合,名稱是根據偏移位置的範圍來定義的
5,執行put的鎖獲取
6,配置請求訊息的儲存時間戳設定
7,針對mappedFile的再次驗證,是否為空或者寫滿,然後獲得一個新的mappedfile
8,操作mappedFile進行追加訊息內容,同時有當前的回撥函式,主要操作的回撥函式中進行
9,操作狀態的判斷,這裡有一個性能點,如果返回時檔案末尾,則執行建立一個新的檔案,供下次使用
10,釋放put的鎖
11,構建響應資料結構體
12,將當前操作結果推入儲存統計服務
13,執行資料刷盤操作,有同步和非同步區分,非同步四基於生成消費者設計實現的
14,高可用的實現,處理master和slave,master將得到的資料儲存,待slave來拉取
4,儲存統計服務進行統計
6,構建響應體
1,判斷響應狀態機制,根據不同的狀態設定不同的狀態碼和提示語
2,如果成功,將操作推入broker的狀態統計服務
3,擴充套件響應體內容
4,回寫資料給客戶端
原始碼分析:
1,傳送訊息入口
//傳送訊息的接受操作 private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { //構造響應體,用於返回資料 final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); //構造響應體的頭內容,用於返回資料頭 final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); //唯一標識,讓客戶端做對應關係處理,及回撥對應 response.setOpaque(request.getOpaque()); //響應的擴充套件內容 response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request); //時間的設定 final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } response.setCode(-1); //驗證訊息的正確性 super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } //請求訊息體 final byte[] body = request.getBody(); //根據請求體獲得topic的配置 int queueIdInt = requestHeader.getQueueId(); //獲得訊息的topic的配置資訊 TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); //判斷 if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } //訊息的內部操作 MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); //處理retry相關的topic資訊 if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return response; } msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); //擴充套件屬性設定 MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); PutMessageResult putMessageResult = null; //獲得訊息的擴充套件 Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); //判斷是否是事物訊息 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { //事物操作判斷,當前是否支援事物操作 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } //執行事物訊息的前置操作 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { //執行常規訊息的操作 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } //構建相應體 return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); }
2,儲存訊息putMessage
//接受producer端的傳送訊息 public PutMessageResult putMessage(MessageExtBrokerInner msg) { //服務是否已關閉 if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } //如果是slave則返回錯誤 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is slave mode, so putMessage is forbidden "); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } //操作是否可寫 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); } //topic的限制 if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } //訊息內容的限制 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } //系統的osPage是否可用 if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } long beginTime = this.getSystemClock().now(); //設定訊息的操作 PutMessageResult result = this.commitLog.putMessage(msg); long eclipseTime = this.getSystemClock().now() - beginTime; if (eclipseTime > 500) { log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } return result; }
3,儲存內容操作
//儲存訊息的核心操作 public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); //事物訊息的特殊處理 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } long eclipseTimeInLock = 0; MappedFile unlockMappedFile = null; //獲取需要操作的檔案緩衝區 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //獲得鎖,之前用的重量鎖ReentrantLoc,現在可使用CAS來高效 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global msg.setStoreTimestamp(beginLockTimestamp); //檔案是否已經存滿,新建新的檔案緩衝區 if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } //追加訊息 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); //處理刷盤的操作 handleDiskFlush(result, putMessageResult, msg); // handleHA(result, putMessageResult, msg); return putMessageResult; }
4,核心的訊息內容追加
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
首先看操作方式MappedFile的原始碼
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { return appendMessagesInner(msg, cb); } public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) { return appendMessagesInner(messageExtBatch, cb); } //將訊息放置到緩衝區中,接受的回撥方法 public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; //獲取當前的檔案寫的偏移位置 int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); //當前的位置 byteBuffer.position(currentPos); AppendMessageResult result = null; //區分單次和批量提交的操作 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
然後看appendMessageCallback的設計,主要是序列化的操作,對訊息內容進行序列化然後儲存
//追加檔案內容的操作 public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); this.resetByteBuffer(hostHolder, 8); String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset); // Record ConsumeQueue information keyBuilder.setLength(0); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-'); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } // Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); //事物訊息的特殊 switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queuec case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; } /** * Serialize message */ final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); } final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } //這是是將檔案內容序列化的操作,便於後期從緩衝區去取內容後反序列化操作 // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } return result; }
這裡說一個核心的操作,關於MappedFile中記憶體的操作
1,MappedFile的獲取 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile()
public MappedFile getLastMappedFile() { MappedFile mappedFileLast = null; while (!this.mappedFiles.isEmpty()) { try { mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); break; } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getLastMappedFile has exception.", e); break; } } return mappedFileLast; }
2,mappedFiles是怎麼獲取和構造的
public boolean load() { File dir = new File(this.storePath); File[] files = dir.listFiles(); if (files != null) { // ascending order Arrays.sort(files); for (File file : files) { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() + " length not matched message store config value, please check it manually"); return false; } try { //效能的核心檔案操作 MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.error("load file " + file + " error", e); return false; } } } return true; }
3,如果存滿了怎麼更好的建立 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) { long createOffset = -1; //獲得最近的檔案 MappedFile mappedFileLast = getLastMappedFile(); //如果為空,則獲得建立的偏移位置 if (mappedFileLast == null) { createOffset = startOffset - (startOffset % this.mappedFileSize); } //如果檔案已滿則獲得建立位置 if (mappedFileLast != null && mappedFileLast.isFull()) { createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize; } //建立檔案 if (createOffset != -1 && needCreate) { String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + this.mappedFileSize); MappedFile mappedFile = null; //請求定位建立服務 if (this.allocateMappedFileService != null) { mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); } else { try { mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); } catch (IOException e) { log.error("create mappedFile exception", e); } } //將檔案增加到管理中 if (mappedFile != null) { if (this.mappedFiles.isEmpty()) { mappedFile.setFirstCreateInQueue(true); } this.mappedFiles.add(mappedFile); } return mappedFile; } return mappedFileLast; } public MappedFile getLastMappedFile(final long startOffset) { return getLastMappedFile(startOffset, true); }
4,mappedFile的構造初始化
public class MappedFile extends ReferenceResource { public static final int OS_PAGE_SIZE = 1024 * 4; protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); protected final AtomicInteger wrotePosition = new AtomicInteger(0); //ADD BY ChenYang protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); protected int fileSize; protected FileChannel fileChannel; /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ protected ByteBuffer writeBuffer = null; protected TransientStorePool transientStorePool = null; private String fileName; private long fileFromOffset; private File file; private MappedByteBuffer mappedByteBuffer; private volatile long storeTimestamp = 0; private boolean firstCreateInQueue = false; public MappedFile() { } public MappedFile(final String fileName, final int fileSize) throws IOException { init(fileName, fileSize); } private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; ensureDirOK(this.file.getParent()); try { //jdk中nio的操作,可讀寫調轉的檔案通道 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); //根據檔案通道獲得檔案對映記憶體 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("create file channel " + this.fileName + " Failed. ", e); throw e; } catch (IOException e) { log.error("map file " + this.fileName + " Failed. ", e); throw e; } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } }