1. 程式人生 > >kafka 指定partition兩種方式&Consumer不消費訊息的幾個原因

kafka 指定partition兩種方式&Consumer不消費訊息的幾個原因

需求

1.隨機生成IP數字,將奇數、偶數資料分在不同的Partition
2.通過KafkaAPI實現一個消費組中多消費者,為了是驗證同組的不同消費者是否一一對應不同的Patition
3.打包部署在Linux中執行

解決辦法

方法1:
在producer傳送訊息時指定partition,ProducerRecord的構造方法可以有四個引數,分別是topic,int型別的partition值,key,value,我們直接指定傳入的第二個引數即可
當我們不指定第二個引數,使用三個引數的構造方式時,會根據傳入的key自動分割槽,傳入key為空時訊息不分割槽,會傳到同一個partition中

producer.send(new ProducerRecord<>(topic,partition,ip, msg));

方法2:
自定義Partitioner,重寫partition方法

@Override
    public int partition(String topic, Object key, byte[] arg2, Object value,
            byte[] arg4, Cluster arg5) {
        // TODO Auto-generated method stub
        /*
         * Cluster arg5打印出來是這個樣子
         * Cluster(nodes = [172.17.11.11:9092 (id: 0 rack: null), 172.17.11.13:9092 (id: 1 rack: null), 172.17.11.15:9092 (id: 2 rack: null)], partitions = [Partition(topic = TOPIC-20160504-1200, partition = 1, leader = 2, replicas = [0,1,2,], isr = [2,1,0,], Partition(topic = TOPIC-20160504-1200, partition = 2, leader = 0, replicas = [0,1,2,], isr = [0,2,1,], Partition(topic = TOPIC-20160504-1200, partition = 0, leader = 1, replicas = [0,1,2,], isr = [1,0,2,]])
        */
/* * byte[] arg2以位元組碼的格式儲存key * System.out.println(new String(arg2)); * System.out.println(key.toString());二者輸出相同,都是key * byte[] arg4和Object value同理 */ /* * 返回值指定的分割槽值 */ //從傳入的key中分割出用於分割槽的數值 int partition= Integer.parseInt(key.toString().split("\\."
)[3]);//分割 " . " 需要轉義" \\. " if(partition%2==0){ return 1; }else{ return 2; } }

Consumer不消費訊息的幾個原因

通過列印當前物件地址,來確定兩個分割槽是否被兩個不同的消費者消費,發現即使我使用了兩個消費者,實際消費訊息的還是一個消費者,只是會消費完一個分割槽再去消費第二個分割槽,第二個消費者並沒有起到作用,需要多執行緒
1.不使用多執行緒,被同一個消費者消費
clipboard.png
2.傳入key值為空,不分割槽,被同一個消費者消費
clipboard1.png
3.正確結果,使用多執行緒,key不為空
clipboard3.png