Kafka 入門教程之三 生產者配置以及傳送資訊的方式
阿新 • • 發佈:2019-01-09
這篇文章是關於Kafka 生產者配置以及訊息的傳送方式進行程式碼級別的案例分享
kafka 提供了3種傳送資訊的方式
- Fire-and-forget
這種方式是不管傳送成功與否,客戶端都會返回成功。儘管大多數的時候Kafka 在傳送失敗後,會自己重新自動再一次傳送訊息,但是也會存在丟失訊息的風險
- Synchronous send
這種方式是同步傳送的方式,會等待future 物件的返回來判斷是否傳送成功。
- Asynchronous send
非同步傳送基於實現了send() 方法的回撥函式
Send Meaaeg with Fire-and-forget
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Msg-Fire-Forget",
"Welcome to my home!!! ");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}finally {
producer.close();
}
Send Meaaeg with Synchronous
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Msg-Sync",
"Sync Message");
try {
RecordMetadata rec = producer.send(record).get();
System.out.println(rec.topic());
} catch (Exception e) {
e.printStackTrace();
}finally {
producer. close();
}
返回topic 主題內容:
DEBUG Metrics - Added sensor with name topic.TestMsg.records-per-batch
DEBUG Metrics - Added sensor with name topic.TestMsg.bytes
DEBUG Metrics - Added sensor with name topic.TestMsg.compression-rate
DEBUG Metrics - Added sensor with name topic.TestMsg.record-retries
DEBUG Metrics - Added sensor with name topic.TestMsg.record-errors
TestMsg
INFO KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
DEBUG Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
DEBUG Metrics - Removed sensor with name connections-closed:
DEBUG Metrics - Removed sensor with name connections-created:
DEBUG Metrics - Removed sensor with name successful-authentication:
DEBUG Metrics - Removed sensor with name failed-authentication:
DEBUG Metrics - Removed sensor with name bytes-sent-received:
Send Meaaeg with Asynchronous
回撥函式需要實現Kafka 的介面 org.apache.kafka.clients.producer.Callback
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Msg-Async",
"Sync Message");
try {
System.out.println("Staring Sending....");
producer.send(record,new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Got FeedBack....");
}});
System.out.println("Stop Sending....");
} catch (Exception e) {
e.printStackTrace();
}finally {
producer.close();
}
觀察日誌輸出順序
Staring Sending....
DEBUG NetworkClient - [Producer clientId=producer-1] Initialize connection to node tjtestrac1:9092 (id: -1 rack: null) for sending metadata request
DEBUG NetworkClient - [Producer clientId=producer-1] Initiating connection to node tjtestrac1:9092 (id: -1 rack: null) using address tjtestrac1/10.26.14.43
DEBUG Metrics - Added sensor with name node--1.bytes-sent
DEBUG Metrics - Added sensor with name node--1.bytes-received
DEBUG Metrics - Added sensor with name node--1.latency
Stop Sending....
INFO KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
DEBUG NetworkClient - [Producer clientId=producer-1] Initiating connection to node tjtestrac1:9092 (id: 0 rack: null) using address tjtestrac1/10.26.14.43
DEBUG Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
DEBUG Metrics - Added sensor with name node-0.bytes-sent
Got FeedBack....