1. 程式人生 > >Kafka(二): Kafka 叢集部署與使用

Kafka(二): Kafka 叢集部署與使用

一、Kafka 叢集部署                                                           

    Kafka是一種分散式的釋出(producer/訂閱(consumer)的訊息系統,並支援實時和離線的資料處理、可擴充套件、持久的。上一次已經對kafka做了介紹,今天我們介紹如何部署、建立主題併發布訊息和訂閱訊息

     1、兩個虛擬機器:192.168.2.200192.168.2.201

   3、對兩個虛擬機器分別解壓

     > tar -xzf kafka_2.12-0.10.2.1.tgz
     > cd kafka_2.12-0.10.2.1.tgz

   4、配置 config/server.properties 檔案

        broker.id=200 / broker.id=201

        zookeeper.connect=192.168.2.200:2181,192.168.2.201:2181

       broker.id:每一個broker在叢集中的唯一表示,要求是正數

       zookeeper.connect:zookeeper叢集的地址,可以是多個,多個之間用逗號分割。

      5Kafka依zookeeperkafka的叢集的meta資訊儲存在zookeeper上,zookeeper管理consumer

offset,通過zookeeper的分散式系統協調來保證Kafka

     6zookeeper叢集先啟動,然後啟動192.168.2.200192.168.2.201虛擬機器上的kafkaserver

      >bin/kafka-server-start.sh config/server.properties &
           

      建立主題併發布訊息和訂閱訊息


  二、建立主題                                                                                                         

建立了主題名my-test-topic2個分割槽,一個副本

      >bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic my-test-topic
 

  主題資訊

     1、檢視哪些主題列表

      >bin/kafka-topics.sh --list --zookeeper localhost:2181

         

     2、具體檢視某個主題的描述資訊

       >bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic
           

第一行描述了my-test-topic主題的資訊、正如我們之前的建立的兩個分割槽、一個副本。接下去每一個描述對應的各個分割槽的情況,有多少個分割槽就有對應幾行。

       Leader:負責處理訊息的讀和寫,Leader是從所有節點中隨機選擇的.

       Replicas:分割槽對應的副本。

       Isr:副本所在的server 節點。

 三、釋出訊息和訂閱訊息                                                                                           

   1、釋出訊息

        >bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic
                

         1)檢視釋出者

       >./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic  my-test-topic
          my-test-topic:1:1
          my-test-topic:0:1

   2、訂閱訊息

      >bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic my-test-topic --from-beginning

                      

      1)檢視訂閱描述資訊

      >bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config group.id:test-consumer-group --topic my-test-topic --from-beginning

[2017-05-20 08:35:26,396] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)

Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/my-test-topic/0.

0.9.0.版本後對kafka.tools.ConsumerOffsetChecker命令就不在支援了。

           2)檢視消費組

            >bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

                 

           3)檢視具體的消費者資訊

       >bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group console-consumer-44096

         

       4)我們在消費時指定消費組,config/consumer.properties配置的group.id

         >bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config  config/consumer.properties  --topic my-test-topic --from-beginning
                       

五、刪除主題                                                                                                           

 1、刪除Broker

     >jps
     >Kill -9 程序ID

  2、刪除主題

    >bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-test-topic