
How RocketMQ decides flushOK, and the bug in version 4.2


Version: rocketmq-4.2.0

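How the bug shows up: with synchronous flushing (SYNC_FLUSH), the SendResult returned when producing messages sometimes carries the SendStatus FLUSH_DISK_TIMEOUT. It happens when the total volume of sent messages reaches roughly mapedFileSizeCommitLog (1 GB by default), and again each time the volume reaches roughly another mapedFileSizeCommitLog. At all other times this status never appears. Because the pattern repeats so consistently, something is clearly wrong.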

How does RocketMQ decide flushOK?

Flush call: CommitLog.this.mappedFileQueue.flush(0)

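Principle: whether the data has reached disk is decided by comparing two offsets:

- the flushed position [CommitLog.this.mappedFileQueue.getFlushedWhere()], which is where the next flush starts;
- the request's target offset [req.getNextOffset()], which is the end offset of the message the producer is waiting on.

The data a request waits on may span two mapped files (segments), so the loop runs at most twice.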
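The "at most twice" reasoning is easier to see with a small Java model of a commit log split into fixed-size files. This is a sketch, not RocketMQ code. The class SimMappedFileQueue and its methods are hypothetical. The model assumes two behaviors that we believe match 4.2:

- a message that does not fit in the current file is written at the start of the next file;
- each flush call only flushes the single mapped file that contains flushedWhere.

```java
/**
 * Hypothetical model of a commit log split into fixed-size mapped files.
 * NOT RocketMQ's MappedFileQueue. Assumptions:
 *  1. a message that does not fit in the current file is written at the start
 *     of the next file (the old file's tail is treated as padding);
 *  2. flush() only flushes the single file that contains flushedWhere.
 */
class SimMappedFileQueue {
    private final long fileSize;
    private long wrotePosition; // global offset just past the last written byte
    private long flushedWhere;  // global offset up to which data is on disk

    SimMappedFileQueue(long fileSize) {
        this.fileSize = fileSize;
    }

    /** Appends a message and returns its end offset (what a request would wait for). */
    long appendMessage(long length) {
        long usedInFile = wrotePosition % fileSize;
        if (usedInFile + length > fileSize) {
            // Not enough room: skip to the next file, the old tail becomes padding.
            wrotePosition += fileSize - usedInFile;
        }
        wrotePosition += length;
        return wrotePosition;
    }

    /** Flushes at most one file: stops at the end of the file containing flushedWhere. */
    void flush() {
        long fileEnd = (flushedWhere / fileSize + 1) * fileSize;
        flushedWhere = Math.min(wrotePosition, fileEnd);
    }

    long getFlushedWhere() {
        return flushedWhere;
    }
}
```

Take 100-byte files as an example. After a 60-byte message is written and flushed, a second 60-byte message lands at 100..160. The first flush() then stops at 100, and only a second call reaches 160.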

Code:

private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    // Note: flushOK is computed before flush(0); the effect of the
                    // flush in the last iteration is never re-checked.
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
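The loop ends with req.wakeupCustomer(flushOK). The Java sketch below illustrates the part that matters for the symptom: the storing thread simply returns whatever flag the flush thread passed in. The class, its fields and statusFor are hypothetical and only loosely modeled on GroupCommitRequest; they are not its actual source. As far as we understand the 4.2 broker, a false flag is reported as FLUSH_DISK_TIMEOUT even when no timeout actually elapsed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of the hand-off between the flush thread and
 * the thread storing the message. Not RocketMQ's GroupCommitRequest.
 */
class SimGroupCommitRequest {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    SimGroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    /** Called by the flush thread once it has decided the outcome. */
    void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        latch.countDown();
    }

    /** Called by the storing thread; returns the flag set by the flush thread. */
    boolean waitForFlush(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Hypothetical mapping to the SendStatus the producer would see. */
    static String statusFor(boolean flushOK) {
        return flushOK ? "SEND_OK" : "FLUSH_DISK_TIMEOUT";
    }
}
```

If wakeupCustomer(false) has already been called, waitForFlush returns immediately with false. The producer therefore sees a "timeout" that never actually waited.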

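First, the normal flow. The for loop is entered.

- First iteration: the flushed position [CommitLog.this.mappedFileQueue.getFlushedWhere()] is less than the target offset [req.getNextOffset()], so flushOK is false and a flush is performed.
- Second iteration: the flush succeeded, so getFlushedWhere() now equals req.getNextOffset(). flushOK becomes true and the loop ends.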
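The normal flow can be traced by copying the loop out of doCommit() and replacing the queue with two callbacks. The names NormalFlowDemo, buggyLoop and normalCase are illustrative, not RocketMQ code. The harness assumes that a single flush call writes everything up to nextOffset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical harness around the 4.2.0 loop shape, for one request. */
class NormalFlowDemo {
    /** Same loop shape as doCommit() in 4.2.0; records each iteration in trace. */
    static boolean buggyLoop(LongSupplier flushedWhere, Runnable flush,
                             long nextOffset, List<String> trace) {
        boolean flushOK = false;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flushOK = flushedWhere.getAsLong() >= nextOffset;
            trace.add("i=" + i + " flushedWhere=" + flushedWhere.getAsLong() + " flushOK=" + flushOK);
            if (!flushOK) {
                flush.run();
            }
        }
        return flushOK;
    }

    /** Normal case: a 30-byte message whose data all sits in the current file. */
    static List<String> normalCase() {
        long[] flushedWhere = {0};  // nothing flushed yet
        final long nextOffset = 30; // hypothetical end offset of the message
        List<String> trace = new ArrayList<>();
        // Assumption: one flush call writes everything up to nextOffset.
        boolean flushOK = buggyLoop(() -> flushedWhere[0],
                () -> flushedWhere[0] = nextOffset, nextOffset, trace);
        trace.add("result=" + flushOK);
        return trace;
    }
}
```

normalCase() produces three entries: "i=0 flushedWhere=0 flushOK=false", "i=1 flushedWhere=30 flushOK=true" and "result=true". This matches the two iterations described above.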

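Here is the bug. It appears when the pending data spans two mapped files, for example when the message was placed in a new file while the tail of the previous file had not been flushed yet.

1. First iteration: flushOK is false, so a flush is performed. That flush only reaches the end of the old file, so the flushed position [getFlushedWhere()] is still less than [req.getNextOffset()].
2. Second iteration: flushOK is still false, so a second flush is performed, and it succeeds.
3. The loop then ends (i reaches 2) without evaluating the comparison again.

As a result, flushOK is still false. req.wakeupCustomer(false) is called even though the data is on disk, and the producer receives FLUSH_DISK_TIMEOUT.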
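The failing case can be reproduced with a hypothetical two-file model. BoundaryBugDemo is illustrative, not RocketMQ code. The model assumes:

- 100-byte files;
- bytes [0, 60) are already flushed;
- the pending message was placed at 100..160 in the next file;
- flush() only reaches the end of the file containing flushedWhere.

```java
/**
 * Hypothetical two-file scenario (not RocketMQ code). The old file's tail
 * (60..100) is padding; the pending message occupies 100..160 in the next file.
 */
class BoundaryBugDemo {
    static final long FILE_SIZE = 100;

    long written = 160;     // data written so far, including the padding
    long flushedWhere = 60; // already on disk
    int flushCalls = 0;

    /** Flushes only up to the end of the file that contains flushedWhere. */
    void flush() {
        flushCalls++;
        long fileEnd = (flushedWhere / FILE_SIZE + 1) * FILE_SIZE;
        flushedWhere = Math.min(written, fileEnd);
    }

    /** Same loop shape as doCommit() in 4.2.0. */
    boolean buggyLoop(long nextOffset) {
        boolean flushOK = false;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flushOK = flushedWhere >= nextOffset;
            if (!flushOK) {
                flush();
            }
        }
        return flushOK;
    }
}
```

Both flush() calls do their job and flushedWhere ends at 160. Even so, buggyLoop(160) returns false, because the comparison after the second flush never runs.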

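Fix: after the for loop, check flushOK once more (re-evaluate getFlushedWhere() >= req.getNextOffset()) before calling req.wakeupCustomer(flushOK).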
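Below is a minimal Java sketch of that fix on a hypothetical model: 100-byte files, and a flush() that stops at the end of the file containing flushedWhere. FixedFlushLoopDemo and loopWithRecheck are illustrative names, and this is not the upstream patch.

```java
/** Hypothetical model used to check the "re-check after the loop" fix. */
class FixedFlushLoopDemo {
    static final long FILE_SIZE = 100; // hypothetical file size

    private final long written; // end of written data
    private long flushedWhere;  // offset already on disk

    FixedFlushLoopDemo(long written, long flushedWhere) {
        this.written = written;
        this.flushedWhere = flushedWhere;
    }

    /** Flushes only up to the end of the file that contains flushedWhere. */
    void flush() {
        long fileEnd = (flushedWhere / FILE_SIZE + 1) * FILE_SIZE;
        flushedWhere = Math.min(written, fileEnd);
    }

    /** The 4.2.0 loop plus one extra check after it. */
    boolean loopWithRecheck(long nextOffset) {
        boolean flushOK = false;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flushOK = flushedWhere >= nextOffset;
            if (!flushOK) {
                flush();
            }
        }
        // The fix: re-evaluate so the result of the last flush() is not lost.
        if (!flushOK) {
            flushOK = flushedWhere >= nextOffset;
        }
        return flushOK;
    }
}
```

Two cases show the fix works:

- Cross-file case, new FixedFlushLoopDemo(160, 60).loopWithRecheck(160): now returns true.
- Normal case, new FixedFlushLoopDemo(30, 0).loopWithRecheck(30): still returns true after a single flush.

An alternative with the same outcome (our suggestion, not taken from the post) restructures the loop instead: compute flushOK once before the loop and re-evaluate it right after each flush(0).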