1. 程式人生 > >消息服務百科全書——Kafka基本原理介紹

消息服務百科全書——Kafka基本原理介紹

feature col 都是 指定 ESS 生成 接下來 另一個 追加

架構
1.1 總體架構
技術分享圖片
因為Kafka內在就是分布式的,一個Kafka集群通常包括多個代理。
為了均衡負載,將話題分成多個分區,每個代理存儲一或多個分區。多個生產者和消費者能夠同時生產和獲取消息。
技術分享圖片
一個典型的Kafka請添加鏈接描述集群中包含若幹Producer(可以是web前端產生的Page View,或者是服務器日誌,系統CPU、Memory等),若幹broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若幹Consumer Group,以及一個Zookeeper集群。
Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱並消費消息

1.1.1 代理
與其它消息系統不同,Kafka代理是無狀態的。這意味著消費者必須維護已消費的狀態信息。這些信息由消費者自己維護,代理完全不管。這種設計非常微妙,它本身包含了創新。
從代理刪除消息變得很棘手,因為代理並不知道消費者是否已經使用了該消息。Kafka創新性地解決了這個問題,它將一個簡單的基於時間的SLA應用於保留策略。當消息在代理中超過一定時間後,將會被自動刪除。
這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費數據。這違反了隊列的常見約定,但被證明是許多消費者的基本特征。

1.1.2 Topic & Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue裏。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創建topic1和topic2兩個topic,且分別有13個和19個分區,則整個集群上會相應會生成共32個文件夾(本文所用集群共8個節點,此處topic1和topic2 replication-factor均為1),如下圖所示。

技術分享圖片
每個日誌文件都是一個log entrie序列,每個log entrie包含一個4字節整型數值(值為N+5),1個字節的"magic value",4個字節的CRC校驗碼,其後跟N個字節的消息體。每條消息都有一個當前Partition下唯一的64字節的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:
技術分享圖片
這個log entries並非由一個文件構成,而是分成多個segment,每個segment以該segment第一條消息的offset命名並以“.kafka”為後綴。另外會有一個索引文件,它標明了每個segment下包含的log entry的offset範圍,如下圖所示。
技術分享圖片
因為每條消息都被append到該Partition中,屬於順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
技術分享圖片
對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka提供兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可在Partition文件超過1GB時刪除舊數據,配置如下所示。
技術分享圖片
這裏要註意,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這裏刪除過期文件與提高Kafka性能無關。選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會為每一個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條消息後遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
Kafka的存儲布局非常簡單。話題的每個分區對應一個邏輯日誌。
物理上,一個日誌為相同大小的一組分段文件。每次生產者發布消息到一個分區,代理就將消息追加到最後一個段文件中。
當發布的消息數量達到設定值或者經過一定的時間後,段文件真正寫入磁盤中。寫入完成後,消息公開給消費者。
與傳統的消息系統不同,Kafka系統中存儲的消息沒有明確的消息Id。消息通過日誌中的邏輯偏移量來公開。這樣就避免了維護配套密集尋址,用於映射消息ID到實際消息地址的隨機存取索引結構的開銷。消息ID是增量的,但不連續。要計算下一消息的ID,可以在其邏輯偏移的基礎上加上當前消息的長度。
消費者始終從特定分區順序地獲取消息,如果消費者知道特定消息的偏移量,也就說明消費者已經消費了之前的所有消息。消費者向代理發出異步拉請求,準備字節緩沖區用於消費。每個異步拉請求都包含要消費的消息偏移量。Kafka利用sendfile API高效地從代理的日誌段文件中分發字節給消費者。
3.1.1.3 Producer消息路由
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪一個Partition。
如果Partition機制設置合理,所有消息可以均勻分布到不同的Partition裏,這樣就實現了負載均衡。
如果一個Topic對應一個文件,那這個文件所在的機器I/O將會成為這個Topic的性能瓶頸,而有了Partition後,不同的消息可以並行寫入不同broker的不同Partition裏,極大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數量,也可在創建Topic時通過參數指定,同時也可以在Topic創建之後通過Kafka提供的工具修改。
在發送一條消息時,可以指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中如果key可以被解析為整數則將對應的整數與Partition總數取余,該消息會被發送到該數對應的Partition。(每個Parition都會有個序號,序號從0開始)
技術分享圖片
如果將上例中的類作為partition.class,並通過如下代碼發送20條消息(key分別為0,1,2,3)至topic3(包含4個Partition)。
public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
List messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 4; j++){
messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
}
producer.send(messageList);
}
producer.close();
}

則key相同的消息會被發送並存儲到同一個partition裏,而且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是通過Java程序調用Consumer後打印出的消息列表。
技術分享圖片
1.1.4 Consumer Group
(本節所有描述都是基於Consumer hight level API而非low level API)。
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
技術分享圖片
這是Kafka用來實現一個Topic消息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group裏。用Consumer Group還可以將Consumer進行自由的分組而不需要多次發送消息到不同的Topic。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬於不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。
技術分享圖片
下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先創建一個Topic (名為topic1,包含3個Partition),然後創建一個屬於group1的Consumer實例,並創建三個屬於group2的Consumer實例,最後通過Producer向topic1發送key分別為1,2,3的消息。結果發現屬於group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分別收到了key為1,2,3的消息。如下圖所示。

技術分享圖片
1.1.5 Push vs. Pull
作為一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息並由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事實上,push模式和pull模式各有優劣。
push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據Consumer的消費能力以適當的速率消費消息。
對於Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

1.1.6 Kafka delivery guarantee
有這麽幾種可能的delivery guarantee:
1、At most once 消息可能會丟,但絕不會重復傳輸
2、At least one 消息絕不會丟,但可能會重復傳輸
3、Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
當Producer向broker發送消息時,一旦這條消息被commit,因replication的存在,它就不會丟。但是如果Producer發送數據給broker後,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無法確定網絡故障期間發生了什
麽,但是Producer可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還並未實現,有希望在Kafka未來的版本中實現。(所以目前默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once)。
接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息後,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同。當然可以將Consumer設置為autocommit,即Consumer一旦讀到數據立即自動commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應用程序並非在Consumer讀取完數據就結束了,而是要進行進一步處理,而數據處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
讀完消息先commit再處理消息。這種模式下,如果Consumer在commit後還沒來得及處理消息就crash了,下次重新開始工作後就無法讀到剛剛已提交而未處理的消息,這就對應於At most once
讀完消息先處理再commit。這種模式下,如果在處理完消息之後commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應於At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認為是Exactly once。(筆者認為這種說法比較牽強,畢竟它不是Kafka本身提供的機制,主鍵本身也並不能完全保證操作的冪等性。而且實際上我們說delivery guarantee 語義是討論被處理多少次,而非處理結果怎樣,因為處理方式多種多樣,我們不應該把處理過程的特性——如是否冪等性,當成Kafka本身的Feature)
如果一定要做到Exactly once,就需要協調offset和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,Consumer拿到數據後可能把數據放到HDFS,如果把最新的offset和數據本身一起寫到HDFS,那就可以保證數據的輸出和offset的更新要麽都完成,要麽都不完成,間接實現Exactly once。(目前就high level API而言,offset是存於Zookeeper中的,無法存於HDFS,而low level API的offset是由自己去維護的,可以將之存於HDFS中)
總之,Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。

消息服務百科全書——Kafka基本原理介紹