1. 程式人生 > >kafka的partition如何分佈到不同的broker上,consumerGroup組員和partition之間如何做負載均衡,kafka常用命令

kafka的partition如何分佈到不同的broker上,consumerGroup組員和partition之間如何做負載均衡,kafka常用命令

1、partition如何分佈到不同的broker上

下面給出kafka在實現分割槽分佈到各個broker上的演算法實現,可以通過建立topic,設定副本數驗證

public void kafkaProducter(){
        //partitions建立的分割槽,比如我建立了一個topic,
        // 設定的副本是1時,partitions = partition * 1;
        // 設定的副本為2時,partitions = partition * 2;
        List<String> partitions = new LinkedList<>();
        partitions.add("p0");
        partitions.add("p1");
        partitions.add("p2");
        partitions.add("p3");
        partitions.add("p0");//副本
        partitions.add("p1");//副本
        partitions.add("p2");//副本
        partitions.add("p3");//副本
        //borkers是kafka叢集
        List<String> brokers = new LinkedList<>();
        brokers.add("b1");
        brokers.add("b2");
        brokers.add("b3");
        for(int i = 0;i<partitions.size();i++){
            System.out.println("分割槽"+partitions.get(i)+"在:"+brokers.get(i%brokers.size())+"的broker上");
        }
    }

測試結果:


2、consumerGroup組員和partition之間如何做負載均衡

通過設定消費者的數量,驗證下面的demo

public void kafkaConsumer(){
        List<String> partitions = new LinkedList<>();
        partitions.add("p0");
        partitions.add("p1");
        partitions.add("p2");
        partitions.add("p3");
        List<String> consumers = new LinkedList<>();
        consumers.add("c1");
        consumers.add("c2");
        consumers.add("c3");
        consumers.add("c4");
        consumers.add("c5");
        consumers.add("c6");
        //向上取整,計算每個消費者對應幾個分割槽
        int m = (int) Math.ceil(partitions.size()*1.0/consumers.size());
        System.out.println("m:"+m);
        for (int i = 0;i<consumers.size();i++){
            System.out.println("消費者"+consumers.get(i)+",對應的分割槽:");
            for(int j=0;j<m;j++){
                //如果下標大於等於partitions的元素個數,break
                if(i*m+j >= partitions.size()){
                    break;
                }
                System.out.println(partitions.get(i*m+j));
            }
        }
    }

測試結果:


3、kafka常用命令

a、建立topic

bin/kafka-topics.sh --zookeeper ip:2181 --create --topic topic名字   --partitions 分割槽數量  --replication-factor 副本數量

b、刪除topic,執行命令之後不是馬上刪除

bin/kafka-topics.sh --zookeeper ip:2181 --delete --topic topic名字

c、檢視各個分割槽資料量

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --time -1 --topic topic名字

d、檢視topic列表

bin/kafka-topics.sh --list --zookeeper ip:3181

e、檢視topic表述

bin/kafka-topics.sh --zookeeper ip:3181 --topic topic名字 --describe

f、向對應topic傳送訊息

bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic名字

g、消費訊息

bin/kafka-console-consumer.sh --zookeeper ip:2181 --topic topic名字 --from-beginning

歡迎大家來吐槽,內容有問題,會及時修改,謝謝!!!