一個商業化訊息佇列的效能好壞,其檔案儲存機制設計是衡量一個訊息佇列服務技術水平和最關鍵指標之一。

開頭問題

kafka檔案結構和rocketMQ檔案結構是什麼樣子?特點是什麼?

一、目錄結構

Kafka

Kafka以partition為單元分片儲存訊息

Kafka部分名詞解釋如下:

  • Broker:訊息中介軟體處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
  • Topic:一類訊息
  • Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列。
  • Segment:partition物理上由多個segment組成
  • offset:每個partition都由一系列有序的、不可變的訊息組成,這些訊息被連續的追加到partition中。partition中的每個訊息都有一個連續的序列號叫做offset,用於partition唯一標識一條訊息.

partition(分片目錄)

為方便理解以單broker為例,假設建立一個broker建立的topic是kafka-topic-01,partition數量是3, 會形成以下目錄

#1、分割槽目錄檔案
drwxr-x--- 2 root root 4096 Jul 26 19:35 kafka-topic-01-0
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-1
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-2
  • 名稱topic+有序序號
  • 一個partition(目錄)中的資料被切分為多個大小相等的segment(段)資料檔案中
  • partition中訊息只能順序寫讀

segment(分段訊息)

分為三個檔案

  • 索引檔案.index
  • 日誌檔案.log
  • 時間戳索引檔案.timeindex
#2、分割槽目錄中的日誌資料檔案和日誌索引檔案
-rw-r----- 1 root root 512K Jul 24 19:51 00000000000000000000.index
-rw-r----- 1 root root 1.0G Jul 24 19:51 00000000000000000000.log
-rw-r----- 1 root root 768K Jul 24 19:51 00000000000000000000.timeindex
-rw-r----- 1 root root 512K Jul 24 20:03 00000000000022372103.index
-rw-r----- 1 root root 1.0G Jul 24 20:03 00000000000022372103.log
-rw-r----- 1 root root 768K Jul 24 20:03 00000000000022372103.timeindex
-rw-r----- 1 root root 512K Jul 24 20:15 00000000000044744987.index
-rw-r----- 1 root root 1.0G Jul 24 20:15 00000000000044744987.log
-rw-r----- 1 root root 767K Jul 24 20:15 00000000000044744987.timeindex
-rw-r----- 1 root root 10M Jul 24 20:21 00000000000067117761.index
-rw-r----- 1 root root 511M Jul 24 20:21 00000000000067117761.log
-rw-r----- 1 root root 10M Jul 24 20:21 00000000000067117761.timeindex
  • segment檔案以偏移量命名,數值最大64位long型別

segment內部-index檔案

  • 索引檔案採用稀疏索引(即有的訊息不能找到對應的索引),目的是節省儲存空間
  • 定長,佔8個位元組

訊息單元的儲存結構

欄位名

說明

relativeOffset(4)

相對偏移量,相對baseOffset來說

position(4)

實體地址,日誌檔案中的實體地址

如何查詢訊息

如offset的值是368772

1.根據offset找到所在的segment,根據二分查詢,找到訊息所在的log檔案0000000000000368769.log和索引檔案0000000000000368769.index

2.計算下差368772-368769=3,在索引檔案中也是二分查詢,定位到是<3,497>記錄,即對應的物理位置是497,從而找到訊息

3.根據物理位置497在0000000000000368769.log檔案找到訊息。

segment內部-timeIndex檔案

根據指定的時間戳查詢偏移量資訊

  • 檔名:以時間戳命名
  • 定長,12個位元組
  • 時間戳只能遞增,追加的時間戳小於之前的時間戳,不予新增

欄位名

說明

timestamp(8)

當前日誌分段最大時間戳

relativeOffset(4)

時間戳對應的相對偏移量

segment內部-log檔案

RocketMQ

rocketMQ把所有topic中的訊息都commitLog中

儲存的檔案主要分為:

  • commitlog: 儲存訊息實體
  • consumequeue: 按Topic和佇列儲存訊息的offset
  • index: index按key、tag、時間等儲存

commitlog(物理佇列)

檔案地址:${user.home} \store\${commitlog}${fileName}

  • 存放該broke所有topic的訊息
  • 預設1G大小
  • 以偏移量為檔名,當一個檔案寫滿時則建立新檔案,這樣的設計主要是方便根據訊息的物理偏移量,快速定位到訊息所在的物理檔案
  • 一個訊息儲存單元是不定長的
  • 順序寫但是隨機讀

consumeQueue(消費佇列)

檔案地址:${storeRoot}\consumequeue\${topicName}\${queueId}\${fileName}

  • 檔名:跟commitlog一樣以偏移量作為檔名
  • 按topic和queueId緯度分別儲存訊息commitLogOffset、size、tagHashCode
  • 一個儲存單元是20個位元組的定長的
  • 順序讀順序寫

訊息單元的儲存結構

欄位名

說明

offset(8)

commitlog的偏移量

size(4)

commitlog訊息大小

tagHashCode

tag的雜湊值

indexFile(索引檔案)

檔案地址:${user.home}\store\index\${fileName}

  • 以時間作為檔名
  • 一個儲存單元是20個位元組定長的
  • 一個indexFile最多儲存2000w條訊息

索引檔案(Index)提供訊息檢索的能力,主要在問題排查和資料統計等場景應用

如何查詢訊息

  1. 消費者順序讀取consumerQueue,獲取到物理offset,根據物理offset去commitlog檔案中隨機讀取訊息實體

二、如何儲存訊息消費進度

Kafka

方式一:zookeeper儲存

0.9之前老版本

消費者如果是根據javaapi來消費,也就是【kafka.javaapi.consumer.ConsumerConnector】,通過配置引數【zookeeper.connect】來消費。這種情況下,消費者的offset會更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目錄下,例如:

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

儲存方式:

consumer在從broker讀取訊息後,可以選擇commit,該操作會在Zookeeper中存下該consumer在該partition下讀取的訊息的offset,該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同

方式二:broker儲存

broker 存放 offset 是 kafka 從 0.9 版本開始

儲存位置:

consumer 預設將 offset 持久化儲存在 Kafka 一個內建的 topic 中,該 topic 為__consumer_offsets。

提交offset分為:自動提交和手動提交

儲存方式:

消費者正常執行,除了持久化一份消費offset到broker中,還會在記憶體中儲存一份消費進度offset,所以當消費者都正常執行時__consumer_offsets使用的比較少。當消費者崩潰或者balance時,會從broker中拉取最後一次消費offset。

RocketMQ

方式一:叢集模式

叢集模式:topic中的一條訊息只會同一個消費者組中的一個消費者消費,不會被多個消費者消費

對offset的管理分為本地模式和遠端模式。本地模式是以文字檔案的形式儲存在客戶端,而遠端模式是將資料儲存到broker端,對應的資料結構分別為LocalFileOffsetStore和RemoteBrokerOffsetStore。

叢集模式使用的是遠端模式。

儲存位置:

ocketMQ的broker端中,offset的是以json的形式持久化到磁碟檔案中,檔案路徑為${user.home}/store/config/consumerOffset.json

{
"offsetTable": {
"topic-name@consumer-group": {
"0": 88526,
"1": 88528
}
}
}

儲存方式:

定時持久化到broker磁碟ConsumerOffset.json

consumer從broker拉取訊息後,Broker更新消費進度,僅僅是更新了記憶體中的offsetTable表,並沒有涉及到ConsumerOffset.json這個檔案。broker啟動時會啟動一個定時任務(預設5秒),來定時把消費offset持久化到磁碟consumerOffset.json,儲存的過程是先將原來的檔案存到ConsumerOffset.json.bak檔案中,然後將新的內容存入ConsumerOffset.json檔案

方式二:廣播模式

廣播模式:一條訊息會被每個消費者消費

當消費模式為廣播模式時,offset使用本地模式儲存,因為每條訊息會被所有的消費者消費,每個消費者管理自己的消費進度,各個消費者之間不存在消費進度的交集。

三、特點

Kafka

為什麼要設計成partition中多segment

  • 一個就是上面提到的如果使用單個 Partition 來管理資料,順序往 Partition 中累加寫勢必會造成單個 Partition 檔案過大,讀訊息是順序讀的(呼叫FileMessageSet的searchFor方法),檔案過大,查詢效率下降
  • 另一個原因是 Kafka 訊息記錄不是一直堆堆堆,預設是有日誌清除策略的。要麼是日誌超過設定的儲存時間觸發清理邏輯,要麼就是 Topic 日誌檔案超過閾值觸發清除邏輯,如果是一個大檔案刪除是要鎖檔案的這時候寫操作就不能進行。因此設定分段儲存對於清除策略來說也會變得更加簡單,只需刪除較早的日誌塊即可

清理資料功能-日誌清理

  1. 基於時間

日誌刪除任務會定時(預設5分鐘執行一次)檢查是否有保留時間超過設定閾值(預設儲存7天)可刪除的segment檔案。

  1. 基於日誌大小

日誌刪除任務會檢查當前日誌的大小是否超過設定的閾值retentionSize來尋找可刪除的日誌分段的檔案集合deletableSegments,參考下圖所示

基於日誌大小的保留策略與基於時間的保留策略類似,其首先計算日誌檔案的總大小size和retentionSize的差值diff,即計算需要刪除的日誌總大小,然後從日誌檔案中的第一個日誌分段開始進行查詢可刪除的日誌分段的檔案集合deletableSegments。查找出deletableSegments之後就執行刪除操作

基於日誌起始偏移量

該刪除策略具體是刪除某日誌分段的下一個日誌分段的baseOffset小於等於logStartOffset的部分。

壓縮資料

Producer 端壓縮、Broker 端保持、Consumer 端解壓縮

在Kafka中,壓縮可能發生在兩個地方:生產者端和Broker端。broker端儲存的也是壓縮的訊息,傳輸到consumer端再進行解壓縮

在吞吐量方面:LZ4 > Snappy > zstd / GZIP

RocketMQ

RocketMQ的CommitLog檔案採用混合型儲存

即所有的Topic下的訊息佇列共用同一個CommitLog的日誌資料檔案。感覺這樣會增加隨機讀的概率,可以學著kakfa按topic隔離。

預載入MappedFile檔案

訊息寫入時,每次都回去去mappedFileQueue中去拿mappedfile。而這個mappedfile是由後臺執行的AllocateMappedFileService服務執行緒去建立和預分配的。這樣下次獲取時候直接返回就可以不用等待MappedFile建立分配所產生的時間延遲

檔案預熱

我們拿到mmapedfile檔案,可能pagecache中還是出現頁資料不存在的情況,所以rocketmq增加了預熱

有一個warmMappedFile方法,它會把當前對映的檔案,每一頁遍歷多去,寫入一個 0 位元組,然後再呼叫mlock 和 madvise(MADV_WILLNEED)。

mlock:可以將程序使用的部分或者全部的地址空間鎖定在實體記憶體中,防止其被交換到 swap 空間。

madvise:給作業系統建議,說這檔案在不久的將來要訪問的,因此,提前讀幾頁可能是個好主意

四、讀寫方式

通過哪些I/O機制來訪問index和segment檔案呢?可以分為寫和讀兩塊:

Kafka

寫(生產)訊息:

  • index檔案較小,可以直接用mmap進行記憶體對映
  • segment檔案較大,可以採用普通的write(FileChannel.write),由於是順序寫PageCache,可以達到很高的效能

讀(消費)訊息:

  • index檔案仍然通過mmap讀,缺頁中斷的可能性較小
  • segment可以使用sendfile進行零拷貝的傳送給消費者,達到非常高的效能

RocketMQ

寫(生產)訊息:

  • CommitLog、ConsumerQueue都使用MMAP進行寫

讀(消費)訊息:

  • commitLog和consumerQueue檔案都是MMAP讀

五、儲存關鍵技術—Mmap、PageCache、sendfile

Mmap

普通讀檔案過程

大體流程如下:

  1. 程序使用系統呼叫向核心發起檔案讀取請求,此時會有使用者態轉為核心態的過程。
  2. 核心訪問檔案系統。
  1. 如果有 cache 直接返回資料,沒有開始讀取磁碟
  2. 讀取成功將 page1 讀取到 cache 中完成第一次 copy
  1. 通知核心讀取完畢(不同IO模型實現不同)
  2. 將資料從位於核心空間的 cache 拷貝到程序空間,完成第二次拷貝。

這裡簡單說一下為啥要拷貝到程序中:程序之間是相互隔離的,而且在常規操作下程序無法訪問核心資料,所以得將 cache 拷貝到程序當中,給程序使用。

  • 耗時主要集中在核心切換、copy時長

Mmap對映

沒有資料拷貝,對映的是資料地址

mmap 把檔案對映到使用者空間裡的虛擬記憶體,省去了從核心緩衝區複製到使用者空間的過程,檔案中的位置在虛擬記憶體中有了對應的地址,可以像操作記憶體一樣操作這個檔案,相當於已經把整個檔案放入記憶體。mmap 在完成了 read、write 相同效果的同時不僅省去了核心到程序的記憶體拷貝過程,而且還可以實現資料的共享操作:一個檔案可以同時被多個程序、核心對映,如果對映的檔案被核心或其他程序修改,那麼最終的結果也會反映到對映當中。

  • 對映的虛擬記憶體可以被多個程序讀寫
  • 少了每次核心態pageCache到程序私有記憶體的拷貝
  • 可以簡單看成直接操作pageCache

Mmap的限制

  • 對映檔案的大小有限制,一般最大1.5G~2G
  • MMAP 使用的是虛擬記憶體,和 PageCache 一樣是由作業系統來控制刷盤的,雖然可以通過 force() 來手動控制,小記憶體場景不適合。

OS的PageCache機制

PageCache是OS對檔案的快取,用於加速對檔案的讀寫。一般來說,程式對檔案進行順序讀寫的速度幾乎接近於記憶體的讀寫訪問,這裡的主要原因就是在於OS使用PageCache機制對讀寫訪問操作進行了效能優化,將一部分的記憶體用作PageCache

一、檔案讀取

如果一次讀取檔案時出現未命中(cache miss)PageCache的情況,OS從物理磁碟上訪問讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取(ps:順序讀入緊隨其後的少數幾個頁面)。這樣,只要下次訪問的檔案已經被載入至PageCache時,讀取操作的速度基本等於訪問記憶體

二、檔案寫入

OS會先寫入至Cache內,隨後通過非同步的方式由pdflush核心執行緒將Cache內的資料刷盤至物理磁碟上。對於檔案的順序讀寫操作來說,讀和寫的區域都在OS的PageCache內,此時讀寫效能接近於記憶體。不是順序寫,當pageCache中發現漏頁,還是會去吧磁碟中資料拉到pageCache再寫

sendfile

FileChannel#tranferTo transferFrom實現零拷貝

kafka消費的時候使用了零拷貝的sendfile。pagecache資料不經過核心切換直接拷貝到socket buffer。傳統的資料傳送需要傳送4次上下文切換,採用sendfile系統呼叫之後,資料直接在核心態交換,系統上下文切換減少 為2次。根據測試結果,可以提高60%的資料傳送效能。

六、參考

https://t1mek1ller.github.io/2019/11/13/kafka-rocketmq-storage/

https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html

swap 空間: swap space是磁碟上的一塊區域,可以是一個分割槽,也可以是一個檔案,或者是他們的組合。當RAM滿了後,並且需要更多記憶體空間時,使用磁碟空間代替RAM空間