1. 程式人生 > >Kafka 如何做到 1 秒釋出百萬條訊息

Kafka 如何做到 1 秒釋出百萬條訊息

Kafka 是分散式釋出-訂閱訊息系統,是一個分散式的,可劃分的,冗餘備份的永續性的日誌服務。它主要用於處理活躍的流式資料。

現在被廣泛地應用於構建實時資料管道和流應用的場景中,具有橫向擴充套件,容錯,快等優點,並已經執行在眾多大中型公司的生產環境中,成功應用於大資料領域,本文分享一下我所瞭解的 Kafka。

Kafka 高吞吐率效能揭祕

Kafka 的第一個突出特定就是“快”,而且是那種變態的“快”,在普通廉價的虛擬機器器上,比如一般 SAS 盤做的虛擬機器上,據 LINDEDIN 統計,最新的資料是每天利用 Kafka 處理的訊息超過1萬億條,在峰值時每秒鐘會發布超過百萬條訊息,就算是在記憶體和 CPU 都不高的情況下,Kafka 的速度最高可以達到每秒十萬條資料,並且還能持久化儲存。

作為訊息佇列,要承接讀跟寫兩塊的功能,首先是寫,就是訊息日誌寫入 Kafka,那麼,Kafka 在“寫”上是怎麼做到寫變態快呢?

Kafka 讓程式碼飛起來之寫得快

首先,可以使用 Kafka 提供的生產端 API 釋出訊息到 1 個或多個 Topic(主題)的一個(保證資料的順序)或者多個分割槽(並行處理,但不一定保證資料順序)。Topic 可以簡單理解成一個數據類別,是用來區分不同資料的。

Kafka 維護一個 Topic 中的分割槽 log,以順序追加的方式向各個分割槽中寫入訊息,每個分割槽都是不可變的訊息佇列。分割槽中的訊息都是以 k-v 形式存在。

  • k 表示 offset,稱之為偏移量,一個 64 位整型的唯一標識,offset 代表了 Topic 分割槽中所有訊息流中該訊息的起始位元組位置。
  • v 就是實際的訊息內容,每個分割槽中的每個 offset 都是唯一存在的,所有分割槽的訊息都是一次寫入,在訊息未過期之前都可以調整 offset 來實現多次讀取。

 

以上提到 Kafka “快”的第一個因素:訊息順序寫入磁碟。

我們知道現在的磁碟大多數都還是機械結構(SSD不在討論的範圍內),如果將訊息以隨機寫的方式存入磁碟,就會按柱面、磁頭、扇區的方式進行(定址過程),緩慢的機械運動(相對記憶體)會消耗大量時間,導致磁碟的寫入速度只能達到記憶體寫入速度的幾百萬分之一,為了規避隨機寫帶來的時間消耗,KAFKA採取順序寫的方式儲存資料,如下圖所示:

 

 

 

 

 

新來的訊息只能追加到已有訊息的末尾,並且已經生產的訊息不支援隨機刪除以及隨機訪問,但是消費者可以通過重置 offset 的方式來訪問已經消費過的資料。

即使順序讀寫,過於頻繁的大量小 I/O 操作一樣會造成磁碟的瓶頸,所以 Kafka 在此處的處理是把這些訊息集合在一起批量傳送,這樣減少對磁碟 IO 的過度讀寫,而不是一次傳送單個訊息。

另一個是無效率的位元組複製,尤其是在負載比較高的情況下影響是顯著的。為了避免這種情況,Kafka 採用由 Producer,broker 和 consumer 共享的標準化二進位制訊息格式,這樣資料塊就可以在它們之間自由傳輸,無需轉換,降低了位元組複製的成本開銷。

同時,Kafka 採用了 MMAP(Memory Mapped Files,記憶體對映檔案)技術。很多現代作業系統都大量使用主存做磁碟快取,一個現代作業系統可以將記憶體中的所有剩餘空間用作磁碟快取,而當記憶體回收的時候幾乎沒有效能損失。

由於 Kafka 是基於 JVM 的,並且任何與 Java 記憶體使用打過交道的人都知道兩件事:

▪ 物件的記憶體開銷非常高,通常是實際要儲存資料大小的兩倍;

▪ 隨著資料的增加,java的垃圾收集也會越來越頻繁並且緩慢。

基於此,使用檔案系統,同時依賴頁面快取就比使用其他資料結構和維護記憶體快取更有吸引力:

▪ 不使用程序內快取,就騰出了記憶體空間,可以用來存放頁面快取的空間幾乎可以翻倍。

▪ 如果 Kafka 重啟,進行內快取就會丟失,但是使用作業系統的頁面快取依然可以繼續使用。

可能有人會問 Kafka 如此頻繁利用頁面快取,如果記憶體大小不夠了怎麼辦?

Kafka 會將資料寫入到持久化日誌中而不是重新整理到磁碟。實際上它只是轉移到了核心的頁面快取。

利用檔案系統並且依靠頁快取比維護一個記憶體快取或者其他結構要好,它可以直接利用作業系統的頁快取來實現檔案到實體記憶體的直接對映。完成對映之後對實體記憶體的操作在適當時候會被同步到硬碟上。

Kafka 讓程式碼飛起來之讀得快

Kafka 除了接收資料時寫得快,另外一個特點就是推送資料時發得快。

Kafka 這種訊息佇列在生產端和消費端分別採取的 push 和 pull 的方式,也就是你生產端可以認為 Kafka 是個無底洞,有多少資料可以使勁往裡面推送,消費端則是根據自己的消費能力,需要多少資料,你自己過來 Kafka 這裡拉取,Kafka 能保證只要這裡有資料,消費端需要多少,都儘可以自己過來拿。

▲零拷貝

具體到訊息的落地儲存,broker 維護的訊息日誌本身就是檔案的目錄,每個檔案都是二進位制儲存,生產者和消費者使用相同的格式來處理。維護這個公共的格式並允許優化最重要的操作:網路傳輸永續性日誌塊。 現代的 unix 作業系統提供一個優化的程式碼路徑,用於將資料從頁快取傳輸到 socket;在 Linux 中,是通過 sendfile 系統呼叫來完成的。Java 提供了訪問這個系統呼叫的方法:FileChannel.transferTo API。

要理解 senfile 的影響,重要的是要了解將資料從檔案傳輸到 socket 的公共資料路徑,如下圖所示,資料從磁碟傳輸到 socket 要經過以下幾個步驟:

▪ 作業系統將資料從磁碟讀入到核心空間的頁快取

▪ 應用程式將資料從核心空間讀入到使用者空間快取中

▪ 應用程式將資料寫回到核心空間到 socket 快取中

▪ 作業系統將資料從 socket 緩衝區複製到網絡卡緩衝區,以便將資料經網路發出

這裡有四次拷貝,兩次系統呼叫,這是非常低效的做法。如果使用 sendfile,只需要一次拷貝就行:允許作業系統將資料直接從頁快取傳送到網路上。所以在這個優化的路徑中,只有最後一步將資料拷貝到網絡卡快取中是需要的。

常規檔案傳輸和 zeroCopy 方式的效能對比:

 

 

 

 

假設一個 Topic 有多個消費者的情況, 並使用上面的零拷貝優化,資料被複制到頁快取中一次,並在每個消費上重複使用,而不是儲存在儲存器中,也不在每次讀取時複製到使用者空間。這使得以接近網路連線限制的速度消費訊息。

這種頁快取和 sendfile 組合,意味著 Kafka 叢集的消費者大多數都完全從快取消費訊息,而磁碟沒有任何讀取活動。

▲批量壓縮

在很多情況下,系統的瓶頸不是 CPU 或磁碟,而是網路頻寬,對於需要在廣域網上的資料中心之間傳送訊息的資料流水線尤其如此。所以資料壓縮就很重要。可以每個訊息都壓縮,但是壓縮率相對很低。所以 Kafka 使用了批量壓縮,即將多個訊息一起壓縮而不是單個訊息壓縮。

Kafka 允許使用遞迴的訊息集合,批量的訊息可以通過壓縮的形式傳輸並且在日誌中也可以保持壓縮格式,直到被消費者解壓縮。

Kafka 支援 Gzip 和 Snappy 壓縮協議。

Kafka 資料可靠性深度解讀

 

 

 

 

 

Kafka 的訊息儲存在 Topic 中,Topic 可分為多個分割槽,為保證資料的安全性,每個分割槽又有多個 Replia。

▪ 多分割槽的設計的特點:

  1. 為了併發讀寫,加快讀寫速度;
  2. 是利用多分割槽的儲存,利於資料的均衡;
  3. 是為了加快資料的恢復速率,一但某臺機器掛了,整個叢集只需要恢復一部分資料,可加快故障恢復的時間。

每個 Partition 分為多個 Segment,每個 Segment 有 .log 和 .index 兩個檔案,每個 log 檔案承載具體的資料,每條訊息都有一個遞增的 offset,Index 檔案是對 log 檔案的索引,Consumer 查詢 offset 時使用的是二分法根據檔名去定位到哪個 Segment,然後解析 msg,匹配到對應的 offset 的 msg。

<Partition recovery過程>

每個 Partition 會在磁碟記錄一個 RecoveryPoint,,記錄已經 flush 到磁碟的最大 offset。當 broker 失敗重啟時,會進行 loadLogs。首先會讀取該 Partition 的 RecoveryPoint,找到包含 RecoveryPoint 的 segment 及以後的 segment, 這些 segment 就是可能沒有完全 flush 到磁碟 segments。然後呼叫 segment 的 recover,重新讀取各個 segment 的 msg,並重建索引。每次重啟 Kafka 的 broker 時,都可以在輸出的日誌看到重建各個索引的過程。

< 資料同步>

Producer 和 Consumer 都只與 Leader 互動,每個 Follower 從 Leader 拉取資料進行同步。

 

 

 

 

如上圖所示,ISR 是所有不落後的 replica 集合,不落後有兩層含義:距離上次 FetchRequest 的時間不大於某一個值或落後的訊息數不大於某一個值,Leader失 敗後會從 ISR 中隨機選取一個 Follower 做 Leader,該過程對使用者是透明的。

當 Producer 向 Broker 傳送資料時,可以通過 request.required.acks 引數設定資料可靠性的級別。

此配置是表明當一次 Producer 請求被認為完成時的確認值。特別是,多少個其他 brokers 必須已經提交了資料到它們的 log 並且向它們的 Leader 確認了這些資訊。

▪典型的值:

0: 表示 Producer 從來不等待來自 broker 的確認資訊。這個選擇提供了最小的時延但同時風險最大(因為當server宕機時,資料將會丟失)。

1:表示獲得 Leader replica 已經接收了資料的確認資訊。這個選擇時延較小同時確保了 server 確認接收成功。

-1:Producer 會獲得所有同步 replicas 都收到資料的確認。同時時延最大,然而,這種方式並沒有完全消除丟失訊息的風險,因為同步 replicas 的數量可能是 1。如果你想確保某些 replicas 接收到資料,那麼你應該在 Topic-level 設定中選項 min.insync.replicas 設定一下。

僅設定 acks= -1 也不能保證資料不丟失,當 ISR 列表中只有 Leader 時,同樣有可能造成資料丟失。要保證資料不丟除了設定 acks=-1,還要保證 ISR 的大小大於等於2。

▪具體引數設定:

request.required.acks:設定為 -1 等待所有 ISR 列表中的 Replica 接收到訊息後採算寫成功。

min.insync.replicas:設定為 >=2,保證 ISR 中至少兩個 Replica。

Producer:要在吞吐率和資料可靠性之間做一個權衡。

Kafka 作為現代訊息中介軟體中的佼佼者,以其速度和高可靠性贏得了廣大市場和使用者青睞,其中的很多設計理念都是非常值得我們學習的,本文所介紹的也只是冰山一角,希望能夠對大家瞭解 Kafka 有一定的作用。

歡迎工作一到五年的Java工程師朋友們加入Java程式設計師開發: 854393687
群內提供免費的Java架構學習資料(裡面有高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!