1. 程式人生 > >加米谷:Kafka OffsetMonitor:監控消費者和延遲的隊列

加米谷:Kafka OffsetMonitor:監控消費者和延遲的隊列

Kafka 加米谷大數據

一個小應用程序來監視kafka消費者的進度和它們的延遲的隊列。 KafkaOffsetMonitor是用來實時監控Kafka集群中的consumer以及在隊列中的位置(偏移量)。 你可以查看當前的消費者組,每個topic隊列的所有partition的消費情況。可以很快地知道每個partition中的消息是否 很快被消費以及相應的隊列消息增長速度等信息。這些可以debug kafka的producer和consumer,你完全知道你的系統將 會發生什麽。 這個web管理平臺保留的partition offset和consumer滯後的歷史數據(具體數據保存多少天我們可以在啟動的時候配 置),所以你可以很輕易了解這幾天consumer消費情況。 KafkaOffsetMonitor這款軟件是用Scala代碼編寫的,消息等歷史數據是保存在名為offsetapp.db數據庫文件中,該數據 庫是SQLLite文件,非常的輕量級。雖然我們可以在啟動KafkaOffsetMonitor程序的時候指定數據更新的頻率和數據保存 的時間,但是不建議更新很頻繁,或者保存大量的數據,因為在KafkaOffsetMonitor圖形展示的時候會出現圖像展示過 慢,或者是直接導致內存溢出了。 所有的關於消息的偏移量、kafka集群的數量等信息都是從Zookeeper中獲取到的,日誌大小是通過計算得到的。 消費者組列表

技術分享圖片

screenshot 消費組的topic列表

技術分享圖片

screenshot 圖中參數含義解釋如下: topic:創建時topic名稱 partition:分區編號 offset:表示該parition已經消費了多少條message logSize:表示該partition已經寫了多少條message Lag:表示有多少條message沒有被消費。 Owner:表示消費者 Created:該partition創建時間 Last Seen:消費狀態刷新最新時間。 topic的歷史位置

技術分享圖片

screenshot Offset存儲位置 kafka能靈活地管理offset,可以選擇任意存儲和格式來保存offset。KafkaOffsetMonitor目前支持以下流行的存儲格 式。 kafka0.8版本以前,offset默認存儲在zookeeper中(基於Zookeeper) kafka0.9版本以後,offset默認存儲在內部的topic中(基於Kafka內部的topic) Storm Kafka Spout(默認情況下基於Zookeeper) KafkaOffsetMonitor每個運行的實例只能支持單一類型的存儲格式。 下載 可以到github下載KafkaOffsetMonitor源碼。 https://github.com/quantifind/KafkaOffsetMonitor 編譯KafkaOffsetMonitor命令: sbt/sbt assembly 不過不建議你自己去下載,因為編譯的jar包裏引入的都是外部的css和js,所以打開必須聯網,都是國外的地址,你編 譯的時候還要修改js路徑,我已經搞定了,你直接下載就好了。

啟動 編譯完之後,將會在KafkaOffsetMonitor根目錄下生成一個類似KafkaOffsetMonitor-assembly-0.3.0- SNAPSHOT.jar的jar文件。這個文件包含了所有的依賴,我們可以直接啟動它: java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage kafka \ --zk zk-server1,zk-server2 \ --port 8080 \ --refresh 10.seconds \ --retain 2.days 啟動方式2,創建腳本,因為您可能不是一個kafka集群。用腳本可以啟動多個。 vim mobile_start_en.sh nohup java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m - XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk 127.0.0.1:2181 --port 8080 --refresh 10.seconds --retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log & 各個參數的含義: offsetStorage:有效的選項是"zookeeper","kafka","storm"。0.9版本以後,offset存儲的位置在kafka。 zk: zookeeper的地址 prot 端口號 refresh 刷新頻率,更新到DB。 retain 保留DB的時間 dbName 在哪裏存儲記錄(默認'offsetapp').



加米谷:Kafka OffsetMonitor:監控消費者和延遲的隊列