RocketMQ原始碼分析 broker啟動,commitlog、consumequeue、indexfile、MappedFileQueue、MappedFile之間的關係以及位置說明
阿新 • • 發佈:2020-12-20
[toc]
## 1.MappedFile類屬性說明
dubbo的核心是spi,看懂了spi那麼duboo基本上也懂了,對於rmq來說,它的核心是broker,而broker的核心是commitlog、consumequeue、indexfile,而這些檔案對應的最終都是MappedFile,那麼搞明白了這個類,那麼對於broker的儲存這塊也就很容易明白了
### 1.1.MappedFile類屬性如下
OS_PAGE_SIZE是4k,表示作業系統頁
TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES 都是static變數,分別儲存的是總共對映的資料容量,對映的總檔案數
wrotePosition 表示當前寫的位置,具體啥是當前寫的位置,後續分析
committedPosition 表示當前提交的位置,具體咋理解,往後看
flushedPosition 表示當前重新整理的位置,具體咋理解,往後看,以上這三個變數很重要
fileSize 表示該MappedFile對應的磁碟檔案的size,對於commitlog來說是1G,對於consumequeue來說是6000000 位元組,對於indexfile來說是42040位元組
fileChannel表示該檔案的通道
writeBuffer是堆外記憶體,在開啟了transientStorePoolEnable=true的情況下非null,生產通常都會開啟transientStorePoolEnable以供消費時候讀寫分離
transientStorePool是堆外記憶體池,在開啟了transientStorePool的情況有有值
fileName 檔名,比如對於commitlog來說第一個檔案是0*1024^3,第二個檔案是1*1024^3
fileFromOffset 即檔案的起始位置(相對於整個commitlog or consumequeue來說),比如commitlog consumequeue檔案來說,該值就是fileName
file 就是檔案
mappedByteBuffer 即pagecache,通過fileChannel.map生成
storeTimestamp 最後一次訊息的儲存時間
firstCreateInQueue 對於佇列有用
### 1.2.MappedFile構造器說明
有兩個構造器
MappedFile(final String fileName, final int fileSize)
MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool)
在不開啟transientStorePoolEnable=true的情況下,都是使用第一個構造器
在開啟的情況下,broker啟動進行load操作載入commitlog consumequeue indexfile檔案都是使用第一個構造器,在broker執行過程中,consumequeue indexfiel建立也是使用第一個構造器,在broker執行過程中,建立commitlog檔案都是使用的第二個構造器,ctrl+alt+H檢視呼叫如下圖
![image-20201220122243482](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220122243.png)
在AllocateMappedFileService.mmapOperation()內通過jdk的spi機制載入META-INF目錄下org.apache.rocketmq.store.MappedFile檔案,預設是沒有該檔案的,程式碼如下
```java
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();//丟擲異常,預設META-INF下是沒有org.apache.rocketmq.store.MappedFile檔案的
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {//捕獲異常
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());//走這裡
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
```
生產通常是開啟transientStorePoolEnable=true的,那麼以第二個構造器為例說明,第二個構造器明白了,第一個構造器自然就明白了
```java
public MappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();//從堆外記憶體緩衝池構建堆外記憶體
this.transientStorePool = transientStorePool;
}
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;//檔名
this.fileSize = fileSize;//檔案大小
this.file = new File(fileName);//根據檔案構造File
this.fileFromOffset = Long.parseLong(this.file.getName());//檔名作為起始offset
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 將此通道的檔案區域直接對映到記憶體中,該屬性就是pagecache
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
```
該構造器就是根據檔名路徑構造MappedFile檔案,構造堆外記憶體、pagecache屬性,此時建立後wrotePosition committedPosition flushedPosition三個位置都是0。這兩個構造器的唯一區別就是第一個構造器是不賦值其堆外記憶體屬性writeBuffer。
這裡記錄下堆外記憶體池TransientStorePool的來源
BrokerStartup.main(String[]) //broker啟動入口
BrokerStartup.createBrokerController(String[])//建立BrokerController
BrokerController.initialize()//BrokerController初始化
new DefaultMessageStore(MessageStoreConfig, BrokerStatsManager, MessageArrivingListener, BrokerConfig)//建立DefaultMessageStore,程式碼如下
```java
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
//省略其他程式碼
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {//預設false,如果broker開啟了transientStorePoolEnable=true,則執行。transient含義是短暫的
this.transientStorePool.init();
}
//省略其他程式碼
}
//TransientStorePool
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);//建立堆外記憶體DirectByteBuffer
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));//在實體記憶體鎖定1G,使用的jna,如果以後自己專案需要進行鎖定記憶體,那麼則可以參考這裡
availableBuffers.offer(byteBuffer);//把申請的堆外記憶體快取起來
}
}
```
## 2.MappedFileQueue類說明
### 2.1.屬性說明
storePath 是目錄路徑,對commitlog來說是${rocketmq-store}/commitlog 對consumequeue來說是${rocketmq-store}/consumequeue
mappedFileSize 單個MappedFile檔案對應的磁碟檔案的size,對commitlog來說是1024^3即1G 對consumequeue來說是30w*20即600w
mappedFiles 即MappedFile集合,是CopyOnWriteArrayList型別,併發集合
allocateMappedFileService 分配MappedFile服務AllocateMappedFileService,是個runnable物件,啟動該服務用於建立commitlog MappedFile物件,對於commitlog來說該屬性是AllocateMappedFileService,對於consumequeue來說該屬性是null
flushedWhere committedWhere 分別表示重新整理位置,提交位置,跟MappedFile的三個位置關聯,具體後續說明
storeTimestamp 最後儲存的時間戳
實際MappedFileQueue就是個MappedFile集合,個人認為叫MappedFileList更貼切,叫MappedFileQueue剛開始看的時候總是容易混淆。
### 2.2.MappedFileQueue構造器說明
MappedFileQueue構造器呼叫:
#### 2.2.1.對於commitlog
在broker啟動時候初始化BrokerController的時候建立DefaultMessageStore的時候建立CommitLog物件的時候呼叫,commitlog物件對應的檔案結構如圖
![image-20201220122645947](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220122646.png)
#### 2.2.2.對於consumequeue
這個呼叫就比較多
2.2.2.1.在broker啟動進行load操作的時候載入${rocketmq-store}/consumequeue/$topic/$queueId/$fileName的時候針對每一個topic下的queueId下的檔案都建立一個ConsumeQueue
一個ConsumeQueue等同一個MappedFileQueue等同多個MappedFile,為什麼這麼做呢?因為一個訊息佇列會有多個檔案,如下圖
![image-20201220122752388](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220122752.png)
2.2.2.2.在broker執行中DefaultMessageStore.findConsumeQueue(String, int)根據topic queueId查詢ConsumeQueue,如果查詢不到則建立ConsumeQueue物件,建立方式2.2.2.1
## 3.commitlog檔案與MappedFileQueue MappedFile關係
在broker啟動的時候建立commitlog物件並載入load磁碟的commitlog檔案,並從正常or異常關閉情況恢復,在執行過程中儲存訊息
### 3.1.broker啟動建立commitlog物件,即建立MappedFileQueue物件
![image-20201220122913057](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220122913.png)
### 3.2.broker啟動載入load consumequeue檔案
![image-20201220122958986](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220122959.png)
對於建立consumequeue物件來說,只是建立MappedFileQueue物件,並不建立具體的檔案物件MappedFile
### 3.3.broker啟動載入load commitlog檔案
![image-20201220123032147](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220123032.png)
org.apache.rocketmq.store.MappedFileQueue.load()方法是載入${rocketmq-store}/commitlog or ${rocketmq-store}/consumequeue目錄下的檔案,每個檔案對應建立一個MappedFile,新建的MappedFile物件wrotePosition committedPosition flushedPosition屬性都設定為檔名(起始位置),並把新建的MappedFile物件新增到快取MappedFileQueue.mappedFiles
至此,commitlog物件對應的MappedFileQueue的flushedWhere committedWhere都是0,每個MappedFile物件的wrotePosition committedPosition flushedPosition屬性都是檔名
程式碼如下
```java
//org.apache.rocketmq.store.DefaultMessageStore.load()
public boolean load() {
boolean result = true;
try {
boolean lastExitOK = !this.isTempFileExist();//存在abort檔案,說明broker上次是異常關閉,因為broker啟動後會建立abort檔案,正常關閉會刪除該檔案,啟動時候存在該檔案,說明上次是異常關閉
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");//abnormally不正常的
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();//載入${rocketmq_home}\store\config/delayOffset.json,該檔案儲存的是每個延時佇列的消費offset,16個延時級別,16個佇列
}
// load Commit Log
result = result && this.commitLog.load();//載入${rocketmq_home}\store\commitlog下的commitlog資料檔案
// load Consume Queue
result = result && this.loadConsumeQueue();//載入${rocketmq_home}\store\consumequeue下的消費佇列
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//載入checkpoint檔案,該檔案是用於異常關閉恢復,儲存的是刷盤位置
this.indexService.load(lastExitOK);//載入載入${rocketmq_home}\store\index目錄下的檔案
this.recover(lastExitOK);//使用commitLog恢復上次異常/正常關閉的broker
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
```
#### 3.3.1.commitlog載入程式碼如下
```java
//org.apache.rocketmq.store.CommitLog.load()
public boolean load() {
boolean result = this.mappedFileQueue.load();//載入I:\rocketmq\store\commitlog
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
//org.apache.rocketmq.store.MappedFileQueue.load()
public boolean load() {
File dir = new File(this.storePath);//${rocketmq_home}\store\commitlog
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);//為每個檔案建立對應的堆外記憶體對映
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
```
最終載入的commitlog檔案儲存到了CommitLog.mappedFileQueue.mappedFiles集合中。而commitlog物件又被包含在DefaultMessageStore,DefaultMessageStore又被包含在BrokerController物件內,最終在broker啟動載入commitlog檔案就被載入到了broker上。
#### 3.3.2.consumequeue載入如下程式碼
```java
//org.apache.rocketmq.store.DefaultMessageStore.loadConsumeQueue()
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));//目錄${rocketmq}\store\consumequeue
File[] fileTopicList = dirLogic.listFiles();//列舉出${rocketmq}\store\consumequeue目錄下的所有檔案
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {//遍歷topic目錄檔案
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {//遍歷佇列目錄下的檔案
int queueId;
try {
queueId = Integer.parseInt(fileQueueId.getName());
} catch (NumberFormatException e) {
continue;
}
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);//構建ConsumeQueue,該物件對應MappedFileQueue,對應多個MappedFile
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {//載入consumequeue檔案
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
}
//org.apache.rocketmq.store.ConsumeQueue.load()
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
if (isExtReadEnable()) {//false
result &= this.consumeQueueExt.load();
}
return result;
}
```
最終每個consumequeue被載入到了org.apache.rocketmq.store.DefaultMessageStore.consumeQueueTable集合中儲存,key是topic,value是queueID和ConsumeQueue的對映集合
#### 3.3.3.indexfile載入如下
```java
//org.apache.rocketmq.store.index.IndexService.load(boolean)
public boolean load(final boolean lastExitOK) {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {//遍歷${rocketmq_home}\store\index目錄下的indexfile檔案
try {
IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);//吧每個indexfile檔案包裝為IndexFile物件
f.load();//載入indexfile檔案,即把前40位元組儲存到IndexHeader
if (!lastExitOK) {//broker上次是異常關閉
if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
.getIndexMsgTimestamp()) {//如果indexfile的結束時間戳(儲存到indexfile的最後一條訊息的時間戳)大於StoreCheckpoint索引刷盤時間戳,則銷燬該indexfile物件,即釋放對應的MappedFile物件(該物件包裝了堆外記憶體,釋放堆外記憶體)。 為什麼要銷燬呢?因為StoreCheckpoint是刷盤儲存點,用於儲存commitlog consumequeue indexfile刷盤的位置,便於異常關閉恢復。如果indexfile的結束時間戳大於StoreCheckpoint索引刷盤時間戳,則說明該IndexFile是由於broker異常關閉並沒有被刷盤
f.destroy(0);//釋放該MappedFile物件,釋放堆外記憶體
continue;
}
}
log.info("load index file OK, " + f.getFileName());
this.indexFileList.add(f);
} catch (IOException e) {
log.error("load file {} error", file, e);
return false;
} catch (NumberFormatException e) {
log.error("load file {} error", file, e);
}
}
}
return true;
}
//org.apache.rocketmq.store.index.IndexFile.load()
public void load() {
this.indexHeader.load();
}
//org.apache.rocketmq.store.index.IndexHeader.load()
public void load() {
this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));//獲取indexfile的前8位元組,即起始時間戳
this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));//結束時間戳 8位元組
this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));//在commitlog的起始offset 8位元組
this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));//在commitlog的結束offset 8位元組
this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));//已佔用的slot數量 4位元組
this.indexCount.set(byteBuffer.getInt(indexCountIndex));//已經使用的index數量 4位元組
if (this.indexCount.get() <= 0) {
this.indexCount.set(1);
}
}
```
indexfile檔案的格式如下
![20191112135445790](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220123308.png)
最終每個indexfile被載入到org.apache.rocketmq.store.index.IndexService.indexFileList集合儲存,IndexService又包含在DefaultMessageStore。indexfile和上述兩個物件有區別,它只是包裝了MappedFile物件,而Commitlog ConsumeQueue物件都是包裝了MappedFileQueue物件,包裝了MappedFile集合
綜上,broker啟動的時候載入commitlog consumequeue indexfile到broker,關係如圖
![20191112135139917 (1)](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220123335.png)
### 3.4.broker啟動恢復consumequeu 和commitlog
分為broker上次是正常關閉、異常關閉兩種情況
```java
//org.apache.rocketmq.store.DefaultMessageStore.recover(boolean)
private void recover(final boolean lastExitOK) {
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();//恢復consumerqueue
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);//broker上次是正常關閉
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);//broker上次是異常關閉
}
this.recoverTopicQueueTable();
}
```
#### 3.4.1.恢復consumerqueue
```java
/*
* 恢復消費佇列,返回所有消費佇列內的最大offset,即該offset就是commitlog中已經轉儲到消費佇列的offset
*/
//org.apache.rocketmq.store.DefaultMessageStore.recoverConsumeQueue()
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap maps : this.consumeQueueTable.values()) {//遍歷同topic下的所有ConsumeQueue集合
for (ConsumeQueue logic : maps.values()) {//遍歷同queueID下的所有ConsumeQueue
logic.recover();//恢復消費佇列
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;//返回所有訊息佇列檔案內訊息在commitlog中的最大偏移量
}
//org.apache.rocketmq.store.ConsumeQueue.recover()
public void recover() {
final List mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
int index = mappedFiles.size() - 3;//最多恢復三個
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;//20位元組
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//共用同一個緩衝區,但是position各自獨立
long processOffset = mappedFile.getFileFromOffset();//佇列檔名
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {//消費佇列儲存單元是一個20位元組定長資料,commitlog offset(8) + size(4) + message tag hashcode(8),commitlog offset是指這條訊息在commitlog檔案實際偏移量,size指訊息大小,訊息tag的雜湊值,用於校驗
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();//先讀取8位元組,即commitlog offset
int size = byteBuffer.getInt();//再讀取4位元組,即msg size
long tagsCode = byteBuffer.getLong();//再讀取8位元組,即message tag hashcode
if (offset > = 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;//consumequeue上當前訊息末尾位置,該值為20*N,其中N是表示當前訊息在consumequeue上是第幾個訊息
this.maxPhysicOffset = offset;//佇列內訊息在commitlog中的偏移量,this.maxPhysicOffset最終為該佇列下的consumequeue檔案內的訊息在commitlog的最大物理偏移量,即在commitlog的位置,該值也就是commitlog轉儲到consumequeue的位置,該位置後的訊息就需要轉儲到consumequeue
if (isExtAddr(tagsCode)) {//用於擴充套件的consumequeue,忽略,預設是不開啟,生產通常也不開啟,沒有研究過這個
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
if (mappedFileOffset == mappedFileSizeLogics) {//達到consumequeue檔案末尾
index++;
if (index > = mappedFiles.size()) {//遍歷到該佇列下是最後一個consumequeue檔案則退出迴圈
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);//獲取下一個mappedFile物件
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();//重置processOffset為mappedFile檔名
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
processOffset += mappedFileOffset;//processOffset
this.mappedFileQueue.setFlushedWhere(processOffset);//設定重新整理位置
this.mappedFileQueue.setCommittedWhere(processOffset);//設定提交位置
this.mappedFileQueue.truncateDirtyFiles(processOffset);//清理大於指定offset的髒檔案
if (isExtReadEnable()) {//忽略,生產也不開啟擴充套件consumequeue
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
```
#### 3.4.2.恢復commitlog
recoverConsumeQueue返回了已經刷盤到consumequeue的commitlog offset,
3.4.2.1. 上次broker是正常關閉
org.apache.rocketmq.store.CommitLog.recoverNormally(long)大體邏輯和recoverConsumeQueue是相同的,不同之處是consumequeue每條記錄是固定20位元組,而commitlog內每條記錄即一條訊息,是變長的,commitlog格式如圖(該圖是網上截圖得來,具體格式可以在org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, MessageExtBrokerInner)方法內檢視)
![20191113112228176 (1)](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220123509.png)
recoverNormally操作也是最多恢復最新的三個commitlog檔案,每次讀取commitlog中一條訊息,最終獲取到commitlog物件已經刷盤的位置processOffset,並設定Commitlog物件的MappedFileQueue的刷盤flushedWhere和提交點committedWhere位置為processOffset,清除髒commitlog檔案。這個步驟邏輯級別和recoverConsumeQueue相同,不同之處是如果processOffset<=maxPhyOffsetOfConsumeQueue(該值是recoverConsumeQueue操作返回的commitlog已經轉儲到consumequeue的最大commitlog offset)的時候,需要執行org.apache.rocketmq.store.DefaultMessageStore.truncateDirtyLogicFiles(long)
```java
//org.apache.rocketmq.store.DefaultMessageStore.truncateDirtyLogicFiles(long)
public void truncateDirtyLogicFiles(long phyOffset) {
ConcurrentMap> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentMap maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.truncateDirtyLogicFiles(phyOffset);
}
}
}
//org.apache.rocketmq.store.ConsumeQueue.truncateDirtyLogicFiles(long)
public void truncateDirtyLogicFiles(long phyOffet) {
int logicFileSize = this.mappedFileSize;//600w
this.maxPhysicOffset = phyOffet - 1;
long maxExtAddr = 1;
while (true) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//獲取consumequeue的最後一個MappedFile,即檔名最大的那個MappedFile
if (mappedFile != null) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
mappedFile.setWrotePosition(0);
mappedFile.setCommittedPosition(0);
mappedFile.setFlushedPosition(0);
for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();//commitlog offset 8位元組
int size = byteBuffer.getInt();//msg size 4位元組
long tagsCode = byteBuffer.getLong();//tag hashcode 8位元組
if (0 == i) {//第一條訊息
if (offset > = phyOffet) {
this.mappedFileQueue.deleteLastMappedFile();//從MappedFileQueue.mappedFiles移除最後的MappedFile,並釋放該MappedFile
break;//跳出for迴圈,繼續執行while迴圈,獲取刪除當前MappedFile後的mappedFileQueue的最後一個檔案繼續重複執行該步驟
} else {
int pos = i + CQ_STORE_UNIT_SIZE;
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {//預設不使用擴充套件consumequeue.忽略
maxExtAddr = tagsCode;
}
}
} else {
if (offset >= 0 && size > 0) {
if (offset >= phyOffet) {//MappedFile的非第一條訊息offset>= phyOffet,說明當前的MappedFile既有
return;
}
int pos = i + CQ_STORE_UNIT_SIZE;
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
if (isExtAddr(tagsCode)) {//預設不使用擴充套件consumequeue.忽略
maxExtAddr = tagsCode;
}
if (pos == logicFileSize) {//達到當前MappedFile末尾,則結束
return;
}
} else {
return;
}
}
}
} else {
break;//mappedFile==null 退出while迴圈
}
}
if (isExtReadEnable()) {//預設不使用擴充套件consumequeue.忽略
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
```
**DefaultMessageStore.truncateDirtyLogicFiles(long)方法,什麼情況下會進入?暫時沒搞清楚**
**3.4.2.2. 上次broker是異常關閉**
org.apache.rocketmq.store.CommitLog.recoverAbnormally(long)跟recoverNormally正常關閉恢復commitlog基本相同,程式碼加了註釋,看如下程式碼
```java
//org.apache.rocketmq.store.CommitLog.recoverAbnormally(long)
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();//true 預設校驗crc
final List mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {//從commitlog檔案倒序遍歷
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {//如果該commitlog的第一天訊息儲存時間戳<=刷盤檢測點的刷盤時間,則說明該commitlog是需要恢復的,否則就繼續遍歷前一個commitlog
log.info("recover from this mapped file " + mappedFile.getFileName());//recover from this mapped file
break;
}
}
//剩餘程式碼邏輯等同recoverNormally
}
// Commitlog case files are deleted
else {//如果不存在commitlog檔案,比如過期被清除了or誤刪了
this.mappedFileQueue.setFlushedWhere(0);//設定mappedFileQueue的刷盤位置為0
this.mappedFileQueue.setCommittedWhere(0);//設定mappedFileQueue的提交位置為0
this.defaultMessageStore.destroyLogics();//刪除所有的consumequeue檔案
}
//org.apache.rocketmq.store.CommitLog.isMappedFileMatchedRecover(MappedFile)
/*
* 讀取該commitlog的第一條訊息的儲存時間戳,如果時間戳<=Math.min(刷盤檢測點commitlog最後刷盤時間, 刷盤檢測點consumequeue最終刷盤時間 ) - 3s,
* 則說明該commitlog檔案是需要進行恢復的
*/
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//commitlog組成 訊息大小(4) +魔術(4)+訊息報文體校驗碼(4)+佇列id(4)+flag(4)+當前訊息在佇列中的第幾個(8)+實體地址偏移量(8)+SYSFLAG(4)+producer時間戳(8)+producer地址(8)+訊息在broker儲存的時間(8)+訊息儲存到broker的地址(8)+訊息被重新消費了幾次(4)+prepard狀態的事務訊息(8)+bodylenght(4)+body(訊息體)+(1)+topicLength+(2)+propertiesLength
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);//獲取魔術daa320a7
if (magicCode != MESSAGE_MAGIC_CODE) {//魔數不對,返回false,說明該commitlog不符合規則
return false;
}
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);//獲取儲存到broker的timestamp
if (0 == storeTimestamp) {//儲存時間為0,說明沒有訊息,因為commitlog建立後預設填充就是0x00
return false;
}
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {//預設false
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {//執行這裡
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {//該commitlog第一條訊息儲存時間戳<=Math.min(刷盤檢測點commitlog最後刷盤時間, 刷盤檢測點consumequeue最終刷盤時間 ) - 3s
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
return false;
}
```
## 4.broker執行中訊息寫入commitlog儲存
producer傳送訊息到broker,傳送命令是SEND_MESSAGE_V2,broker端對應的處理器是SendMessageProcessor,如下圖,是broker端收到訊息後的處理堆疊(紅框內)
![20191113143246545](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220123632.png)
核心方法是org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner),程式碼如下,註釋加的比較清楚,應該對於邏輯比較容易懂
```java
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());//設定訊息落地時間
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();//DefaultMessageStore.storeStatsService
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());//從訊息的SYSFlag判斷訊息型別
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {//事務訊息,非事務訊息
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {//如果訊息的延遲級別>0 說明訊息設定了延時
/*
* 如果訊息的延遲級別>0 將訊息的原topicName和原訊息佇列ID存入訊息屬性中,
* 用延遲訊息主題SCHEDULE_TOPIC_XXXX,訊息佇列ID更新原訊息的主題與佇列
* 對於producer傳送的延時訊息,消費端消費失敗重發的訊息都是有延時級別,都會被更改topic儲存到commitlog
*/
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 對於重發交易,它的延時級別都是大於0的,因此重發交易在消費失敗的時候傳送到broker儲存的時候都先被更高為重試topic,接著在儲存到commitlog的時候被儲存為schedule主題
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());//將訊息的原topicName和原訊息佇列ID存入訊息屬性中,
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
//Commit訊息 包括普通訊息, 重發訊息,延時訊息,事務訊息
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//獲取當前可以寫入的commitlog檔案,獲取一個MappedFile物件,記憶體對映的具體實現
//訊息寫入是同步操作
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config 加鎖 預設使用自旋鎖加鎖 即cas加鎖
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();//加鎖時間 當前時間
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);//設定訊息儲存時間戳
if (null == mappedFile || mappedFile.isFull()) {
/*
* 如果mappedFile==null 則需要新建mappedFile物件,即新建commitlog
* 如果mappedFile.isFull() 意思是commitlog檔案滿了,mappedFile.wrotePosition==1024^3,寫道了檔案末尾
* 建立commitlog檔案,建立mappedFile物件
*/
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);//訊息寫入commitlog檔案對應的pagecache
switch (result.getStatus()) {
case PUT_OK:
break;//寫入成功
case END_OF_FILE://文commitlog剩餘空間不足寫入該訊息,則新建commitlog檔案,通常建立commitlog是走該分支,mappedFile.isFull()的情況太罕見了,需要恰好一條訊息的長度剩餘commitlog的剩餘寫入空間
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);//新建commitlog檔案
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);//訊息寫入新commitlog檔案對應的pagecache
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();//cas解鎖
}
if (eclipseTimeInLock > 500) {//訊息寫入pagecache耗時超過500ms報警
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
//如果broker開啟了檔案預熱warmMapedFileEnable=true,則通過jna釋放commitlog建立時候pagecache鎖定的實體記憶體。生產通常是開啟檔案預熱的,避免日誌檔案在分配記憶體時缺頁中斷
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);//設定返回producer的結果
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
handleDiskFlush(result, putMessageResult, msg);//刷盤,吧訊息儲存到磁碟
handleHA(result, putMessageResult, msg);//ha同步訊息
return putMessageResult;
}
```
### 4.1.訊息寫入到commitlog
訊息寫入到commitlog,即追加到pagecache的方法,如下方法
```java
//org.apache.rocketmq.store.MappedFile.appendMessagesInner(MessageExt, AppendMessageCallback)
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {//cb=DefaultAppendMessageCallback
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();//當前MappedFile的寫位置
if (currentPos < this.fileSize) {//當前MappedFile的寫位置<檔案大小,說明檔案有剩餘位置可寫
/*
* 僅當transientStorePoolEnable 為true,刷盤策略為非同步刷盤(FlushDiskType為ASYNC_FLUSH),
* 並且broker為主節點時,才啟用堆外分配記憶體。此時:writeBuffer不為null
* Buffer與同步和非同步刷盤相關
* writeBuffer/mappedByteBuffer的position始終為0,而limit則始終等於capacity
* slice建立一個新的buffer, 是根據position和limit來生成byteBuffer,與原buf共享同一記憶體
* 開啟了transientStorePoolEnable,資料是寫入到堆外記憶體,即this.writeBuffer
*/
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();//重點
byteBuffer.position(currentPos);//設定寫位置
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {//根據訊息型別,是批量訊息還是單個訊息,進入相應的處理
//處理單個訊息, this.fileSize - currentPos是可寫的空間
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
//處理批量訊息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());//把MappedFile中的寫位置更新為寫了訊息之後的位置
this.storeTimestamp = result.getStoreTimestamp();//更新storeTimestamp為訊息儲存時間戳,即broker接收到訊息的時間戳
return result;
}
//寫滿會報錯,正常不會進入該程式碼,呼叫該方法前有判斷
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
```
以追加單個訊息為例說明org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, MessageExtBrokerInner),該方法就是按照commitlog格式吧訊息寫入到傳入的引數ByteBuffer內,如果是開啟了transientStorePoolEnable=true,則該ByteBuffer是MappedFile.writeBuffer堆外記憶體,未開啟則是MappedFile.mappedByteBuffer即pagecache。該pagecache是對應具體的commitlog磁碟檔案,該writeBuffer是對應什麼呢?在刷盤服務執行緒下會把writeBuffer內的資料寫入到pagecache,再由pagecache最終重新整理儲存到commitlog磁碟上。下面到刷盤時候自然清楚。因此**開啟transientStorePoolEnable=true的情況下是吧訊息寫入堆外記憶體即MappedFile.writeBuffer,未開啟情況下是寫入到pagecache即MappedFile.mappedByteBuffer。**
說到這裡,在commitlog剩餘空間不足以寫入該訊息,需要新建commitlog,那麼是如何實現的?
在org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)內的
mappedFile = this.mappedFileQueue.getLastMappedFile(0)
這行程式碼實現的
```java
//org.apache.rocketmq.store.MappedFileQueue.getLastMappedFile(long, boolean)
/*
* 功能:獲取最新的MappedFile物件
* 如果沒有或者上個commitlog/consumequeue已經滿了,則新建立一個commitlog/consumequeue檔案
* 對於建立commitlog來說,通常建立的原因是剩餘空間不足一寫入一條訊息,傳入引數startOffset==0 , needCreate=true
* 對於建立consumequeue來說,通常建立的原因是因為真滿了,因為consumequeue是固定長度,傳入引數startOffset==300w*20 , needCreate=true
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();//獲取最新的MappedFile
// 一個對映檔案都不存在 createOffset=0
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 已經存在了MappedFile檔案,且檔案滿了則建立
if (mappedFileLast != null && mappedFileLast.isFull()) {//mappedFileLast.isFull()對於commitlog來說是很罕見遇到的,因此建立commitlog通常不走這裡,但是由於consumequeue每個訊息是固定長度,每次是會寫滿通常走這裡
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;//createOffset是上個檔名+檔案size,用作新建的MappedFile檔名。公式就是createOffset=(N-1)*this.mappedFileSize 其中N為第幾個MappedFile
}
// 建立新的commitlog/consumequeue檔案物件MappedFile
if (createOffset != -1 && needCreate) {//對於commitlog建立來說,通常執行到這裡createOffset==0
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);//以createOffset作為commitlog or consumequeue的檔名
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 建立commitlog走這裡,因為開啟後this.allocateMappedFileService為AllocateMappedFileService,在broker啟動時候new Commitlog的時候賦值的
if (this.allocateMappedFileService != null) {//true
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);//由AllocateMappedFileService執行緒非同步建立mappedFile,並同步等待獲取建立結果
} else {//consumequeue檔案建立走這裡
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);//建立consumequeue對應的MappedFile
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);//第一個設定MappedFile.firstCreateInQueue=true
}
this.mappedFiles.add(mappedFile);//新增到MappedFileQueue集合
}
return mappedFile;//檔案滿了則返回新建的檔案
}
return mappedFileLast;//檔案未滿則返回最新的MappedFile
}
```
這裡有個難點是mappedFileLast.isFull()的判斷,因為對於consumequeue來說每條記錄是固定20位元組,寫位置是每次達到末尾(即檔案size)結束。但是對於commitlog來說,每條記錄(訊息)是變長的,比如要寫入的訊息長度是200位元組,但是commitlog只有70位元組空間,因此這樣情況是需要新建commitlog檔案,吧訊息寫入到新的commitlog內,但是舊的commitlog的寫位置MappedFile.wrotePosition怎麼就變成了MappedFile.fileSize呢?因為MappedFile.wrotePosition變成MappedFile.fileSize是很恰好發生的事情,加入待寫入的訊息長度和commitlog剩餘空間恰好相等,則說明正好容納,這種很罕見的情況,在寫完後wrotePosition==fileSize,但是實際也不會發生,因為如果寫入,MappedFile.wrotePosition最大隻能達到MappedFile.fileSize-8,因為commitlog預留末尾8位元組作為commitlog結束。那麼是如何使剩餘空間不足夠寫入訊息而讓wrotePosition==fileSize呢?經過分析得知在寫入訊息方法org.apache.rocketmq.store.MappedFile.appendMessagesInner(MessageExt, AppendMessageCallback)內,追加訊息到commitlog後,執行this.wrotePosition.addAndGet(result.getWroteBytes());,這裡result.getWroteBytes()是AppendMessageResult.wroteBytes,該值是在AppendMessageResult構造器賦值,在org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, MessageExtBrokerInner)內END_OF_FILE這行,如果訊息長度+8位元組(commitlog檔案結束的保留位元組)>commitlog剩餘空間maxBlank,則構造AppendMessageResult物件,AppendMessageResult.wroteBytes=maxBlank,而maxBlank==MappedFile.fileSize-MappedFile.wrotePosition,那麼在commitlog剩餘空間不足寫入該條訊息的情況下,在返回AppendMessageResult物件END_OF_FILE錯誤後,寫位置變為MappedFile.wrotePosition+maxBlank=MappedFile.wrotePosition+MappedFile.fileSize-MappedFile.wrotePosition=MappedFile.fileSize,即在剩餘空間不足寫入該條訊息的時候,寫位置會被更新為檔案大小,表示檔案滿了,需要建立檔案。
### 4.2.訊息刷盤
訊息寫入到pagechace/堆外記憶體後,執行刷盤handleDiskFlush(訊息儲存到磁碟),分為同步刷盤和非同步刷盤,入口方法如下
```java
//org.apache.rocketmq.store.CommitLog.handleDiskFlush(AppendMessageResult, PutMessageResult, MessageExt)
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
//同步刷寫,這裡有兩種配置,是否一定要收到儲存MSG資訊,才返回,預設為true
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//預設是true, 程式碼@1
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());//程式碼@2 result.getWroteOffset() + result.getWroteBytes()即為MappedFile寫入訊息後的MappedFile.wrotePosition+MappedFile.fileFromOffset
service.putRequest(request);//程式碼@3
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());程式碼@4
if (!flushOK) {//即GroupCommitRequest.flushOK為false,表示刷盤超時,那麼設定返回結果為超時
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {//程式碼@5 通常不走該分支,既然是同步刷盤,那麼為了保證100%的訊息不丟失,只能是訊息寫入到磁碟後才不會丟失,那麼就只能同步等待訊息寫入到磁碟後才能返回。
service.wakeup();//喚醒同步刷盤執行緒進行刷屏。
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//未開啟transientStorePoolEnable=true且非同步刷盤
flushCommitLogService.wakeup();//程式碼@6 非同步刷盤
} else {//開啟了transientStorePoolEnable=true且非同步刷盤
commitLogService.wakeup();//程式碼@7 喚醒把訊息寫入pagecache的執行緒
}
}
}
```
這裡解釋下為什麼程式碼@1處預設是true,因為通常producer建立訊息採用的構造器是
```java
//producer構造Message通常使用下面三個構造器
Message(String topic, byte[] body)
Message(String topic, String tags, byte[] body)
Message(String topic, String tags, String keys, byte[] body)
//對於這三個構造器,均設定 this.setWaitStoreMsgOK(true),即屬性PROPERTY_WAIT_STORE_MSG_OK均設定為true
//基本不使用的構造器
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)
//該構造器可以指定PROPERTY_WAIT_STORE_MSG_OK屬性為false
```
因此預設通常PROPERTY_WAIT_STORE_MSG_OK是為true的,表示的含義是等待收到訊息儲存成功。可否使用第四個構造器設定waitStoreMsgOK=false呢?也是可以的,但是這樣的會在handleDiskFlush內同步刷盤就走到了程式碼@5分支,這就無法保證同步刷盤的100%訊息落地,而且這樣的效率不如非同步刷盤,為什麼不使用非同步刷盤呢,因此程式碼@5分支實際沒人使用。
this.flushCommitLogServic是個刷盤服務類FlushCommitLogService的具體子類,是個runnable物件,根據不同的刷盤方式賦值為不同的物件,同步刷盤為GroupCommitService,非同步刷盤為FlushRealTimeService,在broker啟動中org.apache.rocketmq.store.CommitLog.start()啟動該服務執行緒的,呼叫關係如下
![20191113230702463](https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220123905.png)
#### 4.2.1.同步刷盤
在handleDiskFlush內程式碼解釋
程式碼@2,result.getWroteOffset() + result.getWroteBytes()即為MappedFile寫入訊息後的MappedFile.wrotePosition+MappedFile.fileFromOffset,即為整個commitlog物件(也為MappedFileQueue,因為代表MappedFile集合)上的位置,MappedFileQueue.flushedWhere到該位置之間的資料就是需要重新整理到磁碟的。
程式碼@3,吧GroupCommitRequest儲存到GroupCommitService.requestsWrite集合,並喚醒阻塞的GroupCommitService服務執行緒(同步刷盤執行緒)
```java
public synchronized void putRequest(final GroupCommitRequest request) {//handleDiskFlush()操作執行
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify 喚醒阻塞的GroupCommitService服務執行緒
}
}
```
程式碼@4,就是同步等待資料落盤,阻塞操作。
具體同步刷盤方式是GroupCommitService執行緒不停的輪詢,並提交
```java
//org.apache.rocketmq.store.CommitLog.GroupCommitService.run()
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
//省略其他程式碼
}
```
接著看GroupCommitService.waitForRunning(long)方法
```java
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
@Override
protected void onWaitEnd() {
this.swapRequests();
}
private void swapRequests() {//交換兩個集合資料,這樣在handleDiskFlush新增到requestsWrite的GroupCommitRequest物件就被交換到了requestsRead
List tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
```
在同步刷盤服務執行緒不停輪詢的過程中,每次把handleDiskFlush內新增到requestsWrite的GroupCommitRequest物件交換到了requestsRead,這個設計不錯,進行了讀寫分離,但是為什麼要設計的這麼麻煩呢?直接在handleDiskFlush內把GroupCommitRequest物件提交到GroupCommitService的一個阻塞佇列不行麼?難道是效率什麼的考慮,
接著執行刷盤操作doCommit,程式碼如下,註釋很清楚了,就不寫說明了。
```java
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {//this.requestsRead雖然是集合,但是實際有且只會有一個元素,定為集合是為了考慮將來擴充套件吧
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {//迴圈第一次,flushOK為false,那麼進行刷盤,接著迴圈第二次的時候,flushOK被置為true,退出for迴圈
//如果commitlog上次刷盤點MappedFileQueue.flushedWhere>=本次待刷盤位置,則不進行刷盤,接著退出for迴圈
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();//for迴圈第一次為false,第二次為true
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);//執行刷盤操作,把訊息從pagecache寫入到磁碟儲存
}
}
req.wakeupCustomer(flushOK);//設定GroupCommitRequest.flushOK為true,表示刷盤成功,該值在handleDiskFlush內用作判斷在等待時間內刷盤是否成功。喚醒使用者執行緒,即喚醒handleDiskFlush的程式碼@3
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//每次刷盤後把刷盤時間戳儲存到StoreCheckpoint.physicMsgTimestamp,以供broker異常關閉後啟動恢復刷盤位置,這裡和broker啟動進行recover操作對應
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();//清空
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);//這裡對應著handleDiskFlush的程式碼@5,直接就刷盤
}
}
}
```
#### 4.2.2.非同步刷盤
4.2.2.1.非同步刷盤未開啟transientStorePoolEnable=true的情況
handleDiskFlush的程式碼@6 處是未開啟transientStorePoolEnable=true且非同步刷盤的情況,執行FlushRealTimeService.wakeup(),喚醒非同步刷盤服務執行緒FlushRealTimeService
非同步刷盤服務執行緒的執行
```java
//org.apache.rocketmq.store.CommitLog.FlushRealTimeService.run()
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();//false
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();//非同步刷盤間隔時間500ms
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();//預設作業系統提交頁數 4
//省略不重要程式碼
try {
//省略不重要程式碼
long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);//把資料重新整理到磁碟
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//每次刷盤後把刷盤時間戳儲存到StoreCheckpoint.physicMsgTimestamp,以供broker異常關閉後啟動恢復刷盤位置,這裡和broker啟動進行recover操作對應
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}//while end
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
//如果broker被關閉,則執行到這裡。由於是非同步刷盤,此時磁碟資料落後commitlog,那麼盡力吧commitlog資料重新整理到磁碟
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
//省略不重要程式碼
CommitLog.log.info(this.getServiceName() + " service end");
}
```
非同步刷盤較同步刷盤邏輯簡單,僅僅是刷盤而已,沒有那麼多的控制。
4.2.2.2.非同步刷盤開啟transientStorePoolEnable=true的情況
handleDiskFlush的程式碼@7 處是開啟transientStorePoolEnable=true且非同步刷盤的情況,執行CommitRealTimeService.wakeup(),喚醒非同步刷盤服務執行緒CommitRealTimeService,這個和FlushRealTimeService不同
具體執行看程式碼和其中註釋
```java
//org.apache.rocketmq.store.CommitLog.CommitRealTimeService.run()
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();//刷盤頻率 200ms
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();//預設作業系統頁 4
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();//最大提交資料間隔時間 200ms,設定該值可以提高broker效能
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;//如果上次提交資料的時間戳+commitDataThoroughInterval<=當前時間,說明訊息過少,不滿足4頁的提交,在達到最大提交時間後,強制不滿足4頁提交條件也提交資料到pagecache
}
try {
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);//把資料由堆外記憶體MappedFile.writeBuffer提交到pagecache即MappedF