Kafka學習筆記 -- 寫入資料
摘要:
建立一個ProducerRecord物件,ProducerRecord物件包含 Topic 和 Value ,還可以指定 Key 或 Partition
在傳送ProducerRecord物件時,生產者先將 Key 和 Partition 序列化成 位元組陣列 ,...
- 建立一個ProducerRecord物件,ProducerRecord物件包含 Topic 和 Value ,還可以指定 Key 或 Partition
- 在傳送ProducerRecord物件時,生產者先將 Key 和 Partition 序列化成 位元組陣列 ,以便於在網路上傳輸
- 位元組陣列被傳給 分割槽器
- 如果之前在ProducerRecord物件裡指定了 Partition ,那麼分割槽器就不會做任何事情,直接返回指定的分割槽
- 如果沒有指定分割槽,那麼分割槽器會根據ProducerRecord物件的 Key 來 選擇一個Partition
- 選擇好分割槽後,生產者就知道該往哪個主題和分割槽傳送這條記錄
- 這條記錄會被新增到一個 記錄批次 裡, 一個批次內的所有訊息 都會被髮送到 相同的Topic和Partition 上
- 有一個單獨的執行緒負責把這些記錄批次傳送到相應的Broker
- 伺服器在收到這些訊息時會返回一個響應
- 如果訊息成功寫入Kafka,就會返回一個RecordMetaData物件,它包含了 Topic和Partition資訊 ,以及 記錄在分割槽裡的偏移量
- 如果寫入失敗,就會返回一個錯誤
- 生產者在收到錯誤之後會嘗試重新發送訊息,幾次之後如果還是失敗,就會返回錯誤資訊
建立生產者
必選屬性
bootstrap.servers
- Broker的地址清單, host:port
- 清單裡不需要包含所有的Broker地址, 生產者會從給定的Broker裡找到其它Broker的資訊
- 建議 最少兩個 ,一旦其中一個宕機,生產者仍然能夠連線到叢集上
key.serializer
- Broker希望接收到的訊息的 Key 和 Value 都是 位元組陣列
- 生產者介面允許使用 引數化型別 ,因此可以把 Java物件 作為Key和Value傳送給Broker
- key.serializer必須設定為一個實現了org.apache.kafka.common.serialization.Serializer介面的類
- 生產者會通過${key.serializer}把 Key物件 序列化為位元組陣列
- 預設提供
- ByteArraySerializer
- StringSerializer
- IntegerSerializer
- key.serializer是 必須設定 的!
value.serializer
- 與key.serializer類似,value.serializer指定的類會把 Value 序列化
- 如果 Key 和 Value 都是 字串 ,可以使用與key.serializer一樣的序列化器
- 如果 Key 是整數型別,而 Value 是字串,那麼需要使用不同的序列化器
樣例程式碼
Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("key.serializer", StringSerializer.class.getName()); kafkaProps.put("value.serializer", StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
發生訊息
傳送方式
傳送並忘記
- 生產者把訊息傳送給伺服器,但 並不關心是否正常到達
- Kafka是高可用的,而且生產者會 自動嘗試重發
- 會丟失一些訊息
同步傳送
- 使用 send() 方法傳送訊息,會返回一個 Future物件 ,呼叫 get() 方法進行 等待