kafka生產者java客戶端
阿新 • • 發佈:2018-01-01
logger stat 多個 ssa cep sage ride 保存 速度
producer
包含一個用於保存待發送消息的緩沖池,緩沖池中消息是還沒來得及傳輸到kafka集群的消息。
位於底層的kafka I/O線程負責將緩沖池中的消息轉換成請求發送到集群。如果在結束produce時,沒有調用close()方法,那麽這些資源會發生泄露。
常用配置
bootstrap.servers
用於初始化時建立鏈接到kafka集群,以host:port形式,多個以逗號分隔host1:port1,host2:port2;
acks
生產者需要server端在接收到消息後,進行反饋確認的尺度,主要用於消息的可靠性傳輸;acks=0表示生產者不需要來自server的確認;
acks=1表示server端將消息保存後即可發送ack,而不必等到其他follower角色的都收到了該消息;acks=all(or acks=-1)意味著server端將等待所有的副本都被接收後才發送確認。
retries
生產者發送失敗後,重試的次數
batch.size
當多條消息發送到同一個partition時,該值控制生產者批量發送消息的大小,批量發送可以減少生產者到服務端的請求數,有助於提高客戶端和服務端的性能。
linger.ms
默認情況下緩沖區的消息會被立即發送到服務端,即使緩沖區的空間並沒有被用完。
可以將該值設置為大於0的值,這樣發送者將等待一段時間後,再向服務端發送請求,以實現每次請求可以盡可能多的發送批量消息。
batch.size
batch.size和linger.ms是兩種實現讓客戶端每次請求盡可能多的發送消息的機制,它們可以並存使用,並不沖突。
buffer.memory
生產者緩沖區的大小,保存的是還未來得及發送到server端的消息,如果生產者的發送速度大於消息被提交到server端的速度,該緩沖區將被耗盡。
key.serializer,value.serializer
說明了使用何種序列化方式將用戶提供的key和vaule值序列化成字節。
Producer
public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); privateKafkaProducer<String, String> kafkaProducer; private Random random = new Random(); private String topic; private int retry; public Producer() { this("my_init"); } public Producer(String topic) { this(topic,3); } public Producer(String topic,int retry) { this.topic = topic; this.retry = retry; if (null == kafkaProducer) { Properties props = new Properties(); InputStream inStream = null; try { inStream = this.getClass().getClassLoader().getResourceAsStream("kafka-producer.properties"); props.load(inStream); kafkaProducer = new KafkaProducer<String, String>(props); } catch (IOException e) { LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e); } finally { if (null != inStream) { try { inStream.close(); } catch (IOException e) { LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e); } } } } } /** * 通過kafkaProducer發送消息 * @param topic 消息接收主題 * @param partitionNum 哪一個分區 * @param retry 重試次數 * @param message 具體消息值 */ public RecordMetadata sendKafkaMessage(final String message) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", message); Future<RecordMetadata> meta = kafkaProducer.send(record, new Callback() {
//send方法是異步的,添加消息到緩存區等待發送,並立即返回,這使生產者通過批量發送消息來提高效率 public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null != exception) { LOGGER.error("kafka發送消息失敗:" + exception.getMessage(),exception); retryKakfaMessage(message); } } }); RecordMetadata metadata = null; try { metadata = meta.get(); } catch (InterruptedException e) { } catch (ExecutionException e) {} return metadata; } /** * 當kafka消息發送失敗後,重試 */ private void retryKakfaMessage(final String retryMessage) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", retryMessage); for (int i = 1; i <= retry; i++) { try { kafkaProducer.send(record); return; } catch (Exception e) { LOGGER.error("kafka發送消息失敗:" + e.getMessage(), e); retryKakfaMessage(retryMessage); } } } /** * kafka實例銷毀 */ public void close() { if (null != kafkaProducer) { kafkaProducer.flush(); kafkaProducer.close(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetry() { return retry; } public void setRetry(int retry) { this.retry = retry; } }
TestProducer
public class TestProducer { private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class); public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); for(int i=0;i<3;i++){ executor.submit(new Runnable() { @Override public void run() { String topic = "2017-11-6-test"; Producer p = new Producer(topic); for(int n=1;n<=5;n++){ String str = "hello world => "+n; RecordMetadata message = p.sendKafkaMessage(str); LOGGER.info("發送信息: "+message.topic()+"---"+message.partition()+"---"+message.offset()); } p.close(); } }); } System.out.println("this is main"); executor.shutdown();//這個表示 線程執行完之後自動退出 System.out.println("hello world"); } }
kafka生產者java客戶端