1. 程式人生 > >Kafka監控工具KafkaOffsetMonitor配置及使用

Kafka監控工具KafkaOffsetMonitor配置及使用

一、KafkaOffsetMonitor簡述

KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以瀏覽當前的消費者組,並且每個Topic的所有Partition的消費情況都可以一目瞭然。

二、KafkaOffsetMonitor下載

KafkaOffsetMonitor託管在Github上,可以通過Github下載。 下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases

或者下載百度網盤:連結:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu

三、KafkaOffsetMonitor啟動

將下載下來的KafkaOffsetMonitor jar包上傳到linux上,可以新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯命令來執行這個jar包:

複製程式碼

[[email protected] KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088  --refresh 5.seconds --retain 1
.days 按回車後,可以看到控制檯輸出: serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp} 2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started
[email protected]
:8088

複製程式碼

如果沒有指定埠,則預設會開啟一個隨機埠。

複製程式碼

引數說明:

zk :zookeeper主機地址,如果有多個,用逗號隔開
port :應用程式埠
refresh :應用程式在資料庫中重新整理和儲存點的頻率
retain :在db中保留多長時間
dbName :儲存的資料庫檔名,預設為offsetapp

複製程式碼

為了更方便的啟動KafkaOffsetMonitor,可以寫一個啟動指令碼來直接執行,我這裡新建一個名為:kafka-monitor-start.sh的指令碼,然後編輯這個指令碼:

複製程式碼

[[email protected] KafkaMonitor]# vim kafka-monitor-start.sh 
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m  -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;

複製程式碼

然後退出儲存即可,接下來修改一下kafka-monitor-start.sh的許可權

[[email protected] KafkaMonitor]# chmod +x kafka-monitor-start.sh 

啟動KafkaOffsetMonitor:

[[email protected] KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh &
[1] 6551
[[email protected] KafkaMonitor]# lsof -i:8088
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    6552 root   16u  IPv6  26047      0t0  TCP *:radan-http (LISTEN)

四、KafkaOffsetMonitor Web UI

在遊覽器中輸入:http://ip:port即可以檢視KafkaOffsetMonitor Web UI,如下圖:

 

在下圖中有一個Visualizations選項卡,點選其中的Cluster Overview可以檢視當前Kafka叢集的Broker情況

五、簡單的Producer

1、新建一個Topic

  首先為本次試驗新建一個Topic,命令如下:

複製程式碼

bin/kafka-topics.sh \
    --create \
    --zookeeper 10.0.0.50:12181 \
    --replication-factor 3 \
    --partition 3 \
    --topic kafkamonitor-simpleproducer

複製程式碼

2、新建SimpleProducer程式碼

  在上一篇文章中提到的Producer封裝Github程式碼的基礎上,寫了一個往kafkamonitor-simpleproducer傳送message的java程式碼。

複製程式碼

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;

/**
 * Created by ckm on 2016/8/30.
 */
public class SimpleProducer {
    public static void main(String[] args) {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
        int i = 0;
        String message = "";
        while (true) {
            message = "test-simple-producer : " + i ++;
            kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);
        }
    }
}

複製程式碼

  程式執行效果:    這裡寫圖片描述

3、ConsoleConsumer消費該topic

  用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer

  消費截圖如下:    這裡寫圖片描述

4、KafkaOffsetMonitor頁面

(1)在Topic List選項卡中,我們可以看到剛才新建的kafkamonitor-simpleproducer    這裡寫圖片描述 (2)點開後,能看到有一個console-consumer正在消費該topic    這裡寫圖片描述 (3)繼續進入該Consumer,可以檢視該Consumer當前的消費狀況    這裡寫圖片描述   這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。    圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯後於Producer的message數目。  (4)看一眼各partition中的message消費情況    這裡寫圖片描述   從上圖可以看到,當前有3個Partition,每個Partition中的message數目分佈很不均勻。這裡可以與接下來的自定義Producer的情況進行一個對比。

六、自定義Partitioner的Producer

1、新建一個Topic

複製程式碼

bin/kafka-topics.sh \
    --create \
    --zookeeper 10.0.0.50:12181 \
    --replication-factor 3 \
    --partition 3 \
    --topic kafkamonitor-partitionedproducer

複製程式碼

2、Partitioner程式碼

  邏輯很簡單,迴圈依次往各Partition中傳送message。

複製程式碼

import kafka.producer.Partitioner;

/**
 * Created by ckm on 2018/1/8.
 */
public class TestPartitioner implements Partitioner {
    public TestPartitioner() {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        int intKey = (int) key;
        return intKey % numPartitions;
    }
}

複製程式碼

3、Producer程式碼

  將自定義的Partitioner設定到Producer,其他呼叫過程和二中類似。

複製程式碼

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;

/**
 * Created by ckm on 2016/8/30.
 */
public class PartitionedProducer {
    public static void main(String[] args) {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
        kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");
        int i = 0;
        String message = "";
        while (true) {
            message = "test-partitioner-producer : " + i;
            System.out.println(message);
            kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);
            i ++;
        }
    }
}

複製程式碼

  程式碼執行效果如下圖:    這裡寫圖片描述

4、ConsoleConsumer消費Message

bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer

  消費效果如下圖:    這裡寫圖片描述

5、KafkaOffsetMonitor頁面

  其他頁面與上面的類似,這裡只觀察一下每個partition中的message數目與第二節中的對比。可以看到這裡每個Partition中message分別是很均勻的。    這裡寫圖片描述

注意事項:    注意這裡有一個坑,預設情況下Producer往一個不存在的Topic傳送message時會自動建立這個Topic。由於在這個封裝中,有同時傳遞message和topic的情況,如果呼叫方法時傳入的引數反了,將會在Kafka叢集中自動建立Topic。在正常情況下,應該是先把Topic根據需要建立好,然後Producer往該Topic傳送Message,最好把Kafka這個預設自動建立Topic的功能關掉。    那麼,假設真的不小心建立了多餘的Topic,在刪除時,會出現“marked for deletion”提示,只是將該topic標記為刪除,使用list命令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個引數:

引數 預設值 作用
auto.create.topics.enable true Enable auto creation of topic on the server
delete.topic.enable false Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off

七,KafkaOffsetMonitor 總結

KafkaOffsetMonitor:程式一個jar包的形式執行,部署較為方便。只有監控功能,使用起來也較為安全。 除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款: Kafka Web Console:監控功能較為全面,可以預覽訊息,監控Offset、Lag等資訊,但存在bug,不建議在生產環境中使用。 Kafka Manager:偏向Kafka叢集管理,若操作不當,容易導致叢集出現故障。對Kafka實時生產和消費訊息是通過JMX實現的。沒有記錄Offset、Lag等資訊。