1. 程式人生 > >Kafka 入門教程之三 生產者配置以及傳送資訊的方式

Kafka 入門教程之三 生產者配置以及傳送資訊的方式

這篇文章是關於Kafka 生產者配置以及訊息的傳送方式進行程式碼級別的案例分享

kafka 提供了3種傳送資訊的方式
  • Fire-and-forget
這種方式是不管傳送成功與否,客戶端都會返回成功。儘管大多數的時候Kafka 在傳送失敗後,會自己重新自動再一次傳送訊息,但是也會存在丟失訊息的風險
  • Synchronous send
這種方式是同步傳送的方式,會等待future 物件的返回來判斷是否傳送成功。
  • Asynchronous send
非同步傳送基於實現了send() 方法的回撥函式
Send Meaaeg with Fire-and-forget
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Msg-Fire-Forget",
				"Welcome to my home!!! ");
		try {
			producer.send(record);
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			producer.close();
			
		}
Send Meaaeg with Synchronous

ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Msg-Sync",
				"Sync Message");
		try {
			RecordMetadata rec = producer.send(record).get();
			System.out.println(rec.topic());
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			producer.
close(); }
返回topic 主題內容:
DEBUG Metrics - Added sensor with name topic.TestMsg.records-per-batch
DEBUG Metrics - Added sensor with name topic.TestMsg.bytes
DEBUG Metrics - Added sensor with name topic.TestMsg.compression-rate
DEBUG Metrics - Added sensor with name topic.TestMsg.record-retries
DEBUG Metrics - Added sensor with name topic.TestMsg.record-errors
TestMsg
INFO  KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
DEBUG Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
DEBUG Metrics - Removed sensor with name connections-closed:
DEBUG Metrics - Removed sensor with name connections-created:
DEBUG Metrics - Removed sensor with name successful-authentication:
DEBUG Metrics - Removed sensor with name failed-authentication:
DEBUG Metrics - Removed sensor with name bytes-sent-received:
Send Meaaeg with Asynchronous
回撥函式需要實現Kafka 的介面 org.apache.kafka.clients.producer.Callback
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "Msg-Async",
				"Sync Message");
		try {
			System.out.println("Staring Sending....");
			producer.send(record,new Callback() {
				@Override
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					System.out.println("Got FeedBack....");
				}});
			System.out.println("Stop Sending....");
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			producer.close();
			
		} 
觀察日誌輸出順序
Staring Sending....
DEBUG NetworkClient - [Producer clientId=producer-1] Initialize connection to node tjtestrac1:9092 (id: -1 rack: null) for sending metadata request
DEBUG NetworkClient - [Producer clientId=producer-1] Initiating connection to node tjtestrac1:9092 (id: -1 rack: null) using address tjtestrac1/10.26.14.43
DEBUG Metrics - Added sensor with name node--1.bytes-sent
DEBUG Metrics - Added sensor with name node--1.bytes-received
DEBUG Metrics - Added sensor with name node--1.latency
Stop Sending....
INFO  KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
DEBUG NetworkClient - [Producer clientId=producer-1] Initiating connection to node tjtestrac1:9092 (id: 0 rack: null) using address tjtestrac1/10.26.14.43
DEBUG Sender - [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
DEBUG Metrics - Added sensor with name node-0.bytes-sent
Got FeedBack....