1. 程式人生 > >kafka_0.10.1.0監控及管理

kafka_0.10.1.0監控及管理

1. kafka監控

kafka自身沒有監控管理頁面,無論是進行一些管理操作還是狀態的監控都要命令加一大堆記不住的引數,實在是很不方便,不過好在在github上開源了一些工具,在kafka的生態系統中也有提及到:

  • Kafka Manager: 都是以表格的形式展現資料,比較方便用來管理kafka,例如topic的建立、刪除以及分割槽的管理等。
  • Kafka Offset Monitor: 監控消費者以及所在分割槽的offset,幫助分析當前的消費以及生產是否順暢,功能比較單調但介面還可以。
  • Kafka Web Console:github上已經說明了不再更新了,且建議使用Kafka manager
  • kafkat: 一個簡化了的命令列工具,用來管理kafka的broker,partition,topic.
  • Capillary: 如果是kafka+storm整合使用,可以選擇該工具,是一個web應用

除了前面兩個,其它幾個都沒試用過,就算在網上查也是推薦前兩個而已,kafka manager基於jmx功能比較強大,利用它做管理方面;而KafkaOffsetMonitor從它的啟動引數來看應該是定時從zookeeper上獲取消費者的offset,以圖的形式展示,比較直觀(對於一些實現Exactly once的系統,offset並不儲存在zookeeper上面,它將不能使用),兩者結合使用,相得益彰。

2.1. Kafka-manager

kafka manager的原始碼:https://github.com/yahoo/kafka-manager 官方要求的kafka版本:Kafka 0.8.1.1 或者 0.8.2.x 或者 0.9.0.x,不過使用kafka_0.10.1.0時也能正常。 java版本要求:Java 8+

2.1.1 編譯原始碼

為了得到部署包kafka-manager-xxxx.zip,先根據上面的地址下載原始碼再編譯(不想這麼麻煩的話,可以去一些kafka的qq群,一般群共享裡都會有這個包)。kafka-manager工程是利用SBT進行構建的,所以編譯之前還需要安裝SBT,安裝Java 8

。最後執行命令編譯:

sbt clean dist
  • 1

網路不好的話可能需要重複編譯,成功後在target/universal目錄下可以看到kafka-manager-1.3.2.1.zip

2.1.2 修改配置

把編譯得到的zip包解壓,在conf目錄中有一個application.conf檔案,最小化的配置只需要在該檔案中修改kafka-manager.zkhosts引數即可:

kafka-manager.zkhosts="master:2181,slave1:2181,slave2:2181"
  • 1

2.1.3 kafka啟動jmx

kafka服務必需要開啟JMX,否則在下一步啟動kafka-manager時會出現:java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled! 啟動kafka服務時指定JMX_PORT值:

JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
  • 1

或者修改kafka-server-start.sh,在前面加上:

export JMX_PORT=9999
  • 1

2.1.4 啟動kafka-manager

以執行在linux上為例,啟動指令碼為bin/kafkak-manager,該指令碼會預設佔用9000埠,也可以通過引數修改埠以及指定java版本執行:

nohup bin/kafka-manager -java-home /usr/java/jdk1.8.0_101/ -Dhttp.port=8081 &
  • 1

2.2. KafkaOffsetMonitor

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar  com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk master,slave1,slave2:2181 --port 8082 --refresh 10.seconds  --retain 2.days
  • 1

3. kafka自帶指令碼工具

基本所有指令碼都是呼叫kafka-run-class.sh指令碼去執行一個命令模式的class.建議使用指令碼時參考指令碼的使用說明。

3.1. 主題管理:kafka-topics.sh

建立、刪除、檢視或者改變一個topic.

3.1.1. 建立topic

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --partitions 3 --replication-factor 2 --config flush.ms=1000 --config flush.messages=1
  • 1

建立一個名稱為test的topic,它有3個分割槽,每個分割槽兩個replica,通過–config給topic設定屬性,格式為key=value,如果有多個配置屬性則如上命令。這種建立方式kafka會自動把各個分割槽的replica分配到相應的broker,也可以在建立topic時手動指定哪個分割槽的哪個replica落在指定的broker,示例命令如下:

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test  --config flush.ms=1000 --config flush.messages=1 --replica-assignment 0:1,1:2
  • 1

關鍵的配置引數為–replica-assignment,該引數不能與–partitions和–replication-factor同時出現,引數的使用格式如下:

broker_id_for_part0_replica1: broker_id_for_part0_replica2, broker_id_for_part1_replica1: broker_id_for_part1_replica2, broker_id_for_part2_replica1: broker_id_for_part2_replica2

–replica-assignment 0:1,1:2表示有兩個分割槽,分割槽0的replica1在broker.id=0的kafka服務上,分割槽0的replica2在broker.id=1的kafka服務上;分割槽1的replica1在broker.id=1的kafka服務上,分割槽1的replica2在broker.id=2的kafka服務上。

3.1.2. 刪除topic

當使用delete命令刪除topic,預設只是進行標記,並沒有真正的刪除

這裡寫圖片描述

Note: This will have no impact if delete.topic.enable is not set to true. 需要在config/server.properties配置檔案中開啟delete.topic.enable=true

3.1.3. 檢視topic

bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test
  • 1

describe名稱為test的topic,將會顯示topic的分割槽數、replica因子、配置引數以及各分割槽的分佈情況(leader,replica,isr),如下圖:

這裡寫圖片描述 使用–describe時還可以結合其它一些引數對結果進行過濾:

  • topics-with-overrides:加上該引數時,只顯示config被覆蓋過的topic(例如使用下面的–alter修改config,或者建立topic時指定–config也算)。
  • unavailable-partitions:加上該引數時,只顯示leader不可用的分割槽。
  • under-replicated-partitions:加上該引數時,只顯示replica不足的分割槽。

3.1.4. 修改topic

根據–alter引數的說明,可以修改topic的分割槽數(目前只能是增加),修改配置config,以及修改replica(這裡貌似不準確,根據官網的文件說明,如果想要增加replication factor,應該使用kafka-reassign-partitions.sh指令碼)。

  • 增加分割槽:從2個分割槽增加到3個
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --partitions 3
  • 1

成功後describe一下topic:這裡寫圖片描述

新增分割槽不能改變現有的資料

  • 新增修改配置 根據上圖test主題的Configs:flush.messages=1,flush.ms=1000,嘗試把flush.ms修改為2000,命令如下:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --config flush.ms=2000
  • 1

成功後:這裡寫圖片描述

  • 刪除配置 –alter結合–delete-config一起使用可以刪除某項配置,嘗試刪除flush.ms配置專案:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --delete-config flush.ms
  • 1

成功後:這裡寫圖片描述

PS:對於使用–alter增加、修改和刪除config,從0.9.0.0版本後建議使用kafka-configs.sh指令碼。

3.2. 配置管理:kafka-configs.sh

這個指令碼專門是用來新增,修改和刪除實體的config的,其中操作的實體物件有:topic, client, user 和 broker。

3.2.1. 更新配置

新增和修改都可以統一說成更新,沒有則新增,存在即修改。結合alter,add-config以及其它一些配置,例如修改broker的某個config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --add-config 'leader.replication.throttled.rate=700000000'
  • 1
  • entity-type:實體型別,必須為(topics/clients/users/brokers)其中之一。
  • entity-name:實體名稱(topic的名稱,client的id,user的principal名稱,broker的id)。
  • add-config:格式為’ key1:value1,key2:[v21,v22],key3:value3’,多個配置可以寫在一起,比kafka-topic.sh的修改更人性化。

3.2.2. 檢視配置

執行上面命令給id=0的broker新增配置leader.replication.throttled.rate後,接著檢視一下該broker的config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --describe
  • 1

結果:這裡寫圖片描述

3.2.3. 刪除配置

接上,刪除id=0的broker的配置leader.replication.throttled.rate

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --delete-config 'leader.replication.throttled.rate'
  • 1

結果:這裡寫圖片描述

3.3. 消費者組管理:kafka-consumer-groups.sh

可以列出所有消費者組,檢視某個消費者組的詳細情況以及刪除消費者組的資訊(刪除只適用於舊版本基於zookeeper的消費都組)。

3.3.1. 列出消費者組

Kafka預設一直會有一個“KafkaManagerOffsetCache”的消費者組,為了更加直觀,先用kafka-console-consumer.sh啟動一個消費都,並加入一個叫做“test_group”的組:

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --consumer-property group.id=test_group
  • 1

接著使用以下命令列出所有的消費都組:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list
  • 1

已經可以看到“test_group”的消費都組了:這裡寫圖片描述

3.3.2. 檢視消費者組

檢視消費者組的具體消費狀況,結合來分析目前叢集的穩健程度,執行以下命令:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_group
  • 1

結果:這裡寫圖片描述

3.4. 均衡leader:kafka-preferred-replica-election.sh

每個分割槽的所有replicas叫做”assigned replicas”,”assigned replicas”中的第一個replicas叫”preferred replica”,剛建立的topic一般”preferred replica”(優先副本)是leader。 各分割槽的讀取寫請求都是由leader來接收處理的,那麼當然希望各分割槽的leader可以均衡地分佈在各個broker之上,做到均衡負載,提高叢集穩定性以及充分利用資源。一般在建立topic時,kafka都會預設把leader平均分配,但當某個broker宕掉後,會導致該broker上的leader轉移到其它的broker上去,導致機群的負載不均衡,就算宕掉的broker恢復正常,它上面已經沒有leader。可以使用kafka-preferred-replica-election.sh工具令到恢復後的broker上的優先副本重新選舉成為leader,這樣又恢復到了宕掉之前的狀態。

下面來模擬一下整個過程,首先建立一個topic,3個分割槽,每個分割槽的leader分別在3個broker上面:這裡寫圖片描述

分割槽0的leader已經從broker0轉移到了broker1了,在Isr中也看不到原本broker0的兩個replica。最後重新啟動broker0並執行以下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181
  • 1

再觀察test的分割槽情況:這裡寫圖片描述

可以看到test已經恢復到最初的leader分佈情況了。預設是對所有分割槽進行優先副本選舉(preferred replica election),如果想指定操作某些分割槽,則需要配合–path-to-json-file引數,例如test有0,1,2三個分割槽,只想操作1,2分割槽,首先編譯test_election.json檔案,內容如下:

{“partitions”:[{“topic”: “test”, “partition”: 1}, {“topic”: “test”, “partition”: 2}]}

然後執行以下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181 --path-to-json-file test_election.json
  • 1

PS:其實可以配置kafka自動平衡leader的,在server.properties檔案中設定:auto.leader.rebalance.enable=true即可,而該配置預設是已經開啟的,想驗證的話可以重啟一個broker,隔一段時間後會發現leader會自動恢復。

3.5. 擴容重分割槽:kafka-reassign-partitions.sh

當有新的broker節點加入到已經在使用的叢集,kafka是不會自動均衡原本的資料到新節點的,需要手動對分割槽進行遷移,使得新節點可以對外提供服務。(對於後來建立和topic則不需要)。

3.5.1. 生成遷移計劃

首先肯定要知道需要對哪些topic進行遷移,且明確哪個分割槽需要遷移到哪個broker節點。現有一個分割槽test,具體如下圖:這裡寫圖片描述

手動編輯一個json檔案(例如命名為topics-to-move.json),表示哪些topic是需要遷移的,內容如下(可以指定多個topics):

{“topics”: [{“topic”: “test”}], “version”:1 }

想要把test的所有分割槽遷移到broker1,2,執行以下命令生成遷移計劃:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
  • 1
  • topics-to-move-json-file:上面建立的topics-to-move.js檔案
  • broker-list:分割槽需要遷移到的broker,格式如命令
  • generate:表示生成分配規則,內容是json串 執行結果如下:這裡寫圖片描述

顯示了當前的分配規則(可以用作回滾)以及新生成的分配規則,把內容儲存到檔案(expand-cluster-reassignment.json),當然,也可以手動修改裡面的內容,只要符合格式即可:

{“version”:1,”partitions”:[{“topic”:”test”,”partition”:1,”replicas”:[2,1]},{“topic”:”test”,”partition”:2,”replicas”:[1,2]},{“topic”:”test”,”partition”:0,”replicas”:[1,2]}]}

3.5.2. 執行遷移計劃

根據上一步生成的分配規則expand-cluster-reassignment.json啟動遷移,執行以下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
  • 1

然後describe一下,檢視新的分割槽分配情況:這裡寫圖片描述

可以看到現在所有分割槽的replica都已經全部遷移到了broker1,2上面。

3.5.3. 驗證遷移計劃

還是根據分配規則expand-cluster-reassignment.json驗證分割槽是否分配成功,執行以下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
  • 1

執行結果如下:這裡寫圖片描述

至此,分割槽的遷移已經完成。其實已經對分割槽規則熟悉的話,可以跳過生成遷移計劃這步,直接編寫expand-cluster-reassignment.json,然後執行驗證。

3.6. 增加副本:kafka-reassign-partitions.sh

為分割槽增加副本,還是使用kafka-reassign-partitions.sh命令,然後編輯副本規則json檔案即可。現有以下topic:這裡寫圖片描述

分割槽0有兩個replica,分別在broker1,2上,現在準備在broker0上新增一個replica,先建立副本分配json檔案(increase-replication-factor.json),內容如下:

{“version”:1, “partitions”:[{“topic”:”test”,”partition”:0,”replicas”:[0,1,2]}]}

然後指定increase-replication-factor.json執行下面的命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file increase-replication-factor.json --execute
  • 1

接著,同樣使用increase-replication-factor.json來驗證是否成功:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
  • 1

執行結果如下:這裡寫圖片描述

或者describe一下topic:這裡寫圖片描述