1. 程式人生 > >Kafka : Kafka入門教程和JAVA客戶端使用

Kafka : Kafka入門教程和JAVA客戶端使用

目錄

Kafka簡介

由Scala和Java編寫,Kafka是一種高吞吐量的分散式釋出訂閱訊息系統.

環境介紹

作業系統:centos6.5
kafka:1.0.1
zookeeper:3.4.6

術語介紹

  • Broker : Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
  • Topic : 每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
  • Partition : Partition是物理上的概念,每個Topic包含一個或多個Partition.
  • Producer : 負責釋出訊息到Kafka broker
  • Consumer : 訊息消費者,向Kafka broker讀取訊息的客戶端。
  • Consumer Group : 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。

消費模式

這裡寫圖片描述

為了照顧對MQ不是很瞭解的同學,先講一下MQ的原理.一般MQ都是在服務端儲存一個佇列.生產者把訊息丟到MQ server,消費者從MQ server消費.這樣一來解決了生產者和消費者的高耦合問題,同時也解決了生產速度和消費速度差異導致的消費者跟不上生產者的生產速度而導致的消費者壓力過大問題.

在kafka中的topic就是一系列佇列的總稱,稱為一個主題.當然ActiveMQ和RabbitMQ中都有這個概念.一類訊息都會丟到一個topic中去.

講完topic我們講一下partition(分割槽),這個東西是kafka獨有的東西,也是kafka實現橫向擴充套件和高併發的一個重要設計.我們試想一下,如果每個topic只有一個佇列,隨著業務增加topic裡訊息越來越多.多到一臺server裝不下了怎麼辦.為了解決這個問題,我們引入了partition這個概念.一個partition(分割槽)代表了一個物理上存在的佇列.topic只是一組partition(分割槽)的總稱,也就是說topic僅是邏輯上的概念.這樣一來當topic上的訊息越來越多.我們就可以將新增的partition(分割槽)放在其他server上.也就是說topic裡邊的partition(分割槽)可以分屬於不同的機器.實際生產中,也基本都是這樣玩的.

這裡說一個特殊情況,有時我們建立了一個topic沒有指定partition(分割槽)數量或者指定了partition(分割槽)數量為1,這時實際也是有一個預設的partition(分割槽)的,名字我忘記了.

從Producer(生產者)角度,一個訊息丟到topic中任務就完成了.至於具體丟到了topic中的哪個partition(分割槽),Producer(生產者)不需要關注.這裡kafka自動幫助我們做了負載均衡.當然如果我們指定某個partition(分割槽)也是可以的.這個大家官方文件和百度.

接下里我們講Consumer Group(消費組),Consumer Group(消費組)顧名思義就是一組Consumer(消費者)的總稱.那有了組的概念以後能起到什麼作用.如果只有一組內且組內只有一個Consumer,那這個就是傳統的點對點模式,如果有多組,每組內都有一個Consumer,那這個就是釋出-訂閱(pub-sub)模式.每組都會收到同樣的訊息.

最後講最難理解也是大家討論最多的地方,partition(分割槽)和Consumer(消費者)的關係.首先,一個Consumer(消費者)的一個執行緒在某個時刻只能接收一個partition(分割槽)的資料,一個partition(分割槽)某個時刻也只會把訊息發給一個Consumer(消費者).我們設計出來幾種場景:

場景一: topic-1 下有partition-1和partition-2
group-1 下有consumer-1和consumer-2和consumer-3
所有consumer只有一個執行緒,且都消費topic-1的訊息.
消費情況 : consumer-1只消費partition-1的資料
consumer-2只消費partition-2的資料
consumer-3不會消費到任何資料
原因 : 只能接受一個partition(分割槽)的資料

場景二: topic-1 下有partition-1和partition-2
group-1 下有consumer-1
consumer只有一個執行緒,且消費topic-1的訊息.
消費情況 : consumer-1先消費partition-1的資料
consumer-1消費完partition-1資料後開始消費partition-2的資料
原因 : 這裡是kafka檢測到當前consumer-1消費完partition-1處於空閒狀態,自動幫我做了負載.所以大家看到這裡在看一下上邊那句話的”某個時刻”
特例: consumer在消費訊息時必須指定topic,可以不指定partition,場景二的情況就是發生在不指定partition的情況下,如果consumer-1指定了partition-1,那麼consumer-1消費完partition-1後哪怕處於空閒狀態了也是不會消費partition-2的訊息的.

進而我們總結出了一條經驗,同組內的消費者(單執行緒消費)數量不應多於topic下的partition(分割槽)數量,不然就會出有消費者空閒的狀態,此時併發執行緒數=partition(分割槽)數量.反之消費者數量少於topic下的partition(分割槽)數量也是不理想的,原因是此時併發執行緒數=消費者數量,並不能完全發揮kafka併發效率.

最後我們看下上邊的圖,Consumer Group A的兩個機器分別開啟兩個執行緒消費P0 P1 P2 P3的訊息Consumer Group B的四臺機器單執行緒消費P0 P1 P2 P3的訊息就可以了.此時效率最高.

下載

叢集安裝配置

解壓 : cd /usr/local && tar -xzvf kafka_2.11-1.0.1.tgz

建立log目錄 : cd /usr/local/kafka_2.11-1.0.1 && mkdir kafkaLogs

配置:vi /usr/local/kafka_2.11-1.0.1/config/server.properties需改下邊五個地方

#broker的id,叢集中的每臺機器id唯一,其他兩臺分別1和2
broker.id=0  
#是Kafka繫結的interface,這裡需要寫本機內網ip地址,不然bind埠失敗
#其他兩臺分別是192.168.1.5和192.168.1.9
host.name=192.168.1.3 
#向zookeeper註冊的對外暴露的ip和port,118.212.149.51是192.168.1.3的外網ip地址
#如果不配置kafka部署在外網伺服器的話本地是訪問不到的.
advertised.listeners=PLAINTEXT://118.212.149.51:9092 
#zk叢集的ip和port,zk叢集教程:
zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181
#log目錄,剛剛上邊建好的.
log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs

啟動叢集(分別在三臺broker執行):進入bin目錄cd /usr/local/kafka_2.11-1.0.1/bin/執行啟動指令碼並指定配置檔案./kafka-server-start.sh -daemon ../config/server.properties

驗證叢集是否啟動成功:

[[email protected] ~]# cd /usr/local/zookeeper-3.4.6/bin/
[[email protected] bin]# ./zkCli.sh -server 127.0.0.1:2181
...
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2] #這裡的012分別是三個broker的id

檢視某個broker資訊:注意endpoints資訊的ip:port,這個就是我們對外服務暴露的地址,我這裡是外網訪問,所以暴露的是外網ip和埠

[zk: 127.0.0.1:2181(CONNECTED) 1] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://118.212.149.51:9092"],"jmx_port":-1,"host":"118.212.149.51","timestamp":"1521010377533","port":9092,"version":4}
cZxid = 0x700000626
ctime = Wed Mar 14 14:52:57 CST 2018
mZxid = 0x700000626
mtime = Wed Mar 14 14:52:57 CST 2018
pZxid = 0x700000626
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3621e366ae20014
dataLength = 198
numChildren = 0

命令使用

建立topic :

#--replication-factor 建立的副本數,這個使用來備份的.副本數不能大於broker數
#--partitions 1 建立的分割槽數.根據實際情況建立
./kafka-topics.sh --create --zookeeper 192.168.1.3:2181 --replication-factor 1 --partitions 1 --topic milo

檢視topic :

./kafka-topics.sh --list --zookeeper 192.168.1.3:2181

檢視topic詳細資訊 :

./kafka-topics.sh --describe --zookeeper 192.168.1.3:2181

結果如下:
這裡寫圖片描述
第一行topic資訊摘要:分別是topic名字(Topic),partition數量(PartitionCount),副本數量(ReplicationFactor),配置(Config)
第二行~第四行分別列出了名為milo的topic的所有partition.依次為topic名字(Topic),partition號(Partition),此partition所在的borker(Leader),副本所在的broker(Replicas),Isr列表(Isr)
ps:同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,通俗理解就是替補隊員,不是每個broker都可以作為替補隊員.首先這個broker得存有副本,其次副本還得滿足條件.就像我們大學足球隊,有的人是替補,有的人連大名單都沒進去,原因是他不會踢球. ^ ^

生產訊息 :

./kafka-console-producer.sh --broker-list 118.212.149.51:9092 --topic test\
>hello world

消費訊息 :

./kafka-console-consumer.sh --zookeeper 118.212.149.51:2181 --topic milo --from-beginning
hello world

JAVA實戰

pom.xml

<dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.1</version>
    </dependency>
  </dependencies>

Producer.java

package cn.milo.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.util.Properties;

/******************************************************
 ****** @ClassName   : Producer.java                                            
 ****** @author      : milo ^ ^                     
 ****** @date        : 2018 03 14 11:34     
 ****** @version     : v1.0.x                      
 *******************************************************/
public class Producer {

    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "milo2";
    private static final String BROKER_LIST = "118.212.149.51:9092";
    private static KafkaProducer<String,String> producer = null;

    /*
    初始化生產者
     */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /*
    初始化配置
     */
    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        //訊息實體
        ProducerRecord<String , String> record = null;
        for (int i = 0; i < 1000; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "value"+(int)(10*(Math.random())));
            //傳送訊息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e){
                        log.info("send error" + e.getMessage());
                    }else {
                        System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }
}

Consumer :

package cn.milo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import java.util.Properties;

/******************************************************
 ****** @ClassName   : Consumer.java                                            
 ****** @author      : milo ^ ^                     
 ****** @date        : 2018 03 14 15:50     
 ****** @version     : v1.0.x                      
 *******************************************************/
public class Consumer {

    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "milo2";
    private static final String BROKER_LIST = "118.212.149.51:9092";
    private static KafkaConsumer<String,String> consumer = null;

    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
    }

    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers",BROKER_LIST);
        properties.put("group.id","0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }


    public static void main(String[] args) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                log.info(record);
            }
        }
    }
}

參考文獻