1. 程式人生 > >Kafka JavaApi中消費者與生產者的配置

Kafka JavaApi中消費者與生產者的配置

檔案目錄如下:
在這裡插入圖片描述

1.ConsumerDemo配置

package com.course.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {

        Properties pros = new Properties();
        pros.put("bootstrap.servers","localhost:9092");
        pros.put("group.id","test");  // 用來表示consumer程序所在組的一個字串,如果設定同樣的group_id,表示這些程序都是屬於同一個consumer——group
        pros.put("enable.auto.commit","true"); // 如果設定為true,consumer所接收到的訊息的offset將會自動同步到zookeeper
        pros.put("auto.commit.interval.ms","1000"); // consumer向zookeeper提交offset的頻率,單位是秒
        pros.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        pros.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pros);
        consumer.subscribe(Arrays.asList("my_test"));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String>record:records){
                System.out.printf("offset = %d, key = %s , value = %s%n",record.offset(),record.key(),record.value());
            }
        }
    }
}

2.ProducerDemo配置

package com.course.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {
        Properties prop = new Properties();

        prop.put("bootstrap.servers","localhost:9092");
        prop.put("acks","all");  // 生產者需要server接收到資料之後,要發出一個確認接收的訊號
                                    // 0 producer不需要等待任何確認的訊息
                                    // 1 意味著至少要等待leader已經成功將資料寫入本地log,並不意味著所有follower已經寫入
                                    // all 意味著leader需要等待所有備份都成功寫入到日誌中

        prop.put("retries",0); // 重試次數

        // 比如有兩條訊息, 1 和 2 。1先來,但是如果1傳送失敗了,重試次數為1.2就會接著傳送資料,然後1再發一次,這樣會改變訊息傳送的順序

        prop.put("buffer.memory",33554432); // 快取大小
        prop.put("batch.size",1000); // producer試圖批量處理訊息記錄。目的是減少請求次數,改善客戶端和服務端之間的效能。
        // 這個配置是控制批量處理訊息的位元組數。如果設定為0,則禁用批處理。如果設定過大,會佔用記憶體空間.

        prop.put("linger.ms",1);
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(prop);
        for (int i = 0 ; i < 100 ; i++){
            producer.send(new ProducerRecord<String, String>("my_test", Integer.toString(i+1),Integer.toString(i)));
        }

        producer.close();
    }
}

然後我們可以先執行ProducerDemo,再執行ConsumerDemo,可以看到生產者傳送的資訊。
在這裡插入圖片描述