1. 程式人生 > >三、訊息處理過程與叢集維護

三、訊息處理過程與叢集維護

一、Kafka訊息組織原理

1.磁碟重認識

當需要從磁碟讀取資料時,要確定讀的資料在哪個磁軌,哪個扇區:首先必須找到柱面,即磁頭需要移動對準相應磁軌,這個過程叫做尋道,所耗費時間叫做尋道時間;然後目標扇區旋轉到磁頭下,這個過程耗費的時間叫做旋轉時間;一次訪盤請求(讀/寫)完成過程由三個動作組成尋道(時間):磁頭移動定位到指定磁軌;旋轉延遲(時間):等待指定扇區從磁頭下旋轉經過;資料傳輸(時間):資料在磁碟、記憶體與網路之間的實際傳輸由於儲存介質的特性,磁碟本身存取就比主存慢,再加上機械運動耗費,磁碟的存取速度往往是主存的幾百分之一甚至幾千分支一

怎麼樣才能提高磁碟的讀寫效率呢?

根據資料的區域性性原理 ,有以下兩種方法

•預讀或者提前讀;

•合併寫——多個邏輯上的寫操作合併成一個大的物理寫操作中;

採用磁碟順序讀寫(不需要尋道時間,只需很少的旋轉時間)。實驗結果:在一個6 7200rpm SATA RAID-5 的磁碟陣列上線性寫的速度大概是300M/秒,但是隨機寫的速度只有50K/秒,兩者相差將近10000倍。

2.Kafka訊息的寫入原理

一般的將資料從檔案傳到套接字的路徑:1.作業系統將資料從磁碟讀到核心空間的頁快取中;2.應用將資料從核心空間讀到使用者空間的快取中;3.應用將資料寫回記憶體空間的套接字快取中4.作業系統將資料從套接字快取寫到網絡卡快取中,以便將資料經網路發出;

這樣做明顯是低效的,這裡有四次拷貝,兩次系統呼叫。如果使用sendfile(Java 為: FileChannel.transferTo api),兩次拷貝可以被避免:允許作業系統將資料直接從頁快取傳送到網路上。優化後,只有最後一步將資料拷貝到網絡卡快取中是需要的。

Kafka topic資訊

Kafka 訊息檔案儲存  (tree)

3.Kafka訊息的刪除原理

從最久的日誌段開始刪除(按日誌段為單位進行刪除),然後逐步向前推進,直到某個日誌段不滿足條件為止,刪除條件:滿足給定條件predicate(配置項log.retention.{ms,minutes,hours}和log.retention.bytes指定);不能是當前啟用日誌段;大小不能小於日誌段的最小大小(配置項log.segment.bytes配置)要刪除的是否是所有日誌段,如果是的話直接呼叫roll方法進行切分,因為Kafka至少要保留一個日誌段;

 

二、Kafka訊息檢索原理

1.Kafka訊息的segment file的組成和物理結構

2.Kafka訊息的index file的組成和物理結構

3.Kafka訊息檢索過程

以讀取offset=368776的message為例,需要通過下面2個步驟查詢:第一步查詢segment file;以上圖為例,其中00000000000000000000.index表示最開始的檔案,起始偏移量(offset)為0.第二個檔案00000000000000368769.index的訊息量起始偏移量為368770 = 368769 + 1。只要根據offset二分查詢檔案列表,就可以快速定位到具體檔案。當offset=368776時定位到00000000000000368769.index|log第二步通過segment file查詢message;算出368776-368770=6,取00000000000000368769.index檔案第三項(6,1407),得出從00000000000000368769.log檔案頭偏移1407位元組讀取一條訊息即可

三、Kafka叢集維護

1.Kafka叢集基本資訊實時檢視和修改

叢集資訊實時檢視(topic工具):列出叢集當前所有可用的topic:bin/kafka-topics.sh --list –zookeeper zookeeper_address

檢視叢集特定topic 資訊:bin/kafka-topics.sh --describe --zookeeper zookeeper_address --topic topic_name

叢集資訊實時修改(topic工具):建立topic:bin/kafka-topics.sh --create --zookeeper zookeeper_address --replication-factor 1 --partitions 1 --topic topic_name增加(不能減少) partition(最後的4是增加後的值):bin/kafka-topics.sh --zookeeper zookeeper_address --alter –topic topic_name --partitions 4

Topic-level configuration 配置都能修改

2.Kafka叢集leader平衡機制

每個partitiion的所有replicas叫做“assigned replicas”,“assigned replicas”中的第一個replicas叫“preferred replica”,剛建立的topic一般“preferred replica”是leader。下圖中Partition 0的broker  2就是preferred replica”,預設會成為該分割槽的leader。

叢集leader平衡:bin/kafka-preferred-replica-election.sh –zookeeper zookeeper_addressauto.leader.rebalance.enable=true

3.Kafka叢集分割槽日誌遷移

遷移topic資料到其他broker,請遵循下面四步:寫json檔案,檔案格式如下:cat topics-to-move.json{"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1}

使用–generate生成遷移計劃(下面的操作是將topic: foo1和foo2移動到broker 5,6):bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" –generate這一步只是生成計劃,並沒有執行資料遷移;使用–execute執行計劃:bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json –execute執行前最好儲存當前的分配情況,以防出錯回滾

•使用–verify驗證是否已經遷移完成 遷移某個topic的某些特定的partition資料到其他broker,步驟與上面一樣,但是json檔案如下面所示: cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]} 可以指定到topic的分割槽編號 kafka-reassign-partitions.sh工具會複製磁碟上的日誌檔案,只有當完全複製完成,才會刪除遷移前磁碟上的日誌檔案。執行分割槽日誌遷移需要注意: •kafka-reassign-partitions.sh 工具的粒度只能到broker,不能到broker的目錄(如果broker上面配置了多個目錄,是按照磁碟上面已駐留的分割槽數來均勻分配的),所以,如果topic之間的資料,或者topic的partition之間的資料本身就不均勻,很有可能造成磁碟資料的不均勻: •對於分割槽資料較多的分割槽遷移資料會花大量的時間,所以建議在topic資料量較少或磁碟有效資料較少的情況下執行資料遷移操作;

進行分割槽遷移時最好先保留一個分割槽在原來的磁碟,這樣不會影響正常的消費和生產,如果目的是將分割槽5(brober1,5)遷移到borker2,3。可以先將5遷移到2,1,最後再遷移到2,3。而不是一次將1,5遷移到2,3。因為一次遷移所有的副本,無法正常消費和生產,部分遷移則可以正常消費和生產

四、Kafka叢集監控

1.Kafka Offset Monitor介紹

在生產環境需要叢集高可用,所以需要對Kafka叢集進行監控。Kafka Offset Monitor可以監控Kafka叢集以下幾項:Kafka叢集當前存活的broker集合;Kafka叢集當前活動topic集合;消費者組列表Kafka叢集當前consumer按組消費的offset lag數(即當前topic當前分割槽目前有多少訊息積壓而沒有及時消費)

部署Kafka Offset Minotor: •github下載jar包KafkaOffsetMonitor-assembly-0.2.0.jar : https://github.com/quantifind/KafkaOffsetMonitor/releases •啟動Kafka Offset Minotor : java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk zk-01,zk-02 --refresh 5.minutes --retain 1.day &

2.Kafka Offset Monitor使用

3.Kafka Manager介紹

Kafka Manager由雅虎開源,提供以下功能:管理幾個不同的叢集;容易地檢查叢集的狀態(topics, brokers, 副本的分佈, 分割槽的分佈) ;選擇副本基於叢集的當前狀態產生分割槽分配重新分配分割槽

Kafka Manager的安裝,方法一(不但要求能上網,還要求能FQ): •安裝sbt: 下載後,解壓並配置環境變數(將SBT_HOME/bin配置到PATH變數中) 安裝Kafka Manager : •git clone https://github.com/yahoo/kafka-manager •cd kafka-manager •sbt clean dist 部署Kafka Manager •修改conf/application.conf,把kafka-manager.zkhosts改為自己的zookeeper伺服器地址 •bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007 &

Kafka Manager的安裝,方法二:下載打包好的Kafka manager(也可以在課程原始碼處下載):https://github.com/scootli/kafka-manager-1.0-SNAPSHOT/tree/master/kafka-manager-1.0-SNAPSHOT下載後解壓修改conf/application.conf,把Kafka-manager.zkhosts改為自己的zookeeper伺服器地址bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007 &

4.Kafka Manager使用