1. 程式人生 > >Kafka 詳解(三)------Producer生產者

Kafka 詳解(三)------Producer生產者

spl ror 接下來 gif compile inter str lz4 arrays

  在第一篇博客我們了解到一個kafka系統,通常是生產者Producer 將消息發送到 Broker,然後消費者 Consumer 去 Broker 獲取,那麽本篇博客我們來介紹什麽是生產者Producer。

1、生產者概覽

  我們知道一個系統在運行過程中會有很多消息產生,比如前面說的對於一個購物網站,通常會記錄用戶的活動,網站的運行度量指標以及一些日誌消息等等,那麽產生這些消息的組件我們都可以稱為生產者。

  而對於生產者產生的消息重要程度又有不同,是否都很重要不允許丟失,是否允許丟失一部分?以及是否有嚴格的延遲和吞吐量要求?

  對於這些場景在 Kafka 中會有不同的配置,以及不同的 API 使用。

2、生產者發送消息步驟

  下圖是生產者向 Kafka 發送消息的主要步驟:

  技術分享圖片

  ①、首先要構造一個 ProducerRecord 對象,該對象可以聲明主題Topic、分區Partition、鍵 Key以及值 Value,主題和值是必須要聲明的,分區和鍵可以不用指定。

  ②、調用send() 方法進行消息發送。

  ③、因為消息要到網絡上進行傳輸,所以必須進行序列化,序列化器的作用就是把消息的 key 和 value對象序列化成字節數組。

  ④、接下來數據傳到分區器,如果之間的 ProducerRecord 對象指定了分區,那麽分區器將不再做任何事,直接把指定的分區返回;如果沒有,那麽分區器會根據 Key 來選擇一個分區,選擇好分區之後,生產者就知道該往哪個主題和分區發送記錄了。

  ⑤、接著這條記錄會被添加到一個記錄批次裏面,這個批次裏所有的消息會被發送到相同的主題和分區。會有一個獨立的線程來把這些記錄批次發送到相應的 Broker 上。

  ③、Broker成功接收到消息,表示發送成功,返回消息的元數據(包括主題和分區信息以及記錄在分區裏的偏移量)。發送失敗,可以選擇重試或者直接拋出異常。

3、Java Producer API

  首先在POM 文件中導入 kafka client。

<dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.0.0</version>
</dependency>

  實例代碼:

 1 package com.ys.utils;
 2 
 3 import org.apache.kafka.clients.producer.*;
 4 import java.util.Properties;
 5 
 6 /**
 7  * Create by YSOcean
 8  */
 9 public class KafkaProducerUtils {
10 
11     public static void main(String[] args) {
12         Properties kafkaProperties = new Properties();
13         //配置broker地址信息
14         kafkaProperties.put("bootstrap.servers", "192.168.146.200:9092,192.168.146.201:9092,192.168.146.202:9092");
15         //配置 key 的序列化器
16         kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
17         //配置 value 的序列化器
18         kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
19         
20         //通過上面的配置文件生成 Producer 對象
21         Producer producer = new KafkaProducer(kafkaProperties);
22         //生成 ProducerRecord 對象,並制定 Topic,key 以及 value
23         ProducerRecord<String,String> record =
24                 new ProducerRecord<String, String>("testTopic","key1","hello Producer");
25         //發送消息
26         producer.send(record);
27     }
28 }

  通過運行上述代碼,我們向名為 testTopic 的主題中發送了一條鍵為 key1,值為 hello Producer 的消息。

  技術分享圖片

4、屬性配置

  在上面的實例中,我們配置了如下三個屬性:

  ①、bootstrap.servers:該屬性指定 brokers 的地址清單,格式為 host:port。清單裏不需要包含所有的 broker 地址,生產者會從給定的 broker 裏查找到其它 broker 的信息。——建議至少提供兩個 broker 的信息,因為一旦其中一個宕機,生產者仍然能夠連接到集群上。

  ②、key.serializer:將 key 轉換為字節數組的配置,必須設定為一個實現了 org.apache.kafka.common.serialization.Serializer 接口的類,生產者會用這個類把鍵對象序列化為字節數組。——kafka 默認提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。當然也可以自定義序列化器。

  ③、value.serializer:和 key.serializer 一樣,用於 value 的序列化。

  以上三個屬性是必須要配置的,下面還有一些別的屬性可以不用配置,默認。

  ④、acks:此配置指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的,這個參數保障了消息發送的可靠性。默認值為 1。

    一、acks=0。生產者不會等待服務器的反饋,該消息會被立刻添加到 socket buffer 中並認為已經發送完成。也就是說,如果發送過程中發生了問題,導致服務器沒有接收到消息,那麽生產者也無法知道。在這種情況下,服務器是否收到請求是沒法保證的,並且參數retries也不會生效(因為客戶端無法獲得失敗信息)。每個記錄返回的 offset 總是被設置為-1。好處就是由於生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。

    二、acks=1。只要集群首領收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新首領還沒有被選舉出來),生產者會收到一個錯誤的響應,為了避免丟失消息,生產者會重發消息(根據配置的retires參數確定重發次數)。不過如果一個沒有收到消息的節點成為首領,消息還是會丟失,這個時候的吞吐量取決於使用的是同步發送還是異步發送。

    三、acks=all。只有當集群中參與復制的所有節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,但是延遲最高。

  ⑤、buffer.memory:該參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。默認值為33554432 字節。如果應用程序發送消息的速度超過發送到服務器的速度,那麽會導致生產者內存不足。這個時候,send() 方法會被阻塞,如果阻塞的時間超過了max.block.ms (在kafka0.9版本之前為block.on.buffer.full 參數)配置的時長,則會拋出一個異常。

  ⑥、compression.type:該參數用於配置生產者生成數據時可以壓縮的類型,默認值為 none(不壓縮)。還可以指定snappy、gzip或lz4等類型,snappy 壓縮算法占用較少的 CPU,gzip 壓縮算法占用較多的 CPU,但是壓縮比最高,如果網絡帶寬比較有限,可以使用該算法,使用壓縮可以降低網絡傳輸開銷和存儲開銷,這往往是 kafka 發送消息的瓶頸所在。

  ⑦、retires:該參數用於配置當生產者發送消息到服務器失敗,服務器返回錯誤響應時,生產者可以重發消息的次數,如果達到了這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者會在每次重試之間等待100ms,可以通過 retry.backoff.on 參數來改變這個時間間隔。

  還有一些屬性配置,可以參考官網:http://kafka.apachecn.org/documentation.html#producerconfigs

5、序列化器

  前面我們介紹過,消息要到網絡上進行傳輸,必須進行序列化,而序列化器的作用就是如此。

  ①、默認序列化器

  Kafka 提供了默認的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),還有整型(IntegerSerializer)和字節數組(BytesSerializer)序列化器,這些序列化器都實現了接口(org.apache.kafka.common.serialization.Serializer)基本上能夠滿足大部分場景的需求。

  下面是Kafka 實現的字符串序列化器 StringSerializer:

技術分享圖片
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.kafka.common.serialization;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    public StringSerializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("serializer.encoding");
        }

        if (encodingValue instanceof String) {
            this.encoding = (String)encodingValue;
        }

    }

    public byte[] serialize(String topic, String data) {
        try {
            return data == null ? null : data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        }
    }

    public void close() {
    }
}
View Code

  其中接口 serialization:

技術分享圖片
 1 //
 2 // Source code recreated from a .class file by IntelliJ IDEA
 3 // (powered by Fernflower decompiler)
 4 //
 5 
 6 package org.apache.kafka.common.serialization;
 7 
 8 import java.io.Closeable;
 9 import java.util.Map;
10 
11 public interface Serializer<T> extends Closeable {
12     void configure(Map<String, ?> var1, boolean var2);
13 
14     byte[] serialize(String var1, T var2);
15 
16     void close();
17 }
View Code

  ②、自定義序列化器

  如果Kafka提供的幾個默認序列化器不能滿足要求,即發送到 Kafka 的消息不是簡單的字符串或整型,那麽我們可以自定義序列化器。

  比如對於如下的實體類 Person:

技術分享圖片
 1 package com.ys.utils;
 2 
 3 /**
 4  * Create by YSOcean
 5  */
 6 public class Person {
 7     private String name;
 8     private int age;
 9 
10     public String getName() {
11         return name;
12     }
13 
14     public void setName(String name) {
15         this.name = name;
16     }
17 
18     public int getAge() {
19         return age;
20     }
21 
22     public void setAge(int age) {
23         this.age = age;
24     }
25 }
View Code

  我們自定義一個 PersonSerializer:

技術分享圖片
 1 package com.ys.utils;
 2 
 3 import org.apache.kafka.common.serialization.Serializer;
 4 
 5 import java.io.UnsupportedEncodingException;
 6 import java.nio.ByteBuffer;
 7 import java.util.Map;
 8 
 9 /**
10  * Create by YSOcean
11  */
12 public class PersonSerializer implements Serializer<Person> {
13 
14     @Override
15     public void configure(Map map, boolean b) {
16         //不做任何配置
17     }
18 
19     @Override
20     /**
21      * Person 對象被序列化成:
22      *  表示 age 的4 字節整數
23      *  表示 name 長度的 4 字節整數(如果為空,則長度為0)
24      *  表示 name 的 N 個字節
25      */
26     public byte[] serialize(String topic, Person data) {
27         if(data == null){
28             return null;
29         }
30         byte[] name;
31         int stringSize;
32         try {
33             if(data.getName() != null){
34                 name = data.getName().getBytes("UTF-8");
35                 stringSize = name.length;
36             }else{
37                 name = new byte[0];
38                 stringSize = 0;
39             }
40             ByteBuffer buffer = ByteBuffer.allocate(4+4+stringSize);
41             buffer.putInt(data.getAge());
42             buffer.putInt(stringSize);
43             buffer.put(name);
44             return buffer.array();
45         } catch (UnsupportedEncodingException e) {
46             e.printStackTrace();
47         }
48         return new byte[0];
49     }
50 
51     @Override
52     public void close() {
53         //不需要關閉任何東西
54     }
55 }
View Code

  上面例子序列化將Person類的 age 屬性序列化為 4 個字節,後期如果該類發生更改,變為長整型 8 個字節,那麽可能會存在新舊消息兼容性問題。

  因此通常不建議自定義序列化器,可以使用下面介紹的已有的序列化框架。

  ③、序列化框架

  上面我們知道自定義序列化器可能會存在新舊消息兼容性問題,需要我們手動去維護,那麽為了省去此麻煩,我們可以使用一些已有的序列化框架。比如 JSON、Avro、Thrift 或者 Protobuf。

6、發送消息 send()

  ①、普通發送——發送就忘記

        //1、通過上面的配置文件生成 Producer 對象
        Producer producer = new KafkaProducer(kafkaProperties);

        //2、生成 ProducerRecord 對象,並制定 Topic,key 以及 value
        //創建名為testTopic的隊列,鍵為testkey,值為testValue的ProducerRecord對象
        ProducerRecord<String,String> record =
                new ProducerRecord<>("testTopic","testkey","testValue");
        //3、發送消息
        producer.send(record);

  通過配置文件構造一個生產者對象 producer,然後指定主題名稱,鍵值對,構造一個 ProducerRecord 對象,最後使用生產者Producer 的 send() 方法發送 ProducerRecord 對象,send() 方法會返回一個包含 RecordMetadata 的 Future 對象,不過通常我們會忽略返回值。

  和上面的名字一樣——發送就忘記,生產者只管發送,並不管發送的結果是成功或失敗。通常如果我們不關心發送結果,那麽就可以使用此種方式。

  ②、同步發送

//1、通過上面的配置文件生成 Producer 對象
Producer producer = new KafkaProducer(kafkaProperties);

//2、生成 ProducerRecord 對象,並制定 Topic,key 以及 value
//創建名為testTopic的隊列,鍵為testkey,值為testValue的ProducerRecord對象
ProducerRecord<String,String> record =
        new ProducerRecord<>("testTopic","testkey","testValue");
//3、同步發送消息
try {
    //通過send()發送完消息後返回一個Future對象,然後調用Future對象的get()方法等待kafka響應
    //如果kafka正常響應,返回一個RecordMetadata對象,該對象存儲消息的偏移量
    //如果kafka發生錯誤,無法正常響應,就會拋出異常,我們便可以進行異常處理
    producer.send(record).get();
} catch (Exception e) {
    //4、異常處理
    e.printStackTrace();
}

  和上面普通發送消息一樣,只不過這裏我們調用了 Future 對象的 get() 方法來等待 kafka 服務器的響應,程序運行到這裏會產生阻塞,直到獲取kafka集群的響應。而這個響應有兩種情況:

  1、正常響應:返回一個 RecordMetadata 對象,通過該對象我們能夠獲取消息的偏移量、分區等信息。

  2、異常響應:基本上來說會發生兩種異常,

    一類是可重試異常,該錯誤可以通過重發消息來解決。比如連接錯誤,可以通過再次連接後繼續發送上一條未發送的消息;再比如集群沒有首領(no leader),因為我們知道集群首領宕機之後,會有一個時間來進行首領的選舉,如果這時候發送消息,肯定是無法發送的。

    二類是無法重試異常,比如消息太大異常,對於這類異常,KafkaProducer 不會進行任何重試,直接拋出異常。

  同步發送消息適合需要保證每條消息的發送結果,優點是能夠精確的知道什麽消息發送成功,什麽消息發送失敗,而對於失敗的消息我們也可以采取措施進行重新發送。缺點則是增加了每條消息發送的時間,當發送消息頻率很高時,此種方式便不適合了。

  ③、異步發送

  有同步發送,基本上就會有異步發送了。同步發送每發送一條消息都得等待kafka服務器的響應,之後才能發送下一條消息,那麽我們不是在錯誤產生時馬上處理,而是記錄異常日誌,然後馬上發送下一條消息,而這個異常再通過回調函數去處理,這就是異步發送。

  1、首先我們要實現一個繼承 org.apache.kafka.clients.producer.Callback 接口,然後實現其唯一的 onCompletion 方法。

技術分享圖片
package com.ys.utils;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Create by YSOcean
 */
public class KafkaCallback implements Callback{
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            //異常處理
            e.printStackTrace();
        }
    }
}
View Code

  2、發送消息時,傳入這個回調類。

//異步發送消息
producer.send(record,new KafkaCallback());

Kafka 詳解(三)------Producer生產者