1. 程式人生 > >kafka 客戶端使用

kafka 客戶端使用

kafka客戶端釋出record(訊息)到kafka叢集。

新的生產者是執行緒安全的,線上程之間共享單個生產者例項,通常單例比多個例項要快。

一個簡單的例子,使用producer傳送一個有序的key/value(鍵值對),放到java的main方法裡就能直接執行,

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

生產者的緩衝空間池保留尚未傳送到伺服器的訊息,後臺I/O執行緒負責將這些訊息轉換成請求傳送到叢集。如果使用後不關閉生產者,則會洩露這些資源。

send()方法是非同步的,新增訊息到緩衝區等待發送,並立即返回。生產者將單個的訊息批量在一起傳送來提高效率。

ack是判別請求是否為完整的條件(就是是判斷是不是成功傳送了)。我們指定了“all”將會阻塞訊息,這種設定效能最低,但是是最可靠的。

retries,如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重複訊息的可能性。

producer(生產者)快取每個分割槽未傳送的訊息。快取的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。並需要更多的記憶體(因為每個“活躍”的分割槽都有1個緩衝區)。

預設緩衝可立即傳送,即便緩衝空間還沒有滿,但是,如果你想減少請求的數量,可以設定linger.ms大於0。這將指示生產者傳送請求之前等待一段時間,希望更多的訊息填補到未滿的批中。這類似於TCP的演算法,例如上面的程式碼段,可能100條訊息在一個請求傳送,因為我們設定了linger(逗留)時間為1毫秒,然後,如果我們沒有填滿緩衝區,這個設定將增加1毫秒的延遲請求以等待更多的訊息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處於高負載的情況下,如果設定比0大,以少量的延遲代價換取更少的,更有效的請求。

buffer.memory 控制生產者可用的快取總量,如果訊息傳送速度比其傳輸到伺服器的快,將會耗盡這個快取空間。當快取空間耗盡,其他傳送呼叫將被阻塞,阻塞時間的閾值通過max.block.ms

設定,之後它將丟擲一個TimeoutException。

key.serializervalue.serializer示例,將使用者提供的key和value物件ProducerRecord轉換成位元組,你可以使用附帶的ByteArraySerializaerStringSerializer處理簡單的string或byte型別。

send()

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

非同步傳送一條訊息到topic,並呼叫callback(當傳送已確認)。

send是非同步的,並且一旦訊息被儲存在等待發送的訊息快取中,此方法就立即返回。這樣並行傳送多條訊息而不阻塞去等待每一條訊息的響應。

傳送的結果是一個RecordMetadata,它指定了訊息傳送的分割槽,分配的offset和訊息的時間戳。如果topic使用的是CreateTime,則使用使用者提供的時間戳或傳送的時間(如果使用者沒有指定指定訊息的時間戳)如果topic使用的是LogAppendTime,則追加訊息時,時間戳是broker的本地時間。

由於send呼叫是非同步的,它將為分配訊息的此訊息的RecordMetadata返回一個Future。如果future呼叫get(),則將阻塞,直到相關請求完成並返回該訊息的metadata,或丟擲傳送異常。

如果要模擬一個簡單的阻塞呼叫,你可以呼叫get()方法。

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();

完全無阻塞的話,可以利用回撥引數提供的請求完成時將呼叫的回撥通知。

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

傳送到同一個分割槽的訊息回撥保證按一定的順序執行,也就是說,在下面的例子中 callback1 保證執行 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback一般在生產者的I/O執行緒中執行,所以是相當的快的,否則將延遲其他的執行緒的訊息傳送。如果你需要執行阻塞或計算昂貴(消耗)的回撥,建議在callback主體中使用自己的Executor來並行處理。

pecified by:

send in interface Producer<K,V>

Parameters:

record - 傳送的記錄(訊息)
callback - 使用者提供的callback,伺服器來呼叫這個callback來應答結果(null表示沒有callback)。

Throws:

InterruptException - 如果執行緒在阻塞中斷。
SerializationException - 如果key或value不是給定有效配置的serializers。
TimeoutException - 如果獲取元資料或訊息分配記憶體話費的時間超過max.block.ms。
KafkaException - Kafka有關的錯誤(不屬於公共API的異常)。




連結:https://www.orcho