1. 程式人生 > >Kafka——Kafka生產者(向Kafka寫入資料)

Kafka——Kafka生產者(向Kafka寫入資料)

在這一章,我們將從Kafra 生產者的設計和元件講起,學習如何使用Kafka 生產者。我們將情示如何建立KafkaProducer 和ProducerRecords 物件、如何將記錄傳送給Kafka ,以及如何處理從Kafka 返回的錯誤,然後介紹用幹控制生產者行為的重要配置選項,最後深入探討如何使用不同的分割槽方棒和序列化器,以及如何自定義序列化器和分割槽器。

生產者概覽

一個應用程式在很多情況下需要往Kafka 寫入訊息: 記錄使用者的活動(用於審計和分析)、記錄度量指標、儲存日誌訊息、記錄智慧家電的資訊、與其他應用程式進行非同步通訊、緩衝即將寫入到資料庫的資料,等等。
多樣的使用場景意味著多樣的需求:是否每個訊息都很重要?是否允許丟失一小部分訊息?偶爾出現重複訊息是否可以接受?是否有嚴格的延遲和吞吐量要求?

在之前提到的信用卡事務處理系統裡,訊息丟失或訊息重複是不允許的,可以接受的延遲最大為500ms ,對吞吐量要求較高我們希望每秒鐘可以處理一百萬個訊息。
儲存網站的點選資訊是另一種使用場景。在這個場景裡,允許丟失少量的訊息或出現少量的訊息重複,延遲可以高一些,只要不影響使用者體驗就行。換句話說,只要使用者點選連結後可以馬上載入頁面,那麼我們並不介意訊息要在幾秒鐘之後才能到達Kafka 伺服器。吞吐量則取決於網站使用者使用網站的頻度。
不同的使用場景對生產者API 的使用和配置會有直接的影響。

儘管生產者API 使用起來很簡單, 但訊息的傳送過程還是有點複雜的。下圖展示了向Kafka 傳送訊息的主要步驟。
在這裡插入圖片描述


我們從建立一個ProducerRecord 物件開始, Producer Record 物件需要包含目標主題和要傳送的內容。我們還可以指定鍵或分割槽
在傳送ProducerRecord 物件時,生產者要先把鍵和值物件序列化成位元組陣列,這樣它們才能夠在網路上傳輸
接下來,資料被傳給分割槽器。如果之前在ProducerRecord 物件裡指定了分割槽,那麼分割槽器就不會再做任何事情,直接把指定的分割槽返回。如果沒有指定分割槽,那麼分割槽器會根據ProducerRecord 物件的鍵來選擇一個分割槽。選好分割槽以後,生產者就知道該往哪個主題和
分割槽傳送這條記錄了。緊接著,這條記錄被新增到一個記錄批次裡
,這個批次裡的所有訊息會被髮送到相同的主題和分割槽上。有一個獨立的執行緒負責把這些記錄批次傳送到相應的broker 上。
伺服器在收到這些訊息時會返回一個響應。如果訊息成功寫入 Kafka ,就返回一個RecordMetaData 物件,它包含了主題和分割槽資訊,以及記錄在分割槽裡的偏移量。如果寫入失敗, 則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送訊息,幾次之後如果還
是失敗,就返回錯誤資訊

建立Kafka生產者

要往Kafka 寫入訊息,首先要建立一個生產者物件,井設定一些屬性。Kafka 生產者有3個必選的屬性。

  • bootstrap.servers
    該屬性指定broker 的地址清單,地址的格式為host:port。清單裡不需要包含所有broker,生產者會從給定的broker裡找到其他的broker,但是,建議至少兩個,其中一個宕機,使得生產者依然能夠連線到叢集上。
  • key.serializer
    broker 希望接收到的訊息的鍵和值都是位元組陣列。生產者介面允許使用引數化型別,因此可以把Java 物件作為鍵和值傳送給broker。

這樣的程式碼具有良好的可讀性,不過生產者需要知道如何把這些Java 物件轉換成位元組陣列。key. serializer必須被設定為一個實現了org.apache.kafka.common.serialization.Serializer介面的類,生產者會使用這個類把鍵物件序列化成位元組陣列。
Kafka 客戶端預設提供了ByteArraySerializer(這個只做很少的事情)、StringSerializer和IntegerSerializer,因此,如果你只使用常見的幾種Java 物件型別,那麼就沒必要實現自己的序列化器。

  • value.serializer
    與前一個差不多。

下面的程式碼片段慎示瞭如何建立一個新的生產者,這裡只指定了必要的屬性,其他使用預設設定。

	private KafkaProducer producer;
    private Properties kafkaProps=new Properties();
    
    kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
    kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
    producer=new KafkaProducer<String ,String>(kafkaProps);

這個介面很簡單,通過配置生產者的不同屬性就可以很大程度地控制值它的行為。另外也可以通過Properties進行設定哦。

例項化生產者物件後,接下來就可以開始傳送訊息了。傳送訊息主要有以下3 種方式。

  • 傳送並忘記( fire- and-forget )
    我們把訊息傳送給伺服器,但井不關心它是否正常到達。大多數情況下,訊息會正常到達,因為Kafka 是高可用的,而且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些訊息。
  • 同步傳送
    我們使用send()方怯傳送訊息, 它會返回一個Future物件,呼叫get() 方法進行等待,就可以知道訊息是否傳送成功。
  • 非同步傳送
    我們呼叫send()方怯,並指定一個回撥函式, 伺服器在返回響應時呼叫該函式。

本章的所有例子都使用單執行緒,但其實生產者是可以使用多執行緒來傳送訊息的。剛開始的時候可以使用單個消費者和單個執行緒。如果需要更高的吞吐量,可以在生產者數量不變的前提下增加執行緒數量。如果這樣做還不夠, 可以增加生產者數量。

傳送訊息到kafka

最簡單的訊息傳送如下所示

package com.promusician.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaMessageProducer {
    private final KafkaProducer<String,String> producer;
    private Properties kafkaProps=new Properties();
    public KafkaMessageProducer(){
        kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer=new KafkaProducer<>(kafkaProps);
    }

    public void send() throws Exception{
        ProducerRecord<String,String> record= new ProducerRecord<String, String>
                ("CustomerCountry","Precision Prodcts",
                        "France");

        producer.send(record);
    }
}

生產者的send()方法將ProducerRecord物件作為引數,所以我們要先建立一個ProducerRecord物件。上面三個引數分別是主題topic,鍵,值。send方法可以在生產者架構圖裡看出,訊息被放入緩衝區,然後以單獨的執行緒傳送到伺服器端。send會返回一個包含RecordMetadata的Future物件,不過因為我們忽略返回值,所以無法得知訊息是否成功。如果不要求傳送結果,可以使用這種方式。
我們可以忽略傳遞訊息時的錯誤或者伺服器的錯誤,但是在傳送訊息的時候,生產者還是有可能發生其他異常,比如序列化失敗SerializaitonException、緩衝區已滿BufferExhaustedException或者TimeoutException、又或者是執行緒中斷InterruptException。

同步傳送訊息

 public void send()throws Exception{
        ProducerRecord<String,String> record= new ProducerRecord<String, String>
                ("CustomerCountry","Precision Prodcts",
                        "France");

        producer.send(record).get();
    }

在這裡producer.send()方法先返回一個Future物件,然後呼叫Future物件的get()方法等待Kafka的相應,如果伺服器返回錯誤,get方法會丟擲異常,否則會得到一個RecordMetadata物件,可以用它來獲取訊息的偏移量。
當然訊息傳送也可能發生錯誤,比如broker反悔了一個不允許重發訊息的異常或者已經超過了重發次數的異常,我們這裡簡單使用throws Exception丟擲,沒做其他處理。

KafkaProducer一般會發生兩類錯誤。其中一類是可重試錯誤,這類錯誤可以通過重發訊息來解決。比如對於連線錯誤,可以通過再次建立連線來解決,“無主( no leader )” 錯誤則可以通過重新為分割槽選舉首領來解決。KafkaProducer可以被配置成自動重試,如果在多次重試後仍無能解決問題,應用程式會收到一個重試異常。另一類錯誤無出通過重試解決,比如“訊息太大”異常。對於這類錯誤, KafkaProducer不會進行任何重試,直接丟擲異常。

非同步傳送訊息
為了在非同步傳送訊息的同時能夠對異常情況進行處理,生產者提供了回撥支援。下面是使用回撥的一個例子。

 private class MessageProducerCallBack implements Callback{
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e!=null){
                e.printStackTrace();
            }
        }
    }

我們實現一個import org.apache.kafka.clients.producer.Callback;的介面。如果有錯誤,那就列印。

生產者的配置

生產者還有很多可配置的引數,在Kafka 文件裡都有說明,它們大部分都有合理的預設值, 所以沒有必要去修改它們。(除了上述介紹的bootstrap.server以及序列化器等)

  • acks
    該引數指定了必須要有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的。
    如果acks=0,那麼生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。也就是說,無法得知訊息是否丟失。不過,因為生產者不需要等待伺服器的響應,所以它可以以網路能夠支援的最大速度傳送訊息,從而達到很高的吞吐量。
    如果acks=1,那麼只要叢集的首領節點收到訊息,生產者就會收到一個來自伺服器的成功響應。如果訊息無法到達首領節點(比如首領節點崩憤,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免資料丟失,生產者會重發訊息。
    如果acks=all,只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。這種模式是最安全的,它可以保證不止一個伺服器收到訊息,就算有伺服器發生崩潰,整個叢集仍然可以執行。
  • buffer.memory
    該引數用來設定生產者記憶體緩衝區的大小,生產者用它緩衝要傳送到伺服器的訊息。生產者空間不足,這個時候,send ()方法呼叫要麼被阻塞,要麼丟擲異常,取決於block.on.buffer.full引數(0.9.0.0版本里被替換成為max.block.ms)。
  • compression.type
    預設情況下,訊息傳送時不會被壓縮。該引數可以設定為snappy、gzip 或lz4,它指定了訊息被髮送給broker 之前使用哪一種壓縮算也進行壓縮。
  • retries
    生產者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分割槽找不到首領)。在這種情況下,retries引數的值決定了生產者可以重發訊息的次數,到達指定次數後,生產者會放棄並返回錯誤。
  • batch.size
    當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在同一個批次裡。該引數指定了一個批次可以使用的記憶體大小。
  • linger.ms
    該引數指定了生產者在傳送批次之前等待更多訊息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次傳送出去。
  • client.id
    伺服器用來識別訊息的來源。
  • 其他略。

序列化器

自定義序列化器
如果傳送到Kafka 的物件不是簡單的字串或整型,那麼可以使用序列化框架來建立訊息記錄,如Avro、Thrift或者Protobuf,或者使用自定義的序列化器(但我們不建議使用自定義的序列器)。

使用Avro進行序列化等以後有需要再瞭解

分割槽

之前我們的例子使用了ProducerRecord的topic、key、value。其中key有兩種用途,它可以作為訊息的附加資訊,也可以用來決定訊息該被寫到主題的哪個區域。
key可以被設為null,只要如下設定即可:

 ProducerRecord<String,String> record= new ProducerRecord<String, String>("CustomerCountry", "France");

key為Null,並且採用預設的分割槽器。分割槽器使用輪詢演算法將訊息分部到各個分割槽上。如果key不為null,並且使用了預設的分割槽器,那麼Kafka會對key進行雜湊,然後根據雜湊值對映到特定分割槽。

上面是採用的預設分割槽器,當然也可以自定義分割槽器。假設你是一個B2B 供應商,你有一個大客戶,它是手持裝置Banana 的製造商。Banana 佔據了你整體業務10 % 的份額。如果使用預設的雜湊分割槽算怯, Banana 的賬號記錄將和其他賬號記錄一起被分配給相同的分割槽,導致這個分割槽比其他分割槽要大一些。伺服器可能因此出現儲存空間不足、處理緩慢等問題。我們需要給Banana 分配單獨的分割槽,然後使用雜湊分割槽算住處理其他賬號。
在這裡插入圖片描述