1. 程式人生 > >Kafka教程(三)---------------底層實現細節之broker

Kafka教程(三)---------------底層實現細節之broker

目錄

一、資料儲存

我們從上面可以看到,broker中儲存的是各個topic的各個partition中的資料

1.資料目錄

每個broker都是有自己 存訊息資料的目錄(參考第一節的配置)

比如下圖是某個broker的資料目錄,裡面是子目錄,子目錄名字為topicname-partition,儲存的內容就是這個topic的這個partition裡面的message資料
這裡寫圖片描述

2.資料檔案

這裡面存的是訊息資料,在這個partition內部,資料的進入是有序的,用一個線性的offset(偏移量)來標識每條資料的位置,相當於排隊的號碼牌,每條訊息(message)有一個它的offset,這個offset不是整個topic全域性的,是這個topic的這個partition裡面”號碼牌”,它可以唯一確定每條訊息在parition(分割槽)內

的位置。

Message(訊息資料)的結構如下

如果一直往一個log檔案中寫,那麼這個檔案會不斷膨脹,而且一個檔案也不利於管理。所以寫滿一個後,開啟新的檔案寫入。
這個“寫滿”的標準:log.segment.bytes,由這個引數控制。比如這個值是1G,那麼當log檔案大小達到1G之後,就會開啟一個新的log檔案去寫入資料,新進來的資料寫入新的log檔案中。
每一個log檔案就是一個segment(分段的概念),log檔案的命名是其起始offset
如下圖就有四個segment(四個log檔案)

除了有log檔案,還有index檔案。他們成對出現,index檔案是為了資料查詢。

3.資料查詢

大多數場景下,我們消費kafka中的資料都是順序讀取下來的。但存在一些場景是需要隨機讀取某個offset的訊息:比如kafka消費者(下一章講解)處理訊息時程式失敗,重新處理,需要重新從該partition的某個offset資料續讀資料。
那麼如何快速定位某個partition的某個offet的訊息資料:
步驟1:根據offset定位在哪一個log檔案
查詢檔案列表,根據log的檔名稱和offset的比較就可以定位到具體log檔案(二分查詢,非常快)。比如查詢這個partition的offet為368784的訊息,那麼一定是落在0000000368769.log檔案中的
步驟2

:根據此log檔案對應的索引檔案(.index檔案)進一步定位其物理位置
在topicname-partition目錄內部,有一系列資料檔案,是Log與index檔案對,如上圖
log檔案儲存的是訊息資料資訊,index是索引資訊。這樣儲存的目的就是為了方便查詢指定的offset的資料,index中儲存的是相對offsetposition
根據在index中查詢到該offset所在的位置,直接移動檔案指標即可訪問到資料。

例:我們查詢0000000368769
索引檔案是稀疏索引,沒有記錄每條message的索引,只記錄了一部分。現在要找368784,已經判斷出來落在0000000000368769.log檔案中了,此時的相對offset為368784-368769=15,在index檔案中查詢,發現落在13和17之間,那麼我們找到13對應的position3,也就是檔案的中的位置,直接定位到offet 368782 的資料,再往下就可以很快找到偏移量為368784的資料

注意:
相對offset:沒有在index中儲存絕對offset,而是儲存的相對offset,可以節省空間
position:表示該條Message在資料檔案中的絕對位置。只要開啟檔案並移動檔案指標到這個position就可以讀取對應的Message了

二、 資料快取

如果每次有訊息進入,就往log檔案中寫入,那麼就會造成大量的磁碟隨機寫入。磁碟效能低,上層應用效能也被拉低。所以引入資料快取。

1.快取的好處

通過快取是寫入到記憶體中,所以應用的寫入速度會非常快
其次,快取是統一寫入磁碟,可以合併很多訊息,可以達到順序寫入的效果,磁碟的順序寫效率非常高。磁碟順序讀寫和隨機讀寫的差別巨大:Sequence IO 600M/s Random IO 100k/s。所以需要將要寫入的資料先快取起來統一寫入,從而提升寫入效率

2.快取方案

快取可以由服務程序自己維護,也就是將資料快取到JVM內部。但這樣就需要建立Java物件,則會佔程序大量記憶體。快取存入磁碟後快取的Java物件需要刪除,又需要JVM進行垃圾回收,GC過程又影響效能
所以Kafka採取作業系統級別快取。資料快取不再佔用應用自身的資源,而是直接交由作業系統來完成。也就是PageCache。
而且如果Kafka服務重啟,程序中快取的資料會失效,而OS管理的PageCache依然可用

3.pageCache原理

作業系統將閒置的memory用作disk caching。當資料寫入時,作業系統將資料寫入pageCache,同時標記該page為dirty;當讀取資料時,先從pageCache中查詢,如果沒有查到(發生缺頁)則去磁碟中讀取,返回資料。
所以本質上pageCache就是儘量把空閒的記憶體用作磁碟快取。這個快取是作業系統級別的,現代OS基本上都支援PageCache

注意:目前筆者認為,這個資料交由作業系統層後,對於上層應用Kafka服務程序來說,就認為已經儲存了,至於作業系統層是否持久化到磁碟,對於kafka服務透明,不影響上層結果。

4.快取失效問題

由於資料在記憶體,那麼依然存在系統down機記憶體資料丟失的風險。
而對於上層Kafka程序來說,既然認為資料已經儲存了(commit),那麼如果系統快取失效就導致kafka的commit的資料丟失了。
所以就有引數可以控制log.flush.interval.messages和log.flush.interval.ms,也就是達到一定訊息條數或者一定時間,就強制寫入到磁碟。但Kafka官方並不建議通過Broker端的來強制寫盤,認為資料的可靠性應該通過Replica來保證(在後面將詳細講解,機器A down掉之後,commit的資料在機器B上是還有一份的)。強制Flush資料到磁碟會對整體效能產生影響

所以說kafka保證它存在於多個replica記憶體中,不保證被持久化到磁碟。

5.pageCache優化引數

可以通過調整系統引數/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio來調優效能。
髒頁率超過第一個指標會啟動pdflush開始Flush Dirty PageCache。
髒頁率超過第二個指標會阻塞所有的寫操作來進行Flush。
根據不同的業務需求可以適當的降低dirty_background_ratio和提高dirty_ratio。

三、 資料備份

在第一章就講到replication,比如建立時設定–replication-factor 2。

1.容錯方案

資料備份的意義就在於:在機器或者服務程序出現問題時,kafka叢集依然能對外提供服務。
比如上例中,現在partition0在broker1中,partition1在broker2中,partition2在broker3中; 如果broker2機器down掉,則變成如下:

Broker3上的partition1 backup就會自動頂上提供服務(嗯,奏四這麼智慧),那麼broker3就成為partition1的leader。

什麼是leader:也就是正在提供這個partition資料服務的節點
Replica:所有擁有這個partition資料的節點。其中一個作為leader向外提供服務,其他作為follower跟著leader的領導。Leader掛掉之後,follower會選舉一個出來作為新leader。
Isr:In-Sync Replicas(在同步中的replica)代表活著並且能跟上的replica(什麼叫能跟上,下一節介紹)

例如
比如有四臺機器,replica為2,partition6,則如下
Partition 0 leader 1 replica 1,2 isr 1,2
Partition 1 leader 2 replica 2,3 isr 2,3
Partition 3 leader 3 replica 3,4 isr 3,4
Partition 4 leader 4 replica 4,5 isr 4,5
Partition 5 leader 5 replica 5,6 isr 5,6
Partition 6 leader6 replica 6,1 isr 6,1

· 掛一臺機器
比如此時1號機器down了,broker會自己調整leader,進行leader的轉移
Partition 0 leader 2 replica 1,2 isr 2
Partition 1 leader 2 replica 2,3 isr 2,3
Partition 3 leader 3 replica 3,4 isr 3,4
Partition 4 leader 4 replica 4,5 isr 4,5
Partition 5 leader 5 replica 5,6 isr 5,6
Partition 6 leader 6 replica 6,1 isr 6

· 掛兩臺機器
在上面的基礎上
如果此時又2掛了,那麼partition0的資料就不可用了,叢集丟資料了
如果此時又6掛了,那麼partition6的資料就不可用了,叢集丟資料了
(資料是不可能轉移的)
但如果是3/4/5掛了,整個叢集還可以堅持,還能對外提供完整的6個partition的資料服務
比如5掛了
Partition 0 leader 2 replica 1,2 isr 2
Partition 1 leader 2 replica 2,3 isr 2,3
Partition 3 leader 3 replica 3,4 isr 3,4
Partition 4 leader 4 replica 4,5 isr 4
Partition 5 leader 6 replica 5,6 isr 6
Partition 6 leader 6 replica 6,1 isr 6

· 掛三臺機器
現在還剩2,3,4,6.
如果3掛了,還能對外提供完整服務。
如果其他節點掛了,叢集將丟資料。

2.資料同步過程

我們上面說了,leader對外提供服務,leader負責這個partition所有資料的讀寫操作。(leader是partition級別的,不是topic級別的)
其他replica節點叫做follower,那他們的首要任務就是需要同步leader的資料到自己這裡

當有新資料寫入到leader中時,其他follower會從leader那pull(拉取)資料,並且傳送ack(acknowledge)告訴leader自己同步到哪個offset了,如果能和leader保持一致,就可以認為它能跟上節奏。
例如上圖,follower1,follower3都會不斷pull資訊,傳送ack回覆leader,但follower2因為jvm在GC而導致程序僵死,可能就不會拉取新的資料,導致跟不上了,延遲條數太多(或者延遲時間太多),最終會被踢出ISR中。此時ISR中只有follower1和follower3。但follower2依然是replica的成員,所以它醒了之後還會默默pull資訊,如果能追趕上,則又可以加入到ISR中。

可以catch up(跟上節奏)的節點就是在同步中(ISR)的,跟不上(延遲時間過長或者延遲條數過多)的就會被踢出ISR中。
這裡寫圖片描述