1. 程式人生 > >Kafka學習筆記(5)----Kafka使用Producer傳送訊息

Kafka學習筆記(5)----Kafka使用Producer傳送訊息

1. Kafka的Producer

  不論將kafka作為什麼樣的用途,都少不了的向Broker傳送資料或接受資料,Producer就是用於向Kafka傳送資料。如下:

  

2. 新增依賴

  pom.xml檔案如下:

 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</
artifactId> <version>2.1.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <
version>2.1.0</version> </dependency>

3. 傳送訊息

  3.1 建立生產者

  建立生產者的時候,我們需要為生產者設定一些屬性,其中有三個必選屬性如下:

  1. bootstrap.servers: 該屬性指定broker 的地址清單,地址的格式為host:po 忱。清單裡不需要包含所有的broker 地址,生產者會給定的broker 裡查詢到其他broker 的資訊。不過建議至少要提供兩個broker 的資訊, 一且其中一個若機,生產者仍然能夠連線到叢集上。

  2. key.serializer: broker 希望接收到的訊息的鍵和值都是位元組陣列。生產者介面允許使用引數化型別,因此可以把Java 物件作為鍵和值傳送給broker 。這樣的程式碼具有良好的可讀性,不過生產者需要知道如何把這些Java 物件轉換成位元組陣列。key. serializer必須被設定為一

實現了org.apache.kafka.common.serialization.StringSerializer介面的類,生產者會使用這個類把鍵物件序列化成位元組陣列。Kafka 客戶端預設提供了ByteArraySerializer(這個只做很少的事情)、StringSerializer和IntegeSerializer,因此,如果你只使用常見的幾種Java 物件型別,那麼就沒必要實現自己的序列化器。要注意,  key.serializer是必須設定的,就算你打算只發送值內容。

  3. value.serializer: 與key.serializer一樣,value.serializer指定的類會將值序列化。如果鍵和值都是字串,可以使用與key.serializer一樣的序列化器。如果鍵是整數型別而值是字串,那麼需要使用不同的序列化器。

  設定屬性程式碼如下:

  Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value允許使用引數化型別
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

  3.2 傳送訊息的三種方式

  1. 併發並忘記,這是普通的訊息傳送方式,我們把訊息傳送給伺服器,但井不關心它是否正常到達。大多數情況下,訊息會正常到達,因為Kafka 是高可用的,而且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些訊息。

  實現如下:

package com.wangx.kafka.client;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value允許使用引數化型別
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

        kafkaProducer.send(record);

    }
}

  此時在Kafka中開啟內建的消費者消費訊息,結果如下,命令如下:

  kafka-console-consumer.sh --bootstrap-server 47.105.145.123:9092 --topic testTopic --from-beginning

  然後,啟動生產者傳送訊息,結果如下:

  這裡啟動了四次消費者,所以有四條訊息被消費。

  3.3 同步傳送訊息

  我們使用send () 方怯傳送訊息, 它會返回Future物件,呼叫get () 方法進行等待,就可以知道悄息是否傳送成功。

  實現方式如下:

package com.wangx.kafka.client;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value允許使用引數化型別
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);
        //建立訊息物件,第一個為引數topic,第二個引數為key,第三個引數為value
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

        //同步傳送方式,get方法返回結果
        RecordMetadata metadata = (RecordMetadata) kafkaProducer.send(record).get();
        System.out.println("broker返回訊息傳送資訊" + metadata);

    }
}

  客戶端消費者仍能收到訊息,且生產者也能收到返回結果,返回結果如下:

  

  3.4 非同步傳送訊息

  假設訊息在應用程式和Kafka 叢集之間一個來回需要lOm s 。如果在傳送完每個訊息後都等待迴應,那麼傳送100 個訊息需要l秒。但如果只發送訊息而不等待響應,那麼傳送100 個訊息所需要的時間會少很多。大多數時候,我們並不需要等待響應一一儘管Kafka會把目標主題、分割槽資訊和悄息的偏移量傳送回來,但對於傳送端的應用程式來說不是必需的。不過在遇到訊息傳送失敗時,我們需要丟擲異常、記錄錯誤日誌,或者把訊息寫入“錯誤訊息”檔案以便日後分析。

  實現如下:

package com.wangx.kafka.client;


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value允許使用引數化型別
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);
        //建立訊息物件,第一個為引數topic,第二個引數為key,第三個引數為value
        final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

        //非同步傳送訊息。異常時列印異常資訊或傳送結果
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.out.println(e.getMessage());
                } else {
                    System.out.println("接收到返回結果:" + recordMetadata);
                }
            }
        });
        //非同步傳送訊息時必須要flush,否則傳送不成功,不會執行回撥函式
        kafkaProducer.flush();
    }
}

  監聽到的返回資訊如下:

  3.5 生產者的配置

  生產者還有很多可以配置的引數,如下:

  1. acks:指定了必須要有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的。這個引數對訊息丟失的可能性有重要影響。該引數有如下選項。

  如果acks=0 , 生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。也就是說,如果當中出現了問題, 導致伺服器沒有收到訊息,那麼生產者就無從得知,訊息也就丟失了。不過,因為生產者不需要等待伺服器的響應,所以它可以以網路能夠支援的最大速度傳送訊息,從而達到很高的吞吐量。

  如果acks=1 ,只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器的成功響應。如果訊息無法到達首領節點(比如領導節點奔潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免資料丟失,生產者會重發訊息。不過,如果一個沒有收到訊息的節點成為新首領,訊息還是會丟失。這個時候的吞吐量取決於使用的是同步傳送還是非同步傳送。如果讓傳送客戶端等待伺服器的響應(通過呼叫Future物件的ge t ()方法),顯然會增加延遲(在網路上傳輸一個來回的延遲)。如果客戶端使用回撥,延遲問題就可以得到緩解,不過吞吐量還是會受傳送中訊息數量的限制(比如,生產者在收到伺服器響應之前可以傳送多少個訊息)。

  如果acks=all ,只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。這種模式是最安全的,它可以保證不止一個伺服器收到訊息,就算有伺服器發生崩潰,整個叢集仍然可以執行。不過,它的延遲比acks=1時更高,因為我們要等待不只一個伺服器節點接收訊息。

  2. buffer.memory: 用來設定生產者記憶體緩衝區的大小,生產者用它緩衝要傳送到伺服器的訊息。如果應用程式傳送訊息的速度超過傳送到伺服器的速度,會導致生產者空間不足。這個時候,send ()方法呼叫要麼被阻塞,要麼丟擲異常,取決於如何設定block.on.buffer 引數(在0. 9.0.0 版本里被替換成了l'la x .block.l'ls ,表示在丟擲異常之前可以阻塞一段時間)。

  3. compression.type: 預設情況下,訊息傳送時不會被壓縮。該引數可以設定為snappy 、gzip 或lz4 ,它指定了訊息被髮到broker 之前使用哪一種壓縮演算法進行壓縮。ssnappy壓縮演算法由Google發明,它佔用較少的CPU ,卻能提供較好的效能和相當可觀的 壓縮比,如果比較關注效能和網路頻寬,可以使用這種演算法。gzip壓縮演算法一般會佔用較多的CPU ,但會提供更高的壓縮比,所以如果網路頻寬比較有限,可以使用這種演算法。使用壓縮可以降低網路傳輸開銷和儲存開銷,而這往往是向Kafka 傳送訊息的瓶頸所在。

  4. retries: 生產者從伺服器收到的錯誤有可能是臨時性的錯誤,在這種情況下, retries 引數的值決定了生產者可以重發訊息次數,如果達到這個次數,生產者會放棄重試並返回錯誤

  5. batch.size: 當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在同一個批次裡,該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算。

  6. linger.ms: 該引數指定了生產者在傳送批次之前等待更多訊息加入批次的時間。

  7. client.id: 該引數可以是任意的字串,伺服器會用它來識別訊息的來源,還可以用在日誌和配額指標裡

  8. max.in.flight.requests.per.connection: 該引數指定了生產者在收到伺服器晌應之前可以傳送多少個訊息

  9. timeout.ms 、request.timeout.ms 和metadata.fetch.timeout.ms:request.timeout.ms 指定了生產者在傳送資料時等待伺服器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元資料(比如目標分割槽的首領是誰)時等待伺服器返回響應的時間。如果等待響應超時,那麼生產者要麼重試傳送資料,要麼返回一個錯誤(丟擲異常或執行回撥)。timeout.ms指定了broker 等待同步副本返回訊息確認的時間,與asks 的配置相匹配一一如果在指定時間內沒有收到同步副本的確認,那麼broker 就會返回一個錯誤。

  10. max.block.ms:該引數指定了在呼叫send () 方法或使用partitionsFor()方法獲取元資料時生產者的阻塞時間。

  11. max.request.size:該引數用於控制生產者傳送的請求大小,可以指能傳送的單個訊息的最大值,也可以指單個請求裡面所有訊息總的大小。

  12. receive.buffer.bytes 和send.buffer.bytes: 這兩個引數分別指定了TCP socket 接收和傳送資料包的緩衝區大小,如果它們被設為-1,就使用作業系統的預設值。