1. 程式人生 > >大資料求索(7): Kafka的重要原理和概念一

大資料求索(7): Kafka的重要原理和概念一

大資料求索(7): Kafka的重要原理和概念一

大資料最好的學習資料是官方文件。

Kafka官方文件地址:http://kafka.apache.org/

一、Kakfa簡介

Apache kafka 是一個分散式的基於push-subscribe的訊息系統,它具備快速、可擴充套件、可持久化的特點。它現在是Apache旗下的一個開源系統,作為hadoop生態系統的一部分,被各種商業公司廣泛應用。它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。

二、Kafka技術概覽

2.1 Kafka的特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
  • 可擴充套件性:kafka叢集支援熱擴充套件
  • 永續性、可靠性:訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失
  • 容錯性:允許叢集中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高併發:支援數千個客戶端同時讀寫

2.2 Kafka的重要設計思想

  • Topic & Partition: Topic相當於傳統訊息系統MQ中的一個佇列queue

    ,producer端傳送的message必須指定是傳送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不同的partition上( hash(message) % [broker數量] )。

    物理上儲存上,這個topic會分成一個或多個partition,每個partiton相當於是一個子queue。在物理結構上,每個partition對應一個物理的目錄(資料夾),資料夾命名是[topicname][partition][序號],一個topic可以有無數多的partition,根據業務需求和資料量來設定。在kafka配置檔案中可隨時更改num.partitions引數來配置更改topic的partition數量,也可以在建立Topic時通過引數指定parittion數量。Topic建立之後通過Kafka提供的工具也可以修改partiton數量。

    一般來說,(1)一個topic的partition數量大於等於Broker的數量,可以提高吞吐率。(2)同一個partition的replication儘量分散到不同的機器,高可用

    當行增加一個partition的時候,partition裡面的message不會重新進行分配,原來的partition裡面的message資料不會變,新加的這個partition剛開始是空的,隨後進入這個topic的message就會重新參與所有partition的load balance。

  • partition replication:每個partition可以在其他的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka叢集。存replica副本的方式是按照kafka broker的順序存。例如有5個kafka broker節點,某個topic有3個partition,每個partition存2個副本,那麼partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replication副本數目不能大於kafka broker節點的數目,否則報錯。這裡的replica數其實就是partition的副本總數,其中包括一個leader,其他的就是copy副本)。這樣如果某個broker宕機,其實整個kafka內資料依然是完整的。但是,replica副本數越高,系統雖然越穩定,但是回來帶資源和效能上的下降;replica副本少的話,也會造成系統丟資料的風險。

  • **Partition leader與follower:**partition也有leader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其他的partition follower。partition leader與follower的資訊受Zookeeper控制,一旦partition leader所在的broker節點宕機,zookeeper會從其他的broker的partition follower上選擇follower變為parition leader。

  • Topic分配partition和partition replica的演算法:

    (1)將Broker(size=n)和待分配的Partition排序。

    (2)將第i個Partition分配到第(i%n)個Broker上。

    (3)將第i個Partition的第j個Replica分配到第((i + j) % n)個Broker上

  • Kakfa Broker Leader的選舉:Kakfa Broker叢集受Zookeeper管理,所以Kafka的使用需要先安裝Zookeeper。所有的Kafka Broker節點一起去Zookeeper上註冊一個臨時節點。但是隻有一個Kafka Broker會註冊成功,其他的都會失敗。這個成功在Zookeeper上註冊臨時節點Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower(這個過程叫Controller在ZooKeeper註冊Watch)。註冊成功後 ,Controller會監聽其他的Kafka Broker的所有資訊,如果這個Controller宕機了,在zookeeper上面的那個臨時節點就會消失。此時,所有的kafka broker又會一起去zookeeper上註冊一個臨時節點。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,並選取ISR列表中的一個replication作為partition leader(如果ISR列表中的replication全掛,選一個倖存的replication作為leader; 如果該partition的所有的replication都宕機了,則將新的leader設定為-1,等待恢復,等待ISR中的任一個replication“活”過來,並且選它作為leader;或選擇第一個“活”過來的replication(不一定是ISR中的)作為leader)

  • Consumergroup: 各個consumer(consumer 執行緒)可以組成一個組(Consumer group ),partition中的每個message只能被組中的一個consumer消費,如果一個message可以被多個consumer消費的話,那麼這些consumer必須在不同的組。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這裡的多個consumer的消費都必須是順序讀取partition裡面的message,新啟動的consumer預設從partition佇列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)併發處理message,這是因為多個consumer去消費一個Queue中的資料的時候,由於要保證不能多個執行緒拿同一條message,所以就需要行級別悲觀鎖(for update),這就導致了consume的效能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer執行緒去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴充套件,在加新的consumer thread中去消費。如果想多個不同的業務都需要消費這個topic的資料,起多個consumer group就好了,大家都是順序的讀取messageoffsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴充套件性,吞吐量極高。這也就形成了分散式消費的概念。

    當啟動一個consumer group去消費一個topic的時候,無論topic裡面有多個少個partition,無論我們consumer group裡面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition,即便這個consumer group下只有一個consumer thread,那麼這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。

  • 訊息狀態:在Kafka中,訊息的狀態被儲存在consumer中,broker不會關心哪個訊息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的訊息位置),這就意味著如果consumer處理不好的話,broker上的一個訊息可能會被消費多次。

  • 訊息持久化:Kafka中會把訊息持久化到本地檔案系統中,並且保持極高的效率。

  • 訊息有效期:Kafka會長久保留其中的訊息,以便consumer可以多次消費,當然其中很多細節是可配置的。

  • 批量傳送:Kafka支援以訊息集合為單位進行批量傳送,以提高push效率。

  • Consumer: Consumer處理partition裡面的message的時候是O(1)順序讀取的,所以必須維護著上一次讀到哪裡的offsite資訊。high level API的offset存於Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api。Consumer的delivery gurarantee,預設是讀完message先commmit再處理message,autocommit預設是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也可以配置成讀完訊息處理再commit,這種情況下consumer端的響應就會比較慢的,需要等處理完才行。

    一般情況下,一定是一個consumer group處理一個topic的messageBest Practice是這個consumer group裡面consumer的數量等於topic裡面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。如果這個consumer group裡面consumer的數量小於topic裡面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic裡面的所有partition都會被處理到的。如果這個consumer group裡面consumer的數量大於topic裡面partition的數量,多出的consumer thread就會閒著啥也不幹,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因為一個partition不可能同時被兩個consumer thread去處理

  • 同步非同步:Producer採用非同步push方式,極大提高Kafka系統的吞吐率(可以通過引數控制是採用同步還是非同步方式)

  • 離線資料裝載:Kafka由於對可拓展的資料持久化的支援,它也非常適合向Hadoop或者資料倉庫中進行資料裝載。

  • push-and-pull : Kafka中的Producer和consumer採用的是push-and-pull模式,即Producer只管向broker push訊息,consumer只管從broker pull訊息,兩者對訊息的生產和消費是非同步的。

三、Kafka核心特性

3.1 訊息可靠性

在訊息系統中,保證訊息在生產和消費過程中的可靠性是十分重要的,在實際訊息傳遞過程中,可能會出現如下三中情況:

  • 一個訊息傳送失敗
  • 一個訊息被髮送多次
  • 最理想的情況:exactly-once ,一個訊息傳送成功且僅傳送了一次

一個訊息如何算投遞成功,Kafka提供了三種模式:

  • 第一種是啥都不管,傳送出去就當作成功,這種情況當然不能保證訊息成功投遞到broker;
  • 第二種是Master-Slave模型,只有當Master和所有Slave都接收到訊息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了效能;
  • 第三種模型,即只要Master確認收到訊息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數情況下都會中和可靠性和效能選擇第三種模

訊息在broker上的可靠性,因為訊息會持久化到磁碟上,所以如果正常stop一個broker,其上的資料不會丟失;但是如果不正常stop,可能會使存在頁面快取來不及寫入磁碟的訊息丟失,這可以通過配置flush頁面快取的週期、閾值緩解,但是同樣會頻繁的寫磁碟會影響效能,又是一個選擇題,根據實際情況配置。

訊息消費的可靠性,Kafka提供的是**“At least once”** 模型,因為訊息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper裡,但是當訊息消費後consumer掛掉,offset沒有即時寫回,就有可能發生重複讀的情況,這種情況同樣可以通過調整commit offset週期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務解決,但是如果你的應用不在乎重複消費,那就乾脆不要解決,以換取最大的效能。

可以通過ack引數控制。

  • 當ack=0,表示producer不會等待broker的響應,所以,producer無法知道訊息是否傳送成功,這樣有可能會導致資料丟失,但同時,acks值為0會得到最大的系統吞吐量。
  • 當ack=1,表示producer會在leader partition收到訊息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到訊息。
  • 當ack=-1,producer會在所有備份的partition收到訊息時得到broker的確認,這個設定可以得到最高的可靠性保證。

從Producer端看:Kafka是這麼處理的,當一個訊息被髮送後,Producer會等待broker成功接收到訊息的反饋(可通過引數控制等待時間),如果訊息在途中丟失或是其中一個broker掛掉,Producer會重新發送(我們知道Kafka有備份機制,可以通過引數控制是否等待所有備份節點都收到訊息).

從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了訊息,但卻在處理過程中掛掉,此時Consumer可以通過這個offset值重新找到上一個訊息再進行處理。Consumer還有許可權控制這個offset值,對持久化到broker端的訊息做任意處理。

3.2 壓縮

Kafka支援以集合(batch)為單位傳送訊息,在此基礎上,Kafka還支援對訊息集合進行壓縮。Producer端可以通過GZIP或Snappy格式對訊息集合進行壓縮。Producer端進行壓縮之後,在Consumer端需進行解壓 。壓縮的好處就是減少傳輸的資料量,減輕對網路傳輸的壓力,在對大資料處理上,瓶頸往往體現在網路上而不是CPU(壓縮和解壓會耗掉部分CPU資源)。

那麼如何區分訊息是壓縮的還是未壓縮的呢,Kafka在訊息頭部添加了一個描述壓縮屬性位元組,這個位元組的後兩位表示訊息的壓縮採用的編碼,如果後兩位為0,則表示訊息未被壓縮。

3.3 備份機制

備份機制是Kafka0.8版本 的新特性,備份機制的出現大大提高了Kafka叢集的可靠性、穩定性。有了備份機制後,Kafka允許叢集中的節點掛掉後而不影響整個叢集工作。**一個備份數量為n的叢集允許n-1個節點失敗。**在所有備份節點中,有一個節點作為lead節點,這個節點儲存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:

備份機制

3.4 無狀態的Kafka Broker

  • broker沒有副本機制,一旦broker宕機,該broker的訊息將都不可用
  • broker不儲存訂閱者的狀態,由訂閱者自己儲存
  • 無狀態導致訊息的刪除成為難題(可能刪除的訊息正在被訂閱),kafka採用基於時間的SLA(服務水平保證),訊息儲存一定時間(通常為7天)後會被刪除。
  • 訊息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset進行重新讀取消費訊息。

3.5 message的交付與生命週期

  • 不是嚴格的JMS, 因此kafka對訊息的重複、丟失、錯誤以及順序型沒有嚴格的要求。(這是與AMQ最大的區別)
  • kafka提供at-least-once delivery,即當consumer宕機後,有些訊息可能會被重複delivery。
  • 因每個partition只會被consumer group內的一個consumer消費,故kafka保證每個partition內的訊息會被順序的訂閱。
  • Kafka為每條訊息為每條訊息計算CRC校驗,用於錯誤檢測,CRC校驗不通過的訊息會直接被丟棄掉

四、參考

  1. Kafka官方文件http://kafka.apache.org/intro
  2. https://blog.csdn.net/YChenFeng/article/details/74980531
  3. https://blog.csdn.net/suifeng3051/article/details/48053965