1. 程式人生 > >Kafka讀取__consumer_offsets和Kafka 0.11客戶端管理工具AdminClient

Kafka讀取__consumer_offsets和Kafka 0.11客戶端管理工具AdminClient

本文絕大部分內容參考自:http://www.cnblogs.com/huxi2b/p/6061110.html和http://www.cnblogs.com/huxi2b/p/7183219.html

一、Kafka讀取__consumer_offsets

注意:該實驗受限於kafka版本,我在kafka_2.11-0.9.0.1和kafka_2.10-0.10.1.0中都成功了,而在較舊的kafka_2.10-0.8.2.0(根本就不會產生__consumer_offsets)和最新的kafka_2.11-0.11.0.0(在第6步的時候報錯Exception in thread "main" java.lang.ClassNotFoundException: kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter)中卻無法完成

眾所周知,由於Zookeeper並不適合大批量的頻繁寫入操作,新版Kafka(0.8版本之後)已推薦將consumer的位移資訊儲存在Kafka內部的topic中,即__consumer_offsets topic,並且預設提供了kafka_consumer_groups.sh指令碼供使用者檢視consumer資訊。
不過依然有很多使用者希望瞭解__consumer_offsets topic內部到底儲存了什麼資訊,特別是想查詢某些consumer group的位移是如何在該topic中儲存的。針對這些問題,本文將結合一個例項探討如何使用kafka-simple-consumer-shell指令碼來查詢該內部topic。

1.建立topic “test”
[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-topics.sh --create --zookeeper h153:2181 --replication-factor 1 --partitions 2 --topic test

2.使用kafka-console-producer.sh指令碼生產訊息(本例中生產了4條訊息)
[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-console-producer.sh --broker-list h153:9092 --topic test

3.驗證訊息生產成功
[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list h153:9092 --topic test --time -1
test:1:2
test:0:2

引數解釋:--time -1 表示從最新的時間的offset中得到資料條數
輸出結果每個欄位分別表示topic、partition、untilOffset
上面的輸出結果表明總共生產了4條訊息

4.建立一個console consumer group
[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server h153:9092 --topic test --from-beginning --new-consumer

在kafka啟動視窗你會看見輸出這些資訊
[2017-09-26 21:49:54,454] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from [__consumer_offsets,32] (kafka.coordinator.GroupMetadataManager)
[2017-09-26 21:49:54,457] INFO [Group Metadata Manager on Broker 0]: Finished loading offsets from [__consumer_offsets,32] in 3 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-09-26 21:49:54,457] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from [__consumer_offsets,35] (kafka.coordinator.GroupMetadataManager)
(預設情況下__consumer_offsets有50個分割槽)

使用bin/kafka-topics.sh  --list --zookeeper h153:2181你會看到__consumer_offsets生成

5.獲取該consumer group的group id(後面需要根據該id查詢它的位移資訊)
[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-consumer-groups.sh --bootstrap-server h153:9092 --list --new-consumer
輸出: console-consumer-88985  (記住這個id!)

6.查詢__consumer_offsets topic所有內容
注意:執行下面命令前先要在consumer.properties中設定exclude.internal.topics=false否則該執行該命令後卡住不動,按Ctrl+C也無法結束

[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper h153:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
[console-consumer-88985,test,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800225,ExpirationTime 1506520200225]
[console-consumer-88985,test,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800225,ExpirationTime 1506520200225]
[console-consumer-88985,test,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800326,ExpirationTime 1506520200326]
[console-consumer-88985,test,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800326,ExpirationTime 1506520200326]
(第二次執行這個命令的時候得加--delete-consumer-offsets)

7.計算指定consumer group在__consumer_offsets topic中分割槽資訊
這時候就用到了第5步獲取的group.id(本例中是console-consumer-88985)。Kafka會使用下面公式計算該group位移儲存在__consumer_offsets的哪個分割槽上:
Math.abs(groupID.hashCode()) % numPartitions

所以在本例中,對應的分割槽=Math.abs("console-consumer-88985".hashCode()) % 50 = 39,即__consumer_offsets的分割槽39儲存了這個consumer group的位移資訊,下面讓我們驗證一下。
(你可以寫個Java小程式直接輸出System.out.println(Math.abs("console-consumer-88985".hashCode()) % 50);即可知道結果)

8.獲取指定consumer group的位移資訊
[[email protected] kafka_2.10-0.10.1.0]$ bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 39 --broker-list h153:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
[console-consumer-88985,test,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800225,ExpirationTime 1506520200225]
[console-consumer-88985,test,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800225,ExpirationTime 1506520200225]
[console-consumer-88985,test,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800326,ExpirationTime 1506520200326]
[console-consumer-88985,test,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1506433800326,ExpirationTime 1506520200326]
(如果將39換為其他數字則不會有上面的內容輸出)

二、Kafka 0.11客戶端管理工具AdminClient
很多使用者都有直接使用程式API操作Kafka叢集的需求。在0.11版本之前,kafka的伺服器端程式碼(即新增kafka_2.**依賴)提供了AdminClient和AdminUtils可以提供部分的叢集管理操作,但社群官網主頁並沒有給出這兩個類的使用文件。使用者只能自行檢視原始碼和測試用例才能瞭解具體的使用方法。倘若使用客戶端API的話(即新增kafka_clients依賴),使用者必須構造特定的請求並自覺編寫程式碼向指定broker建立Socket連線併發送請求,同樣是十分繁瑣。故Kafka 0.11版本引入了客戶端的AdminClient工具。注意,雖然和原先伺服器端的AdminClient類同名,但這個工具是屬於客戶端的,因此需要在程式中新增kafka_clients依賴,比如Gradle的話則增加 compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'

該工具提供的所有功能包括:
1.建立topic
2.查詢所有topic
3.查詢單個topic詳情
4.刪除topic
5.修改config(包括BROKER和TOPIC資源的config)
6.查詢資源config詳情
7.建立ACL
8.查詢ACL詳情
9.刪除ACL
10.查詢整個叢集詳情

使用者使用該類的方式與Java clients的使用方式一致,不用連線Zookeeper,而是直接給定叢集中的broker列表。

下面給出一個該類的測試例項,列出了除ACL操作之外的所有操作樣例程式碼,如下所示:

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

public class AdminClientTest {
	 
    private static final String TEST_TOPIC = "hui";
 
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "h153:9092");
 
        try (AdminClient client = AdminClient.create(props)) {
            createTopics(client);
            describeCluster(client);
            listAllTopics(client);
            describeTopics(client);
            alterConfigs(client);
            describeConfig(client);
            deleteTopics(client);
        }
    }
 
    public static void createTopics(AdminClient client) throws ExecutionException, InterruptedException {
        NewTopic newTopic = new NewTopic(TEST_TOPIC, 1, (short)1);
        CreateTopicsResult ret = client.createTopics(Arrays.asList(newTopic));
        ret.all().get();
        System.out.println("建立成功");
    }
    
    public static void describeCluster(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeClusterResult ret = client.describeCluster();
        System.out.println(String.format("Cluster id-->%s ; controller-->%s", ret.clusterId().get(), ret.controller().get()));
        System.out.print("Current cluster nodes info-->");
        for (Node node : ret.nodes().get()) {
            System.out.println(node);
        }
    }
    
    public static void listAllTopics(AdminClient client) throws ExecutionException, InterruptedException {
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true); // includes internal topics such as __consumer_offsets
        ListTopicsResult topics = client.listTopics(options);
        Set<String> topicNames = topics.names().get();
        System.out.println("Current topics in this cluster: " + topicNames);
    }
    
    public static void describeTopics(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeTopicsResult ret = client.describeTopics(Arrays.asList(TEST_TOPIC, "__consumer_offsets"));
        Map<String, TopicDescription> topics = ret.all().get();
        for (Map.Entry<String, TopicDescription> entry : topics.entrySet()) {
            System.out.println(entry.getKey() + " ===> " + entry.getValue());
        }
    }
 
    public static void alterConfigs(AdminClient client) throws ExecutionException, InterruptedException {
        Config topicConfig = new Config(Arrays.asList(new ConfigEntry("cleanup.policy", "compact")));
        client.alterConfigs(Collections.singletonMap(
                new ConfigResource(ConfigResource.Type.TOPIC, TEST_TOPIC), topicConfig)).all().get();
    }
    
    public static void describeConfig(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeConfigsResult ret = client.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, TEST_TOPIC)));
        Map<ConfigResource, Config> configs = ret.all().get();
        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
            ConfigResource key = entry.getKey();
            Config value = entry.getValue();
            System.out.println(String.format("Resource type: %s, resource name: %s", key.type(), key.name()));
            Collection<ConfigEntry> configEntries = value.entries();
            for (ConfigEntry each : configEntries) {
                System.out.println(each.name() + " = " + each.value());
            }
        }
    }

    public static void deleteTopics(AdminClient client) throws ExecutionException, InterruptedException {
        KafkaFuture<Void> futures = client.deleteTopics(Arrays.asList(TEST_TOPIC)).all();
        futures.get();
        System.out.println("刪除成功");
    }
}

最後提一句,由於該類本質上是非同步傳送請求然後等待操作處理結果,因此每個返回的結果都使用了KafkaFuture進行了封裝——KafkaFuture實現了Java的Future介面。既然是Future,那麼使用者在具體實現上便可以自行決定是非同步接收結果還是同步等待。本例中大量使用了KafkaFuture.get(),即同步等待結果。

相關推薦

Kafka讀取__consumer_offsetsKafka 0.11客戶管理工具AdminClient

本文絕大部分內容參考自:http://www.cnblogs.com/huxi2b/p/6061110.html和http://www.cnblogs.com/huxi2b/p/7183219.html 一、Kafka讀取__consumer_offsets 注意:該實驗

推薦一款MongoDB的客戶管理工具--nosqlbooster

技術分享 sms def 方式 rip tab tor error 好用 今天給大家推薦一款MongoDB的客戶端工具--nosqlbooster,這個也是我工作中一直使用的連接管理MongoDB的工具。這個工具還有個曾用名--mongobooster。nosqlboost

使用mysql客戶管理工具navicat連線centos7 MySQL服務報10060錯誤

通常產生的原因有:1、navicat連線mysql服務的使用者的訪問許可權受限,2:centos7 防火牆機制不允許3306埠被訪問 1、針對第一種情況,我們可以檢視mysql 的使用者訪問許可權, 1.1、使用mysql -uroot -p 登陸連線到MySQL服務,並輸入密碼

MonjaDB—MongoDB ubuntu linux下管理工具 客戶管理工具 mongovue的替換者

MonjaDB 是一個 MongoDB 的 GUI 客戶端工具,提供直觀的 MongoDB 資料管理的功能,支援 Windows/Mac/Linux. MonjaDB 是一個 Eclipse 外掛,必須先安裝 Eclipse。 主要特點: 易用WYSIWYG 編輯 JSO

MongoDB的客戶管理工具--nosqlbooster 連接MongoDB服務器

ons 連接 tab connect 進入 accep 數據庫 用戶數據 inpu nosqlbooster的官網地址為https://nosqlbooster.com。大家如果想直接下載,可以登入下載網址https://nosqlbooster.com/downlo

Linux系統中KafKa安裝使用方法 java客戶連線kafka

kafka linux單機安裝1 下載並安裝kafka# tar zxvf kafka_2.12-1.1.0tgz  # mv kafka_2.12-1.1.0 /usr/local/kafka # cd /usr/local/kafka2 啟動服務執行kafka需要使用Zo

IDEA使用C3P0JDBCMYSQL8.0.11的XML配置方法

由於MYSQL最新版的加密方式有所不同,要換最新版的JDBC  mysql-connector-java-8.0.11.jar  和使用最新版的C3P0 :c3p0-0.9.5.2.jar  還有MYSQL的密碼方式設定成 mysql_native_password   使用

mysql-connector-java-5.1.35.jar 包MySQL6.0.11相沖突解決方案

報錯資訊如下 ### Cause: java.sql.SQLException: Could not retrieve transation read-only status server ; SQL []; Could not retrieve transation re

公眾號appweb都是客戶,都可以對接一個後臺

網站 clas span 端口 xxx 訪問 包括 包含 ext 1.公眾號和app和web都是客戶端,都可以對接一個後臺 2.域名中包含端口號嗎?:不包括,不包括 3.目前在IIS服務器上搭建了一個網站,域名也申請了,可是80端口不能使用,可以使用8000,每次訪問網

基於thrift的javapython分別作為客戶服務的調用實現

Coding except arr pes com ssa utf-8 encoding 中文亂碼 前面已經實現了純java的thrift的實現。 現在實現實現一下python作為客戶端和服務端的thrift的調用 1.python作為客戶端,java作為服務端 java服

網路程式設計(InetAddress類、SocketServerSocket、實現客戶伺服器之間的雙向通訊)

網路程式設計的底層是IO,通過IO將一臺計算機中的資料傳送到另一臺計算機中。傳送的時候,要知道接受方的地址,該地址即為IP地址。知道IP地址後即可進行傳送。A向B發訊息,訊息是發過去了,但是B要怎樣接受呢?因此定義了埠,B監聽了A所使用的埠。A發的訊息中含有埠號,當B接受到訊息時,知道了埠號

Swift3.0 — CocoaAsyncSocket客戶例子

本文參考了http://blog.csdn.net/taoerit/article/details/51324889這篇博文,這裡對它進行了更新,使用了最新的xCode8和Swift3。在這裡首先要感謝下原文作者,這篇博文幫了我的大忙,我學習過之後也在這裡做一下記錄。

linuxwindos7 zabbix ageent 客戶安裝_2018_lcf

一、linux 安裝 [[email protected] lnmp_soft]# tar -xf zabbix-3.4.4.tar.gz [[email protected] lnmp_soft]# cd zabbix-3.4.4/ [[email

windowslinux中RabbitMQ客戶下載安裝

本文主要內容是RabbitMQ的安裝步驟【Windows系統與linux上的安裝】及客戶端的簡單使用。Windows上安裝1安裝Erlang下載erlang地址:http://www.erlang.org/downloads    erlang安裝完成。安裝完成之後建立一個名

MAC電腦安裝Mysql伺服器Navicat for mysql客戶

1.下載連結 2.安裝mysql和Navicat Navicat for mysql,下載下來的本身就是個app,不用再次安裝,直接拖拽到應用程式即可 安裝mysql,按照安裝步驟安裝即可,安裝時會出現如下彈框,一定要記住,5.7之後的版本預設有個

沫憶心V1.0記錄(客戶遇到的問題)

                                 沫憶心V1.0記錄(客戶端遇到的問題)      這個專案還有幾個功能現在還沒有實現,所以就實現的功能中,我遇到的現在還記得的一些問題小結一下(本來是計劃將出現的問題一一記錄下來的,不知道後面自己幹嘛去了,建了

WindowsUbuntu安裝OpenVPN客戶

OpenVPN伺服器公司已經配置好,向公司申請VPN會得到一個.open配置檔案,使用OpencvVPN客戶端就可以連線到公司VPN。 目錄 Windows OpenVPN客戶端 下載連結 ------------------使用說明-------------

APP支付(支付寶微信生成客戶簽名及回撥驗籤)

<?php namespace Pn\Controller; use Think\Controller\RestController; class PayController extends RestController{ public function __

Java NIO學習筆記(四) 使用JDK 1.7 NIO2.0 實現客戶與伺服器的通訊

JDK1.7 提供了全新的非同步NIO模式。稱為:NIO2.0或AIO。該模式引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供兩種方式獲取獲取操作結果。分別是: 通過java.util.concurrent

webservicehttp請求的 客戶呼叫程式碼

最近用到這些 整理了一下 http請求 客戶端呼叫http請求程式碼:  /**   * <pre>   * 呼叫http服務端   * 請求的引數:平臺程式碼、簽名、請求xml