1. 程式人生 > ># 大資料求索(7): Kafka的重要原理和概念二與實戰

# 大資料求索(7): Kafka的重要原理和概念二與實戰

大資料求索(7): Kafka的重要原理和概念二

大資料最好的學習資料是官方文件。

Kafka官方文件地址:http://kafka.apache.org/

四、Kafka高效性相關設計

4.1 訊息的持久化

Kafka高度依賴檔案系統來儲存和快取訊息,一般的人認為磁碟是緩慢的,這導致人們對持久化結構具有競爭性持懷疑態度。其實,磁碟遠比你想象的要快或者慢,這決定於我們如何使用磁碟。

一個和磁碟效能有關的關鍵事實是:磁碟驅動器的吞吐量跟尋道延遲是相背離的。也就是說,線性寫的速度遠遠大於隨機寫。比如:在一個67200rpm SATA RAID-5 的磁碟陣列上線性寫的速度大概是600M/秒,但是隨機寫的速度只有100K/秒,兩者相差將近6000倍。線性讀寫在大多數應用場景下是可以預測的,因此,作業系統利用read-ahead和write-behind技術來從大的資料塊中預取資料,或者將多個邏輯上的寫操作組合成一個大的物理寫操作中。更多的討論可以在

ACMQueueArtical中找到,他們發現,**對磁碟的線性讀在有些情況下可以比記憶體的隨機訪問要快一些。 **

為了補償這個效能上的分歧,現代作業系統都會把空閒的記憶體用作磁碟快取,儘管在記憶體回收的時候會有一點效能上的代價,所有的磁碟讀寫操作會在這個統一的快取上進行。
此外,如果我們是在JVM的基礎上構建的,熟悉java記憶體應用管理的人應該清楚以下兩件事情:

  1. 一個物件的記憶體消耗是非常高的,經常是所存資料的兩倍或者更多。
  2. 隨著堆內資料的增多,Java的垃圾回收會變得非常昂貴。

基於這些事實,利用檔案系統並且依靠頁快取比維護一個記憶體快取或者其他結構要好

——我們至少要使得可用的快取加倍,通過自動訪問可用記憶體,並且通過儲存更緊湊的位元組結構而不是一個物件(這將有可能再次加倍)。這麼做的結果就是在一臺32GB的機器上,如果不考慮GC懲罰,將最多有28-30GB的快取。此外,這些快取將會一直存在即使服務重啟,然而程序內快取需要在記憶體中重構(10GB快取需要花費10分鐘)或者它需要一個完全冷快取啟動(非常差的初始化效能)。它同時也簡化了程式碼,因為現在所有的維護快取和檔案系統之間內聚的邏輯都在作業系統內部了,這使得這樣做比one-off in-process attempts更加高效與準確。如果你的磁碟應用更加傾向於順序讀取,那麼read-ahead在每次磁碟讀取中實際上獲取到這些快取中的有用資料。

以上這些建議了一個簡單的設計:不同於維護儘可能多的記憶體快取並且在需要的時候重新整理到檔案系統中,我們換一種思路:所有的資料不需要呼叫重新整理程式,而是立刻將它寫到一個持久化的日誌中。事實上,這僅僅意味著,資料將被傳輸到核心頁快取中並稍後被重新整理。我們可以增加一個配置項以讓系統的使用者來控制資料在什麼時候被重新整理到物理硬碟上。

4.2 常數時間效能保證

訊息系統中持久化資料結構的設計通常是維護著一個和消費佇列有關的B樹或者其它能夠隨機存取結構的元資料資訊。

B樹是一個很好的結構,可以用在事務型與非事務型的語義中。但是它需要一個很高的花費,儘管B樹的操作需要O(logN)。通常情況下,這被認為與常數時間等價,但這對磁碟操作來說是不對的。磁碟尋道一次需要10ms,並且一次只能尋一個,因此並行化是受限的

直覺上來講,一個持久化的佇列可以構建在對一個檔案的讀和追加上,就像一般情況下的日誌解決方案。儘管和B樹相比,這種結構不能支援豐富的語義,但是它有一個優點,所有的操作都是常數時間,並且讀寫之間不會相互阻塞。這種設計具有極大的效能優勢:最終系統性能和資料大小完全無關,伺服器可以充分利用廉價的硬碟來提供高效的訊息服務。

事實上還有一點,磁碟空間的無限增大而不影響效能這點,意味著我們可以提供一般訊息系統無法提供的特性。比如說,訊息被消費後不是立馬被刪除,我們可以將這些訊息保留一段相對比較長的時間(比如一個星期)。

4.3 進一步提高效率

有一種非常主要的應用場景是:處理Web活動資料,它的特點是資料量非常大,每一次的網頁瀏覽都會產生大量的寫操作。更進一步,我們假設每一個被髮布的訊息都會被至少一個consumer消費,因此我們更要努力讓消費變得更廉價。

通過上面的介紹,我們已經解決了磁碟方面的效率問題,除此之外,在此類系統中還有兩類比較低效的場景:

  • 太多小的I/O操作
  • 過多的位元組拷貝

為了減少大量小I/O操作的問題,kafka的協議是圍繞訊息集合構建的。Producer一次網路請求可以傳送一個訊息集合,而不是每一次只發一條訊息。在server端是以訊息塊的形式追加訊息到log中的,consumer在查詢的時候也是一次查詢大量的線性資料塊。訊息集合即MessageSet,實現本身是一個非常簡單的API,它將一個位元組陣列或者檔案進行打包。所以對訊息的處理,這裡沒有分開的序列化和反序列化的步驟,訊息的欄位可以按需反序列化(如果沒有需要,可以不用反序列化)。

另一個影響效率的問題就是位元組拷貝。為了解決位元組拷貝的問題,kafka設計了一種**“標準位元組訊息”,Producer、Broker、Consumer共享這一種訊息格式。Kakfa的message log在broker端就是一些目錄檔案**,這些日誌檔案都是MessageSet按照這種“標準位元組訊息”格式寫入到磁碟的。

維持這種通用的格式對這些操作的優化尤為重要:持久化log 塊的網路傳輸。流行的unix作業系統提供了一種非常高效的途徑來實現頁面快取和socket之間的資料傳遞。在Linux作業系統中,這種方式被稱作:sendfile system call(Java提供了訪問這個系統呼叫的方法:FileChannel.transferTo api)。

為了理解sendfile的影響,需要理解一般的將資料從檔案傳到socket的路徑:

  • 作業系統將資料從磁碟讀到核心空間的頁快取中
  • 應用將資料從核心空間讀到使用者空間的快取中
  • 應用將資料寫回核心空間的socket快取中
  • 作業系統將資料從socket快取寫到網絡卡快取中,以便將資料經網路發出

這種操作方式明顯是非常低效的,這裡有四次拷貝,兩次系統呼叫

如果使用sendfile,就可以避免兩次拷貝:作業系統將資料直接從頁快取傳送到網路上。所以在這個優化的路徑中,只有最後一步將資料拷貝到網絡卡快取中是需要的。 我們期望一個topic上有多個消費者是一種常見的應用場景。利用上述的zero-copy,資料只被拷貝到頁快取一次,然後就可以在每次消費時被重得利用,而不需要將資料存在記憶體中,然後在每次讀的時候拷貝到核心空間中。這使得訊息消費速度可以達到網路連線的速度。這樣一來,通過頁面快取和sendfile的結合使用,整個kafka叢集幾乎都以快取的方式提供服務,而且即使下游的consumer很多,也不會對整個叢集服務造成壓力。

五、Kafka的核心元件生產者-消費者

訊息系統通常都會由生產者,消費者,Broker三大部分組成,生產者會將訊息寫入到Broker,消費者會從Broker中讀取出訊息,不同的MQ實現的Broker實現會有所不同,不過Broker的本質都是要負責將訊息落地到服務端的儲存系統中。具體步驟如下:

  1. 生產者客戶端應用程式產生訊息:
    1. 客戶端連線物件將訊息包裝到請求中傳送到服務端
    2. 服務端的入口也有一個連線物件負責接收請求,並將訊息以檔案的形式儲存起來
    3. 服務端返回響應結果給生產者客戶端
  2. 消費者客戶端應用程式消費訊息:
    1. 客戶端連線物件將消費資訊也包裝到請求中傳送給服務端
    2. 服務端從檔案儲存系統中取出訊息
    3. 服務端返回響應結果給消費者客戶端
    4. 客戶端將響應結果還原成訊息並開始處理訊息

5.1 Producers

Producers直接傳送訊息到broker上的leader partition,不需要經過任何中介或其他路由轉發。為了實現這個特性,kafka叢集中的每個broker都可以響應producer的請求,並返回topic的一些元資訊,這些元資訊包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的等。

**Producer客戶端自己控制著訊息被推送到哪些partition。**實現的方式可以是隨機分配、實現一類隨機負載均衡演算法,或者指定一些分割槽演算法。Kafka提供了介面供使用者實現自定義的partition,使用者可以為每個訊息指定一個partitionKey,通過這個key來實現一些hash分割槽演算法。比如,把userid作為partitionkey的話,相同userid的訊息將會被推送到同一個partition。

以Batch的方式推送資料可以極大的提高處理效率,kafka Producer 可以將訊息在記憶體中累計到一定數量後作為一個batch傳送請求。Batch的數量大小可以通過Producer的引數控制,引數值可以設定為累計的訊息的數量(如500條)、累計的時間間隔(如100ms)或者累計的資料大小(64KB)。通過增加batch的大小,可以減少網路請求和磁碟IO的次數,當然具體引數設定需要在效率和時效性方面做一個權衡。

Producers可以非同步的並行的向kafka傳送訊息,但是通常producer在傳送完訊息之後會得到一個future響應,返回的是offset值或者傳送過程中遇到的錯誤。這其中有個非常重要的引數“acks”,這個引數決定了producer要求leader partition 收到確認的副本個數。

如果acks設定數量為0,表示producer不會等待broker的響應,所以,producer無法知道訊息是否傳送成功,這樣有可能會導致資料丟失,但同時,acks值為0會得到最大的系統吞吐量。

若acks設定為1,表示producer會在leader partition收到訊息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待直到broker確認收到訊息。

若設定為-1,producer會在所有備份的partition收到訊息時得到broker的確認,這個設定可以得到最高的可靠性保證,但吞吐量上會下降。

Kafka 訊息由一個定長的header變長的位元組陣列組成。因為kafka訊息支援位元組陣列,也就使得kafka可以支援任何使用者自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個訊息的大小,但我們推薦訊息大小不要超過1MB,通常一般訊息大小都在1~10kB之間。

釋出訊息時,kafka client先構造一條訊息,將訊息加入到訊息集set中(kafka支援批量釋出,可以往訊息集合中新增多條訊息,一次行釋出),send訊息時,producer client需指定訊息所屬的topic。

5.2 Consumers

Kafka提供了兩套consumer api,分為**high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的連線,並且這個API是完全無狀態的,每次請求都需要指定offset值,因此,這套API也是最靈活的。

在kafka中,當前讀到哪條訊息的offset值是由consumer來維護的,因此,consumer可以自己決定如何讀取kafka中的資料。比如,consumer可以通過重設offset值來重新消費已消費過的資料。不管有沒有被消費,kafka會儲存資料一段時間,這個時間週期是可配置的,只有到了過期時間,kafka才會刪除這些資料。(這一點與AMQ不一樣,AMQ的message一般來說都是持久化到mysql中的,消費完的message會被delete掉)

High-level API封裝了對叢集中一系列broker的訪問,可以透明的消費一個topic。它自己維持了已消費訊息的狀態,即每次消費的都是下一個訊息

High-level API還支援以組的形式消費topic,如果consumers有同一個組名,那麼kafka就相當於一個佇列訊息服務,而各個consumer均衡的消費相應partition中的資料。若consumers有不同的組名,那麼此時kafka就相當於一個廣播服務,會把topic中的所有訊息廣播到每個consumer。

對於Consumer group:

  1. 允許consumer group(包含多個consumer,如一個叢集同時消費)對一個topic進行消費,不同的consumer group之間獨立消費。
  2. 為了對減小一個consumer group中不同consumer之間的分散式協調開銷,指定partition為最小的並行消費單位,即一個group內的consumer只能消費不同的partition。

img

High level api和Low level api是針對consumer而言的,和producer無關。

High level api的consumer讀的partition的offsite是存在zookeeper上的。High level api 會啟動另外一個執行緒,每隔一段時間,offsite自動同步到zookeeper上。換句話說,如果使用了High level api, 每個message只能被讀一次,一旦讀了這條message之後,無論consumer的處理是否ok,High level api的另外一個執行緒會自動的把offiste+1同步到zookeeper上。如果consumer讀取資料出了問題,offsite也會在zookeeper上同步。因此,如果consumer處理失敗了,會繼續執行下一條,這往往是不對的行為。因此,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,但是最後讀的這一條資料是丟失了,因為在zookeeper裡面的offsite已經+1了。等再次啟動conusmer group的時候,已經從下一條開始讀取處理了。

Low level api是consumer讀的partition的offsite在consumer自己的程式中維護。不會同步到zookeeper上。但是為了kafka manager能夠方便的監控,一般也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite我們自己維護,我們不會+1。下次再啟動的時候,還會從這個offsite開始讀。這樣可以做到exactly once對於資料的準確性有保證。

5.3 Replications、Partitions 和Leaders

kafka中的資料是持久化的並且能夠容錯的。Kafka允許使用者為每個topic設定副本數量,副本數量決定了有幾個broker來存放寫入的資料。如果你的副本數量設定為3,那麼一份資料就會被存放在3臺不同的機器上,那麼就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到資料消費。如果對資料持久化有更高的要求,可以把副本數量設定為3或者更多。

Kafka中的topic是以partition的形式存放的,每一個topic都可以設定它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生產資料時,會按照一定規則(這個規則是可以自定義的)把訊息釋出到topic的各個partition中。上面講的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。

關於如何設定partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設定的partition的數量小於consumer的數量,就會有消費者消費不到資料。所以,推薦partition的數量一定要大於同時執行的consumer的數量。另外一方面,建議partition的數量大於叢集broker的數量,這樣leader partition就可以均勻的分佈在各個broker中,最終使得叢集負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些記憶體來快取訊息資料,如果partition數量越大,就要為kafka分配更大的heap space。

六、實戰

一個單節點單broker的例子,如果需要多個broker,配置多個server-x.properties檔案並分別後臺啟動即可。

  1. 下載Kafka,版本為0.9.0.0,並解壓

  2. 啟動server

    需要先啟動zookeeper服務,在啟動Kafka服務

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

zookeeper.properties配置,修改以下幾條

* zookeeper.connect=wds:2181
* host.name=wds
* log.dirs=/home/hadoop/hadoop/tmp/kafka-logs
  1. 建立一個topic
kafka-topics.sh --create --zookeeper wds:2181 --replication-factor 1 --partitions 1 --topic hello_world

–partitions 1 # 建立一個分割槽
–replication-factor 3 # 三個副本
–topic hello_world 指定topic的名字

​ 檢視已經建立的topic

kafka-topics.sh --list --zookeeper wds:2181

​ 檢視某個topic詳細資訊

kafka-topics.sh --describe --zookeeper wds:2181 --topic my-replicated-topic
  1. 生產者生產訊息
kafka-console-producer.sh --broker-list wds:9092 --topic hello_world 
  1. 消費者消費訊息
kafka-console-consumer.sh --zookeeper wds:2181 --topic hello_world

七、參考

  1. Kafka官方網站http://kafka.apache.org/intro
  2. https://blog.csdn.net/YChenFeng/article/details/74980531
  3. https://blog.csdn.net/suifeng3051/article/details/48053965