1. 程式人生 > >Python 基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控

Python 基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控

pid close 格式 5.1 數據采集 次數 git 但是 消息

基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控

By: 授客 QQ:1033553122

1.測試環境

python 3.4

zookeeper-3.4.13.tar.gz

下載地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

下載地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

kafka_2.12-2.1.0.tgz

下載地址1:

http://kafka.apache.org/downloads.html

下載地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

pykafka-2.8.0.tar.gz

下載地址1:

https://pypi.org/project/pykafka/

https://files.pythonhosted.org/packages/55/4b/4828ec5ed766cca0c27de234688122494c5762965e70deeb88b84f5d8d98/pykafka-2.8.0.tar.gz

2.實現功能

實時采集Kafka生產者主題生產速率,主題消費速率,主題分區偏移,消費組消費速率,支持同時對多個來自不同集群的主題進行實時采集,支持同時對多個消費組實時采集

3.使用前提

1、“主題消費速率”&“消費組消費速率” 統計 依賴“消費組”,所以要統計消費速率,必須存在消費組才能統計;

2、“主題消費速率”&“消費組消費速率” 統計 依賴消費者自動、手動提交“offset”,所以所以要統計消費速率,必須確保消費者消費時,會提交消息的offset

3、Kafka版本大於等於0.10.1.1

4.使用方法

influxDB主機配置

KafkaMonitor\conf\influxDB.conf

[INFLUXDB]

influxdb_host = 10.203.25.106

influxdb_port = 8086

brokers集群配置

KafkaMonitor\conf\brokers.conf

[CLUSTER1]

broker1 = 127.0.0.1:9092

[bus]

#broker1 =10.202.xxx.xx:9096,10.202.xx.xx:9096,10.202.xxx.x:9096

格式說明:

[集群名稱]

自定義brokers標識 = broker ip:port配置(如果有多個broker,用英文逗號分隔)

如果不想對指定集群進行監控(不監控該集群的主題生產、消費速率,主題分區偏移,消費組消費速率),用 # 號註釋掉 該集群的“自定義brokers標識” 所在行即可,如上

topics主題配置

KafkaMonitor\conf\brokers.conf

[CLUSTER1]

topic1 = MY_TOPIC1

[bus]

topic1=NEXT_MARM_CORE_REPORT

#topic2=NEXT_MARM_CORE_EVENT

格式說明:

[集群名稱]

自定義topic 標識 = topic名稱

如果不想對指定主題進行監控(不監控該主題的生產、消費速率,主題分區偏移,該主題相關消費組消費速率),用 # 號註釋掉 該集群的“自定義 topic標識” 所在行即可,如上

註意:每個集群名稱下的 自定義 topic 標識不能重復

consumer_groups消費組配置

KafkaMonitor\conf\consumer_groups.conf

[CLUSTER1]
groupID1 = MY_TOPIC1|MY_GROUP1:5000

[bus]
#groupID1=NEXT_MARM_CORE_EVENT|NEXT_MARM_CORE_TASK
groupID2=NEXT_MARM_CORE_REPORT|NEXT_MARM_CORE_REPORT,NEXT_MARM_CORE_REPORTTAG

格式說明:

[集群名稱]

自定義consumer_groups 標識 = 主題名稱|消費該主題的消費組名稱[:提交msg offset的時間間隔(單位為 毫秒)](如果有多個消費組,彼此之間用逗號分隔)

註意:

1、如果有為消費組設置提交msg offset的時間間隔,並且該時間間隔大於統一設置的數據采集頻率,那麽該消費組的數據采集頻率將自動調整為對應的 提交msg offset的時間間隔/1000 + 1

2、主題消費速率的統計依賴消費該主題的所有消費組的數據信息,所以,同一個主題,不要配置在多個“自定義consumer_groups 標識”配置值中

3、主題消費速率數據采集頻率取最大值 max(統一設置的數據采集頻率,max(消費該主題的消費組提交msg offset的時間間隔/1000 + 1))

如果不想對指定消費組進行監控(不監控該消費組消費速率,消費組關聯的主題消費速率),用 # 號註釋掉 該集群的“自定義consumer_groups 標識” 所在行即可,如上,,或者把對應消費組及其提交msg offset的時間間隔信息刪除即可。

運行程序

python main.py 采集頻率(單位 秒) 采集時長

eg:

每5秒采集一次,總共采集120秒

python main.py 5 120

技術分享圖片

註意:

如果(根據配置自動調整後的)采集頻率時間間隔大於單次程序采樣耗時,則處理完成後立即進行下一次采樣,忽略采樣頻率設置,實際采集時長變長,但是采集次數不變 int(采集時長/采樣頻率)

grafana圖表配置

數據源配置

技術分享圖片

說明:Database db_+brokers.conf中配置的集群名稱

Dashboard變量配置

技術分享圖片

技術分享圖片

技術分享圖片

Dashboard Pannel主要配置項

技術分享圖片

技術分享圖片

技術分享圖片

技術分享圖片

技術分享圖片

效果展示

技術分享圖片

參考鏈接:

https://pykafka.readthedocs.io/en/latest/index.html

源碼下載地址:

https://gitee.com/ishouke/KafkaMonitor

Python 基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控