1. 程式人生 > >《Kafka官方文件》設計

《Kafka官方文件》設計

原文連結 譯者:BlackMamba

Design

1. Motivation

我們設計Kafka用來作為統一的平臺來處理大公司可能擁有的所有實時資料來源。為了做到這點,我們必須思考大量的使用場景。

它必須有高吞吐去支援大資料流,例如實時日誌聚合。

它必須優雅的處理資料積壓,以支援定期從離線系統載入資料。

這也以為這系統必須支援低延遲的分發來處理傳統訊息系統的場景。

我們想支援分割槽的、分散式的、實時的處理資料來源並建立新的資料來源,這推動了我們的分割槽和消費模型。

最後,將流反饋到其他系統進行服務的情況下,我們知道系統必須能夠保證容錯性,在部分機器故障的時候提供服務。

支援這些使用推動我們做了一些列特殊的元素設計,比起傳統的訊息系統更像是資料庫日誌。我們將在以下章節介紹一些設計要素。

2. Persistence

Don’t fear the filesystem!

Kafka強依賴檔案系統來儲存和快取訊息。“磁碟是緩慢的”是一個通常的認知,這是人們懷疑持久化的結構能否提供強大的效能。事實上,磁碟比人們想象的更慢也更快,這基於如何去使用它們;一個合適的設計可以使磁碟和網路一樣的快速。

影響磁碟效能的核心因素是磁碟驅動的吞吐和過去十年磁碟的查詢方式不同了。使用六個7200rpm的SATA RAID-5陣列的JBOD配置線性寫入能力為600MB/sec,而隨機寫的效能僅僅是100k/sec,相差了6000倍。線性寫入和讀取是最可預測的,並且被作業系統大量的優化。現代作業系統提供read-ahead和write-behind技術,他們大塊預讀資料,並將較小的羅機械合併成較大的物理寫入。在ACM Queue的文章中可以找到此問題相關的進一步討論;他們實際上發現順序訪問磁碟在某些情況下比隨機訪問記憶體還快。

為了彌補效能的差異,現代作業系統在使用主記憶體來做磁碟快取時變的越來越激進。當記憶體被回收時,現代作業系統將樂意將所有可用記憶體轉移到磁碟快取,而且效能會降低很多。所有的磁碟讀寫都需要通過這層快取。這個功能不會被輕易關閉,除非使用Direct IO,因此儘管在程序內快取了資料,這些資料也有可能在作業系統的pagecache中快取,從而被快取了兩次。

此外,我們建立在JVM之上,任何在Java記憶體上花費過時間的人都知道兩件事情:

物件的記憶體開銷非常大,通常將儲存的資料大小翻倍(或更多)。

Java的記憶體回收隨著堆內資料的增多變的越來越緩慢。

由於這些因素,使用檔案系統並依賴於pagecache要優於維護記憶體中快取或其他結構——我們至少可以通過直接訪問記憶體來是可用記憶體增加一倍,並通過儲存位元組碼而不是物件的方式來節約更多的記憶體。這樣做將可以在32G的機器上使用28-30GB的記憶體,而不需要承受GC的問題。此外,及時重啟服務,記憶體會保持有效,而程序內快取將需要重建(對於10G的資料可能需要10分鐘),否則需要從冷資料載入(可怕的初始化效能)。這也大大簡化了程式碼,因為保持快取和檔案之間的一致性是由作業系統負責的,這比程序中操作更不容易出錯。

這是一個簡單的設計:在程序內儘量緩衝資料,空間不足時將所有資料刷寫到磁碟,我們採用了相反的方式。資料並儘快寫入一個持久化的日誌而不需要立即刷到磁碟。實際上這只是意味著資料被轉移到了核心的pagecache。

(以pagecache為中心的設計風格)

Constant Time Suffices

在訊息系統中使用持久化資料通常是具有關聯的BTree或其他隨機訪問的資料結構,以維護訊息的元資料。BTree是最通用的資料結構,可以在訊息系統中支援各種各樣的語義。BTree的操作時間複雜度是O(log N)。通常O(log N)被認為是固定時間的,但是在磁碟操作中卻不是。每個磁碟一次只能執行一個seek,所以並行度受到限制。因此即使少量的磁碟搜尋也會導致非常高的開銷。由於作業系統將快速的快取操作和非常慢的磁碟操作相結合,所以觀察到樹結構的操作通常是超線性的,因為資料隨固定快取增加。

直觀的,持久化佇列可以像日誌的解決方案一樣,簡單的讀取和追加資料到檔案的結尾。這個結構的優勢是所有的操作都是O(1)的,並且讀取是可以並行不會阻塞的。這具有明顯的效能優勢,因為效能與資料大小完全分離,可以使用低速的TB級SATA驅動器。雖然這些驅動器的搜尋效能不佳,但是對於大量讀寫而言,他們的效能是可以接受的,並且價格是三分之一容量是原來的三倍。

無需任何的效能代價就可以訪問幾乎無限的磁碟空間,這意味著我們可以提供一些在訊息系統中非尋常的功能。例如,在Kafka中,我們可以將訊息保留較長的時間(如一週),而不是在消費後就儘快刪除。這位消費者帶來了很大的靈活性。

3. Efficiency

我們在效率上付出了很大的努力。主要的用例是處理web的資料,這個資料量非常大:每個頁面可能會生成十幾個寫入。此外我們假設每個釋出的訊息至少被一個Consumer消費,因此我們儘可能使消費的開銷小一些。

從構建和執行一些類似的系統的經驗發現,效率是多租戶操作的關鍵。如果下游基礎服務成為瓶頸,那麼應用程式的抖動將會引起問題。我們確保應用程式不會引起基礎服務的Load問題,這個非常重要的,當一個叢集服務上百個應用程式的時候,因為應用的使用模式的變化時非常頻繁的。

我們在之前的章節中討論過磁碟的效率。一旦不良的磁碟訪問模式被消除,這種型別的系統有兩個低效的原因:太多太小的IO操作和過多的資料拷貝。

太小的IO操作問題存在於客戶端和服務端之間,也存在於服務端自身的持久化當中。

為了避免這個問題,我們的協議圍繞“message set”抽象,通常是將訊息聚合到一起。這允許網路請求將訊息聚合到一起,並分攤網路往返的開銷,而不是一次傳送單個訊息。服務端依次將大塊訊息追加到日誌中,消費者一次線性獲取一批資料。

這種簡單的優化產生了一個數量級的加速。分批帶來了更大的網路包,連續的磁碟操作,連續的記憶體塊等等,這些都使得Kafka將隨機訊息寫入轉化為線性的寫入並流向Consumer。

其他低效的地方是字元複製。在訊息少時不是問題,但是對負載的影響是顯而易見的。為了避免這種情況,我們採用被producer、broker、Consumer共享的標準的二進位制訊息格式(所以資料可以在傳輸時不需要進行修改)。

由Broker維護的訊息日誌本身只是一批檔案,每個檔案由一系列以相同格式寫入的訊息構成。保持相同的格式保證了最重要的優化:網路傳輸和持久化日誌塊。現在UNIX作業系統提供了高度優化的程式碼路徑用於將pagecache的資料傳輸到網路;在Linux中,這有sendfile實現。

要劉姐sendfile的影響,瞭解從檔案到網路傳輸資料的data path非常重要:

  1. 作業系統從磁碟讀取檔案資料到pagecache,在核心空間
  2. 使用者從核心空間將資料讀到使用者空間的buffer
  3. 作業系統重新將使用者buffer資料讀取到核心空間寫入到socket中
  4. 作業系統拷貝socket buffer資料到NIC buffer併發送到網路

這顯然是低效的,有四個副本和兩個系統呼叫。使用sendfile,允許作業系統直接將資料從pagecache寫入到網路,而避免不必要的拷貝。在這個過程中,只有最終將資料拷貝到NIC buffer是必要的。

我們期望一個共同的場景是多個Consumer消費一個Topic資料,使用zero-copy優化,資料被拷貝到pagecache並且被多次使用,而不是每次讀取的時候拷貝到記憶體。這允許以接近網路連線的速度消費訊息。

pagecache和sendfile的組合意味著在消費者追上寫入的情況下,將看不到磁碟上的任何讀取活動,因為他們都將從快取讀取資料。

sendfile和更多的zero-copy背景知識見zero-copy

End-to-end Batch Compression

在一些場景下,CPU核磁盤並不是效能瓶頸,而是網路頻寬。在資料中心和廣域網上傳輸資料尤其如此。當然,使用者可以壓縮它的訊息而不需要Kafka的支援,但是這可能導致非常差的壓縮比,因為冗餘的大部分是由於相同型別的訊息之間的重複(例如JSON的欄位名)。多個訊息進行壓縮比單獨壓縮每條訊息效率更高。

Kafka通過允許遞迴訊息來支援這一點。一批訊息可以一起壓縮並以此方式傳送到服務端。這批訊息將以壓縮的形式被寫入日誌,只能在消費端解壓縮。

Kafka支援GZIP,Snappy和LZ4壓縮協議。更多的壓縮相關的細節在這裡

4. The Producer

Load balancing

Producer直接向Leader Partition所在的Broker傳送資料而不需要經過任何路由的干預。為了支援Producer直接向Leader Partition寫資料,所有的Kafka服務節點都支援Topic Metadata的請求,返回哪些Server節點存活的、Partition的Leader節點的分佈情況。

由客戶端控制將資料寫到哪個Partition。這可以通過隨機或者一些負載均衡的策略來實現(即客戶端去實現Partition的選擇策略)。Kafka暴露了一個介面用於使用者去指定一個Key,通過Key hash到一個具體的Partition。例如,如果Key是User id,那麼同一個User的資料將被髮送到同一個分割槽。這樣就允許消費者在消費時能夠對消費的資料做一些特定的處理。這樣的設計被用於處理“區域性敏感”的資料(結合上面的場景,Partition內的資料是可以保持順序消費的,那麼同一個使用者的資料在一個分割槽,那麼就可以保證對任何一個使用者的處理都是順序的)。

Asynchronous send

批處理是提升效率的主要方式一致,為了支援批處理,Kafka允許Producer在記憶體聚合資料並在一個請求中發出。批處理的大小可以是通過訊息數量指定的,也可以是通過等待的時間決定的(例如64K或者10ms)。這樣允許聚合更多的資料後傳送,減少了IO操作。緩衝的資料大小是可以配置了,這樣能適當增加延遲來提升吞吐。

更多的細節可以在Producer的配合和API文件中找到。

5 The Consumer

Kafka Consumer通過給Leader Partition所在的Broker傳送“fetch”請求來進行消費。Consumer在請求中指定Offset,並獲取從指定的Offset開始的一段資料。因此Consumer對消費的位置有絕對的控制權,通過重新設定Offset就可以重新消費資料。

Push vs Pull

我們考慮的一個初步問題是Consumer應該從Broker拉取資料還是Broker將資料推送給Consumer。在這方面,Kafka和大多數訊息系統一樣,採用傳統的設計方式,由Producer想Broker推送資料,Consumer從Broker上拉取資料。一些日誌中心繫統,如Scribe和Apache Flume,遵循資料向下遊推送的方式。兩種方式各有利弊。基於推送的方式,由於是由Broker控制速率,不能很好對不同的Consumer做處理。Consumer的目標通常是以最大的速率消費訊息,不幸的是,在一個基於推送的系統中,當Consumer消費速度跟不上生產速度 時,推送的方式將使Consumer“過載”。基於拉取的系統在這方面做的更好,Consumer只是消費落後並在允許時可以追上進度。消費者通過某種協議來緩解這種情況,消費者可以通過這種方式來表明它的負載,這讓消費者獲得充分的利用但不會“過載”。以上原因最終使我們使用更為傳統的Pull的方式。

Pull模型的另一個優勢是可以聚合資料批量傳送給Consumer。Push模型必須考慮是立即推送資料給Consumer還是等待聚合一批資料之後傳送。如果調整為低延遲,這將導致每次只發送一條訊息(增加了網路互動)。基於Pull的模式,Consumer每次都會盡可能多的獲取訊息(受限於可消費的訊息數和配置的每一批資料最大的訊息數),所以可以優化批處理而不增加不必要的延遲。

基於Pull模式的一個缺陷是如果Broker沒有資料,Consumer可能需要busy-waiting的輪訓方式來保證高效的資料獲取(在資料到達後快速的響應)。為了避免這種情況,我們在Pull請求中可以通過引數配置“long poll”的等待時間,可以在服務端等待資料的到達(可選的等待資料量的大小以保證每次傳輸的資料量,減少網路互動)。

你可以想象其他一些從端到端,採用Pull的可能的設計。Producer把資料寫到本地日誌,Broker拉取這些Consumer需要的資料。一個相似的被稱為“store-and-forward”的Producer經常被提及。這是有趣的,但是我們覺得不太適合我們可能會有成千上萬個Producer的目標場景。我們維護持久化資料系統的經驗告訴我們,在系統中使多應用涉及到上千塊磁碟將會使事情變得不可靠並且會使操作它們變成噩夢。最後再實踐中,我們發現可以大規模的執行強大的SLAs通道,而不需要生產者持久化。

Consumer Position

記錄哪些訊息被消費過是訊息系統的關鍵效能點。

大多數訊息系統在Broker上儲存哪些訊息已經被消費的元資料。也就是說,Broker可以在消費傳遞給Consumer後立即記錄或等待消費者確認之後記錄。這是一個直觀的選擇,並且對於單個伺服器而言並沒有更好的方式可以儲存這個狀態。大多數訊息系統中的儲存裝置並不能很好的伸縮,所以這也是務實的選擇——當Broker確認訊息被消費後就立即刪除,以保證儲存較少的資料。

讓Broker和Consumer關於那些訊息已經被消費了達成一致並不是一個簡單的問題。如果Broker在將訊息寫到網路之後就立即認為訊息已經被消費,那麼如果Consumer消費失敗(Consumer在消費訊息之前Crash或者網路問題等)訊息將丟失。為了解決這個問題,一些訊息系統增加了ACK機制,訊息被標記為只是傳送出去而不是已經被消費,Broker需要等待Consumer傳送的ACK請求之後標記具體哪些訊息已經被消費了。這個策略修復了訊息丟失的問題,但是引起了新的問題。第一,如果Consumer處理了訊息,但是在傳送ACK給Broker之前出現問題,那麼訊息會被重複訊息。第二,Broker需要維護每一條訊息的多個狀態(是否被髮送、是否被消費)。棘手的問題是要處理被髮送出去但是沒有被ACK的訊息。

Kafka採用不同的方式處理。Topic被劃分為多個內部有序的分割槽,每個分割槽任何時刻只會被一個group內的一個Consumer消費。這意味著一個Partition的Position資訊只是一個數字,標識下一條要消費的訊息的偏移量。這使得哪些訊息已經被消費的狀態變成了一個簡單的資料。這個位置可以定期做CheckPoint。這使得訊息的ACK的代價非常小。

這個方案還有其他的好處。消費者可以優雅的指定一箇舊的偏移量並重新消費這些資料。這和通常的訊息系統的觀念相違背,但對很多消費者來說是一個很重要的特性。比如,如果Consumer程式存在BUG,在發現並修復後,可以通過重新消費來保證資料都正確的處理。

Offline Data Load

可擴充套件的持久化儲存的能力,是消費者可以定期的將資料匯入到像Hadoop這樣的離線系統或關係型資料倉庫中。

在Hadoop的場景中,我們通過把資料分發到獨立的任務中進行並行處理,按照node/topic/partition組合,充分使用另行能力載入資料。Hadoop提供任務管理,失敗的任務可以重新啟動,而不需要擔心重複資料的危險——任務會從原始位置重新啟動。

6. Message Delivery Semantics

現在我們對Producer和Consumer已經有了一定的瞭解,接著我們來討論Kafka在Producer和Consumer上提供的語義。顯然的,在分發訊息時是可以有多種語義的:

  • At most once:訊息可能丟失,但不會重複投遞
  • At least once:訊息不會丟失,但可能會重複投遞
  • Exactly once:訊息不丟失、不重複,會且只會被分發一次(真正想要的)

值得注意的是這分為兩個問題:釋出訊息的可用性和消費訊息的可用性。

許多系統都聲稱提供“exactly once”語義,仔細閱讀會發現,這些宣告是誤導的(他們沒有考慮Producer和Consumer可能Crash的場景,或是資料寫入磁碟後丟失的情況)。

Kafka提供的語義是直接了當的。傳送訊息的時候我們有一個訊息被Commit到Log的概念。一旦訊息已經被Commit,它將不會丟失,只要還有一個複製了訊息所在Partition的Broker存活著。“存活”的定義以及我們覆蓋的失敗的情況將在下一節描述。現在假設一個完美的Broker,並且不會丟失,來理解對Producer和Consumer提供的語義保證。如果Producer傳送一條訊息,並且發生了網路錯誤,我們是不能確認錯誤發生在訊息Commit之前還是訊息Commit之後的。類似於使用自增主鍵插入資料庫,是不能確認寫入之後的主鍵值的。

Producer沒有使用的強制可能的語義。我們無法確認網路是否會發生異常,可以使Producer建立有序的主鍵使重試傳送成為冪等的行為。這個特性對一個複製系統來說不是無價值的,因為伺服器在發生故障的情況下依舊需要提供服務。使用這個功能,Producer可以重試,直到收到訊息成功commit的響應,在這個點上保證訊息傳送的exactly once。我們希望把這個特性加到後續的Kafka版本中。

不是所有的場景都需要這樣的保證。對應延遲敏感的場景,我們允許Producer指定其期望的可用性級別。如果Producer期望等待訊息Commit,那麼這可能消耗10ms。Producer也可以指定以非同步的方式傳送訊息或只等Leader節點寫入訊息(不能Follower)。

接著我們從消費者的視角來描述語義。所有的副本都擁有偏移量相同的日誌。Consumer控制它在日誌中的偏移量。如果Consumer一直正常執行,它可以只把偏移量儲存在記憶體中,但是如果Consumer crash且我們期望另一個新的Consumer接管消費,那麼需要選擇一個位置來開始消費。假設Consumer讀取了一些訊息——它有集中處理訊息和位置的方式。

它可以讀取訊息,然後儲存位置資訊,然後處理訊息。在這個場景中,Consumer可能在儲存位置資訊後消費訊息失敗,那麼下一次消費可能從儲存的位點開始,儘管之前部分訊息被處理失敗。這是消費處理過程中失敗的at-most-once(只被處理了一次,但是可能處理失敗)。

它可以讀取訊息,之後處理訊息,最後儲存位置資訊。這個場景中,Consumer可能在處理完訊息,但是儲存位點之前Crash,那麼下一次會重新消費這些訊息,儘管已經被消費過。這是Consumer Crash引起的at-least-once(訊息可能會被處理多次)。

在很多場景衝,訊息可以有一個逐漸,這樣可以保證處理的冪等性(多次處理不會有影響)。

那麼什麼是exactly once語義?這裡的限制實際上不是訊息系統的特性,而是訊息處理和位置資訊的儲存。經典的解決方案是採用兩階段提交的方式來處理。但是這也可以用一個更簡單的方式來處理:通過將訊息處理結果和位置資訊儲存在同一位置上。這是更好的,因為很多Consumer期望寫入的系統並不支援兩階段提交。例如, 我們的hadoop ETL工具從儲存資料到dhfs上的同時也把位移位置也儲存到hdfs中了, 這樣可以保證資料和位移位置同時被更新或者都沒更新.我們在很多系統上使用類似的模式, 用於解決那些需要這種強語義但是卻沒有主鍵用於區分重複的儲存系統中.

預設Kafka提供at-least-once語義的訊息分發,允許使用者通過在處理訊息之前儲存位置資訊的方式來提供at-most-once語義。exactly-once語義需要和輸出系統像結合,Kafka提供的offset可以使這個實現變的“直接了當的”(變得比較簡單)。

7. Replication

Kafka為Topic的每個Partition日誌進行備份,備份數量可以由使用者進行配置。這保證了系統的自動容錯,如果有伺服器宕機,訊息可以從剩餘的伺服器中讀取。

其他訊息系統提供了備份相關的功能,但在我們看來,這是一個附加的功能,不能被大量使用,並且伴隨著大量的缺點:Slave是不活躍的(這裡應該是指Slave只提供了備份,並不可以被消費等等)、吞吐受到很大的影響、需要手動配置等等。在Kafka中,我們預設就提供備份,實際上我們認為沒有備份的Topic是一種特殊的備份,只是備份數為1。

備份的單位是Topic的分割槽。在沒有發生異常的情況下,Kafka中每個分割槽都會有一個Leader和0或多個Follower。備份包含Leader在內(也就是說如果備份數為3,那麼有一個Leader Partition和兩個Follower Partition)。所有的讀寫請求都落在Leader Partition上。通常情況下分割槽要比Broker多,Leader分割槽分佈在Broker上。Follower上的日誌和Leader上的日誌相同,擁有相同的偏移量和訊息順序(當然,在特定時間內,Leader上日誌會有一部分資料還沒複製到Follower上)。

Follower作為普通的Consumer消費Leader上的日誌,並應用到自己的日誌中。Leader允許Follower自然的,成批的從服務端獲取日誌並應用到自己的日誌中。

大部分分散式系統都需要自動處理故障,需要對節點“alive”進行精確的定義。例如在Kafka中,節點存活需要滿足兩個條件:

  1. 節點需要保持它和ZooKeeper之間的Session(通過ZK的心跳機制)
  2. 如果是Follower,需要複製Leader上的寫事件,並且複製進度沒有“落後太多”

我們稱滿足這兩個條件的節點為“同步的”來避免使用“alive”或“failed”這樣模糊的概念。Leader節點儲存同步中的Follower節點。如果一個Follower宕機或複製落後太多,Leader將從同步的Follower List中將其移除。通過replica.lag.time.max.ms配置來定義“落後太多”。

在分散式系統的術語中,我們只嘗試處理“失敗/恢復”模型——節點突然停止工作之後恢復的情況。Kafka不處理“拜占庭”問題。

一條訊息在被應用到所有的備份上之後被認為是“已經提交的”。只有提交了的訊息會被Consumer消費。這意味著Consumer不需要擔心Leader節點宕機後訊息會丟失。另一方面,Producer可以配置是否等待訊息被提交,這取決於他們在延遲和可用性上的權衡。這個可以通過Producer的配置項中設定。

Kafka提供了一條訊息被提交之後,只要還有一個備份可用,訊息就不會丟失的保證。

Kafka保證在節點故障後依舊可用,但是無法再網路分割槽的情況下保持可用。

Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

Kafka分割槽機制的核心是日誌複製。日誌複製是分散式系統中最基礎的東西,有很多方式可以實現。日誌複製可以作為基於狀態機的分散式系統的基礎設定。

日誌複製模型用於處理連續、有序的輸入(例如給log entry新增0、1、2這樣的編號)。有很多方式實現日誌複製,最簡單的方式是Leader選擇和提供這個順序之。只要Leader節點存活,Follower只需要按照Leader選擇的值和順序來複制即可。

當然,如果Leader不會宕機,那我們也不需要Follower了!在Leader宕機之後,我們需要在Follower中選擇一個節點成為新的Leader。Follower可能會宕機或者日誌落後較多,所以我們必須確保選擇一個“及時同步”(複製進度和Leader最近的節點)成為新的Leader。複製演算法必須提供這樣的保證:如果Client收到一條訊息已經被Commit了,如果Leader宕機,新Leader必須包含這條已經被Commit的訊息。這是一個權衡:Leader在確認訊息Commit之前需要等待更多的Follower來確認複製了訊息來保證在Leader宕機後有更多可以成為Leader的Follower節點。

如果你選擇了所需要的ACK的數量以及選擇Leader時需要比較的日誌數以確保能重合,這個叫做Quorum。

一個通用的來權衡的方式是提交日誌和選擇Leader時都採用大多數投票的原則。這不是Kafka使用的方式,但是無所謂,讓我們去理解這種方式來了解實現原理。假設一共有2f+1個備份,那麼f+1的副本必須在Leader提交commit之前接收到訊息,這樣就可以從f+1個節點中選擇出新節點作為Leader。因為任何f+1個節點,必然有一個節點包含最全的日誌。還有很多關於這個演算法的細節需要處理(如何定義日誌更全、在Leader節點宕機時保持日誌一致性等)在這裡先忽略。

大多數選票的方法有非常好的特性:延遲取決於同步最快的Server節點。這說明,如果備份數為3,那麼延遲取決於兩個備份節點中較快的節點。

有很多類似的演算法變體,例如ZooKeeper的Zab,Raft,Viewstamped Replication等。和Kafka最相似的學術刊物是微軟的PacificA。

大多數選票方式的取消是它不能容忍很多的故障,導致你沒有可以被選為新Leader的節點。為了容忍一個節點故障,需要3分資料備份,容忍兩個節點故障則需要5個節點。在我們的經驗中,只有足夠的冗餘才能容忍單一的故障在實際系統中是不夠的,每次寫5次副本,使用5倍的儲存空間,和1/5的頻寬,在大體量的資料儲存上不是很可行。這就是為什麼quorum演算法多多應用在像ZK這樣儲存配置的叢集中,而不是資料儲存系統中。例如HDFS的namenode的高可用建立在大多數選票的機制上,但是資料儲存缺不是。

Kafka使用一個明顯不同的方式來選擇quorum集合。代替大多數選票,Kafka動態的維護一個“同步的備份(in-sync replicas ISR)”的集合。只有這個集合中的成員能被選舉為Leader。一個寫入請求需要同步到所有的同步中的備份才能認為是提交的。ISR集合在變更時會被持久化到ZK。因此,任何ISR中的備份都可以被選舉為新的Leader。這對於Kafka這種擁有多分割槽並且需要保證這節點負載均衡的模型來說非常重要。使用ISR模型和f+1個副本,Kafka可以容忍f個備份不可用的情況。

對於大多數的場景,我們認為這樣的妥協是合理的。在實踐中,為了容忍f個節點故障,大多數選票原則和ISR方式都需要等待相同的備份在提交訊息前進行確認(如需要容忍一個節點故障,大多數選票的選擇需要3個節點,並且提交訊息需要至少一個備份的確認;ISR只需要兩個節點,需要確認的副本數一樣是一個)。相對於大多數選票的原則,ISR方式不需要等待最慢的伺服器確認訊息是一個優勢。儘管如此,我們進行改善,讓客戶端決定是否等待訊息提交,使用較小的副本數,這樣帶來的吞吐和更小的磁碟空間要求是有價值的。

另一個重要的設計是Kafka不需要故障的節點恢復所有的資料。這是不常見的,複製演算法依賴於儲存介質在任何故障的情況下都不丟失資料並且不違反一致性原則。這個假設有兩個主要的問題。第一,磁碟故障是持久化資料系統中最常見的問題,並且它通常導致資料不完整。第二,即使這不是一個問題,我們也不希望在每一次寫入之後都使用fsync來保證一致性,這會使效能下降兩三個數量級。我們的協議中允許一個副本重新加入到ISR集合中,在重新加入之前,它需要從新同步在故障時丟失的資料。

Unclean leader election: What if they all die?

Kafka保證的資料不丟失,在至少有一個備份保持同步的情況下。如果一個分割槽所有的備份的節點都故障,那麼就不能提供這個保障了。

但是實踐系統中需要一些合理的事情,在所有備份故障時。如果不巧遇上這個問題,去考慮哪些情況會發生是非常重要的。有兩種方式去做:

  1. 等待一個ISR中的副本恢復並將其選舉為新的Leader(期望它擁有所有的資料)。
  2. 選擇第一個副本(無需在ISR中)作為Leader。

這是在可用性和一致性之間的權衡。如果我們等待ISR中的備份恢復,那麼會在這個期間一直不可用。如果這樣的副本被損壞,那麼我們將永久性的失效。另一方便,如果使用不在ISR中的備份成為Leader,儘管它可能不包含所有的日誌。預設情況下,Kafka使用第二種策略,當所有ISR中的備份不可用時,傾向於選擇可能不一致的備份。這個方式可以通過unclean.leader.election.enable配置禁用,在哪些停機時間優於不一致的場景。

這種困境不是kafka特有的, 這存在於任何基於quorum方式的結構中. 例如, 多數投票演算法, 如果大多數的伺服器都永久性失效了, 你必須選擇丟失全部的資料或者接受某一臺可能資料不一致的伺服器上的資料.

Availability and Durability Guarantees

在向Kafka寫入資料時,Producer可以選擇是否等待0,1或(-1)個備份響應。注意,這裡說的“被所有備份響應”不是說被所有分配的備份響應,預設情況下只的時所有ISR集合中的備份響應。例如,如果一個Topic配置成只需要兩個備份,並且一個備份故障了,那麼寫入一個備份即認為收到了所有響應。但是,如果這個備份也故障了,那麼資料會丟失。這樣保證了分割槽的最大可用,但是可能不是那些相對於可用性更需要可靠性的使用者的需求。因此,我們提供兩種Topic級別的配置,相對於可用性,優先保證可靠性:

  1. 禁用unclean leader election;如果所有備份不可用,那麼分割槽保持不可用,直到最近的Leader重新恢復可用。這可能導致不可用,但是不會丟失資料。
  2. 配置一個最小的ISR大小;分割槽只會在滿足最小ISR的情況下接受請求,這樣可以避免資料只寫入一個備份,而這個備份後續故障導致資料丟失。這個配置只在Producer使用acks=all的配置時有效。這個配置在一致性和可用性上做了權衡。更大的ISR提供了更好的一致性,但是降低了可用性,如果同步備份數小於最小ISR配置時將不可用。

Replica Management

以上的討論都是基於一個日誌,即一個Topic的分割槽考慮的。但是Kafka叢集擁有成百上千這樣的分割槽。我們嘗試使用輪訓的方式來平衡分割槽,避免高數量的Topic的分割槽集中在一部分少量的節點上。同樣我們要平衡所有Leader分割槽,這樣每個節點上承載的主分割槽都有一定的比例。

優化Leader的選舉過程也是非常重要的,因為這是系統不可用的視窗期。一個直觀的實現是,如果一個節點故障了,為這個節點上所有的分割槽都獨立的執行一次選舉。代替這種方式,我們選擇一個Broker作為Controller,Controller負責一個故障節點影響的所有分割槽的Leader變更。這樣的好處是我們可以批量處理,減少獨立選舉時大量的通知,這使得大量分割槽需要選舉時變得更快,代價更小。如果Controller故障了,剩餘的Broker中會有一個節點成為新的Controller。

8 Log Compaction

日誌壓縮確保Kafka會為一個Topic分割槽資料日誌中保留至少message key的最後一個值。它解決了應用crash或系統故障或應用在操作期間重啟來重新載入快取的場景。讓我們深入到細節中解釋日誌壓縮是如何工作的。

到屋面位置,我們只說明瞭在一斷時間或達到特定大小的時候丟棄就日誌的簡單方法。這適用於想日誌這樣每一條資料都是獨立資料的情況。但是重要類別的資料是根據key處理的資料(例如DB中表的變更資料)。

讓我們來討論這樣一個具體的流的例子。一個Topic包含了使用者email address資訊;每一次使用者變更郵箱地址,我們都像這個topic傳送一條訊息,使用使用者ID作為primay key。現在我們已經為使用者ID為123的使用者傳送了一些訊息,每條訊息包含了email address的變更:

123 => [email protected]

123 => [email protected]

123 => [email protected]

日誌壓縮為我們提供了更精細的保留機制,至少儲存每個key最後一個變更(如123 => [email protected])。這樣做我們確保了這個日誌包含了所有key最後一個值的快照。這樣Consumer可以重建狀態而不需要保留完成的變更日誌。

讓我們列一些日誌壓縮有用的場景,然後看他是如果被使用的。

  1. DB變更訂閱。這是很常見的,一個數據在多個數據系統中,而且其中一個系統是資料庫型別的(如RDBMS或KV系統)。例如可能有一個數據庫,一個戶快取系統,一個搜尋叢集,一個Hadoop叢集。DB的任何一個變更需要反映到快取、搜尋叢集,最終儲存到Hadoop中。在這個場景中,你只需要實時系統最新的更新日誌。但是如果需要重新載入快取或恢復宕機的檢索節點,就需要完整的資料。
  2. 事件源。這是一種應用設計風格,它將查詢處理和應用程式設計結合到一起,並使用日誌作為程式的主要儲存。
  3. 高可用日誌。一個本地整合程式可以通過變更日誌來做到容錯,這樣另一個程式能夠在當前程式故障時繼續處理。例如, 像流資料查詢例子, 如計數, 彙總或其他的分組操作. 實時系統框架如Samza, 就是為了達到這個目的使用這個特性的。

在這些場景中,主要處理實時的變更,但有時需要重新載入或重新處理時,需要載入所有資料。日誌壓縮允許使用相同的Topic來支援這些場景,這種日誌使用風格在後續的內容中會更詳細的描述。

想法很簡單,我們有有無限的日誌,以上每種情況記錄變更日誌,我們從一開始就捕獲每一次變更。使用這個完整的日誌,我們可以通過回放日誌來恢復到任何一個時間點的狀態。這種假設的情況下,完整的日誌是不實際的,對於那些每一行記錄會變更多次的系統,即使資料集很小,日誌也會無限的增長下去。丟棄舊日誌的簡單操作可以限制空間的增長,但是無法重建狀態——因為舊的日誌被丟棄,可能一部分記錄的狀態會無法重建(這寫記錄所有的狀態變更都在就日誌中)。

日誌壓縮機制是更細粒度的,每個記錄都保留的機制,而不是基於時間的粗粒度。這個想法是選擇性的刪除哪些有更新的變更的記錄的日誌。這樣最終日誌至少包含每個key的記錄的最後一個狀態。

這個策略可以為每個Topic設定,這樣一個叢集中,可以一部分Topic通過時間和大小保留日誌,另外一些可以通過壓縮保留。

這個功能的靈感來自於LinkedIn的最古老且最成功的基礎設定——一個稱為Databus的資料庫變更日誌快取系統。不想大多數的日誌儲存系統,Kafka為了訂閱而量身打造,用於線性的快速讀寫。和Databus不同,Kafka作為真實的儲存,壓縮日誌是非常有用的,在上有資料來源不能重放的情況下。

Log Compaction Basics

這裡是一個展示Kafka日誌的邏輯結構的圖(每條訊息包含了一個offset):

log_cleaner_anatomy

Log head中包含傳統的Kafka日誌。它包含了連續的連續的offset和所有的訊息。日誌壓縮增加了處理tail Log的選項。上圖展示了日誌壓縮的的Log tail的情況。tail中的訊息儲存了初次寫入時的offset。即使該offset的訊息被壓縮,所有offset仍然在日誌中是有效的。在這個場景中,無法區分和下一個出現的更高offset的位置。如上面的例子中,36、37、38是屬於相同位置的,從他們開始讀取日誌都將從38開始。

壓縮允許刪除。一條訊息伴隨著空的值被認為從日誌中刪除。這個刪除標記將會引起所有之前擁有相同key的訊息被移除(包括擁有key相同的新訊息),但是刪除標記比較特殊,它將在一定週期後被從日誌中刪除來示範空間。這個時間點被稱為“delete retention point”。

壓縮操作通過在後臺週期性的拷貝日誌段來完成。清除操作不會阻塞讀取,並且可以被配置不超過一定IO吞吐來避免影響Producer和Consumer。實際的日誌段壓縮過程有點像如下:

log_cleaner_anatomy log_compaction

What guarantees does log compaction provide?

日誌壓縮提供瞭如下的保證:

  1. 所有跟上消費的Consumer能消費到所有寫入的訊息;這些訊息有連續的序列號。Topic的min.compaction.lag.ms可以用於保證訊息寫入多久後才會被壓縮。這限制了一條訊息在Log Head中的最短存在時間。
  2. 訊息的順序會被保留。壓縮不會重排序訊息,只是移除其中一部分。
  3. 訊息的Offset不會變更。這是訊息在日誌中的永久標誌。
  4. 任何從頭開始處理日誌的Consumer至少會拿到每個key的最終狀態。另外,只要Consumer在小於Topic的delete.retention.ms設定(預設24小時)的時間段內到達Log head,將會看到所有刪除記錄的所有刪除標記。換句話說,因為移除刪除標記和讀取是同事發生的,Consumer可能會因為落後超過delete.retention.ms而導致錯過刪除標記。

Log Compaction Details

日誌壓縮由Log Cleaner執行,後臺執行緒池重新拷貝日誌段,移除那些key存在於Log Head中的記錄。每個壓縮執行緒如下工作:

  1. 選擇Log Head相對於Log Head在日誌中佔更高比例的日誌
  2. 建立Log Head中每個Key最後一個offset的摘要
  3. 從頭到尾的拷貝日誌,並刪除之後日誌終於到相同key的記錄。新的、乾淨的日誌將會立即被交到到日誌中,所以只需要一個額外的日誌段空間
  4. Log Head的摘要實際上是一個空間緊湊的雜湊表。每個條目使用24個位元組。所以如果有8G的整理緩衝區, 則能迭代處理大約366G的日誌頭部(假設訊息大小為1k)。

Configuring The Log Cleaner

Log Cleaner預設啟用。這會啟動清理的執行緒池。如果要開始特定Topic的清理功能,可以開啟特定的屬性:

log.cleanup.policy=compact

這個可以通過建立Topic時配置或者之後使用Topic命令實現。

Log Cleaner可以配置保留最小的不壓縮的日誌頭。可以通過配置壓縮的延遲時間:

log.cleaner.min.compaction.lag.ms

這可以用於保證訊息比在被壓縮的訊息大一段時間。如果沒有設定,除了最後一個日誌外,所有的日誌都會被壓縮。當前寫入的自如端不會被壓縮,即使所有的訊息都落後於比配置的最小壓縮時間。

更多的配置在這裡

9 Quotas

從0.9版本開始,Kafka可以對生產和消費請求進行限額配置。基於位元組速率來限制,每個group中所有的客戶端共享一個限額。

Why are quotas necessary?

Producer和Consumer可能生產或消費大量的資料而耗盡Broker的資源,導致網路飽和。進行限額可以避免這些問題,特別是在多租戶的叢集中,一小部分低質量的客戶端會降低整個叢集的體驗。實際上,當執行Kafka作為服務時,這還可以對API的使用進行限制。

Client groups

Kafka客戶端的身份代表了用於鑑權。 在無鑑權機制的叢集中, 使用者身份是由伺服器使用可配置的PrincipalBuilder進行選擇的, Client-id作為客戶端邏輯分組, 是由客戶端應用選擇的一個有意義的名稱. 標量(user, client-id)定義共享這個使用者身份和客戶端ID的邏輯客戶端分組.

配額可以用於(user, client-id)組合, 或user, client-id分組。

對一個給定的連線, 最符合這個連線的配額被使用到, 一個限額組的所有連線共享這個限額配置, 例如: 如果(user=”test-user”, client-id=”test-client”) 10MB/s的配額, 這個配置會被所有的具有”test-user”使用者和客戶端ID是 “test-client”的所有生產者所共享.

Quota Configuration

配額可以按照(user, client-id)或者, user或client-id進行分組, 如果需要更高或更低的配額, 可以覆蓋默配額, 這個機制類似於對日誌主題配置的覆蓋, user 或者 (user, client-id)配額可以覆蓋寫入到zookeeper下的 /config/users ,client-id配置, 可以寫入到 /config/clients。這些覆蓋寫入會被伺服器很快的讀取到, 這讓我們修改配置不需要重新啟動伺服器. 每個分組的預設配置也可以同樣的方式動態修改。

限額的配置順序如下:

  1. /config/users//clients/
  2. /config/users//clients/
  3. /config/users/
  4. /config/users//clients/
  5. /config/users//clients/
  6. /config/users/
  7. /config/clients/
  8. /config/clients/

Broker的quota.producer.default,quota.consumer.default也可以用來配置預設的client-id分組的預設值。這可屬性已經不鼓勵使用,後續將會刪除。預設client-id限額配置可以和其它預設配置一樣, 在Zookeeper直接設定。

Enforcement

預設情況下,每個唯一的客戶端group會收到一個叢集配置的固定的限額。這個限額是基於每個Broker的。每個客戶端能釋出或獲取在每臺伺服器都的最大速率, 我們按伺服器定義配置, 而不是按整個叢集定義,是因為如果是叢集範圍的需要額外的機制來共享配額的使用情況, 這會導致配額機制的實現比較難。

Broker檢測到限額違規時時如何處理的?在我們的解決方案中,Broker不會返回錯誤給客戶端,而是降低客戶端的速率。Broker計算使客戶端回到合理限額的需要的響應延遲。這種方法的處理對客戶端是透明,使他們不必執行任何棘手的,特殊的操作。實際上,錯誤的客戶端還可能加劇正在解決的限額問題。

客戶端位元組率在多個小視窗(例如每個1秒的30個視窗)上進行測量,以便快速檢測和糾正配額違規。 通常,具有大的測量視窗(例如,每個30秒的10個視窗)導致大量的流量脈衝,隨後是長時間的延遲,這在使用者體驗方面不是很好。


丞一

丞一

中介軟體技術專家 at 螞蟻金服丞一,目前就職於螞蟻金服,熱衷於研究分散式系統相關的技術;微信公眾號:MessageQueue,歡迎交流;