1. 程式人生 > >kafka生產者java客戶端

kafka生產者java客戶端

logger stat 多個 ssa cep sage ride 保存 速度

producer

  包含一個用於保存待發送消息的緩沖池,緩沖池中消息是還沒來得及傳輸到kafka集群的消息。
  位於底層的kafka I/O線程負責將緩沖池中的消息轉換成請求發送到集群。如果在結束produce時,沒有調用close()方法,那麽這些資源會發生泄露。
 

常用配置

bootstrap.servers

 用於初始化時建立鏈接到kafka集群,以host:port形式,多個以逗號分隔host1:port1,host2:port2; 

acks

生產者需要server端在接收到消息後,進行反饋確認的尺度,主要用於消息的可靠性傳輸;acks=0表示生產者不需要來自server的確認;
acks=1表示server端將消息保存後即可發送ack,而不必等到其他follower角色的都收到了該消息;acks=all(or acks=-1)意味著server端將等待所有的副本都被接收後才發送確認。

retries

生產者發送失敗後,重試的次數 


batch.size

當多條消息發送到同一個partition時,該值控制生產者批量發送消息的大小,批量發送可以減少生產者到服務端的請求數,有助於提高客戶端和服務端的性能。 


linger.ms

默認情況下緩沖區的消息會被立即發送到服務端,即使緩沖區的空間並沒有被用完。
可以將該值設置為大於0的值,這樣發送者將等待一段時間後,再向服務端發送請求,以實現每次請求可以盡可能多的發送批量消息。

batch.size

batch.size和linger.ms是兩種實現讓客戶端每次請求盡可能多的發送消息的機制,它們可以並存使用,並不沖突。 


buffer.memory

生產者緩沖區的大小,保存的是還未來得及發送到server端的消息,如果生產者的發送速度大於消息被提交到server端的速度,該緩沖區將被耗盡。 

key.serializer,value.serializer

說明了使用何種序列化方式將用戶提供的key和vaule值序列化成字節。

Producer

  

public class Producer {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);  
    private
KafkaProducer<String, String> kafkaProducer; private Random random = new Random(); private String topic; private int retry; public Producer() { this("my_init"); } public Producer(String topic) { this(topic,3); } public Producer(String topic,int retry) { this.topic = topic; this.retry = retry; if (null == kafkaProducer) { Properties props = new Properties(); InputStream inStream = null; try { inStream = this.getClass().getClassLoader().getResourceAsStream("kafka-producer.properties"); props.load(inStream); kafkaProducer = new KafkaProducer<String, String>(props); } catch (IOException e) { LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e); } finally { if (null != inStream) { try { inStream.close(); } catch (IOException e) { LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e); } } } } } /** * 通過kafkaProducer發送消息 * @param topic 消息接收主題 * @param partitionNum 哪一個分區 * @param retry 重試次數 * @param message 具體消息值 */ public RecordMetadata sendKafkaMessage(final String message) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", message); Future<RecordMetadata> meta = kafkaProducer.send(record, new Callback() {
//send方法是異步的,添加消息到緩存區等待發送,並立即返回,這使生產者通過批量發送消息來提高效率 public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null != exception) { LOGGER.error("kafka發送消息失敗:" + exception.getMessage(),exception); retryKakfaMessage(message); } } }); RecordMetadata metadata = null; try { metadata = meta.get(); } catch (InterruptedException e) { } catch (ExecutionException e) {} return metadata; } /** * 當kafka消息發送失敗後,重試 */ private void retryKakfaMessage(final String retryMessage) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", retryMessage); for (int i = 1; i <= retry; i++) { try { kafkaProducer.send(record); return; } catch (Exception e) { LOGGER.error("kafka發送消息失敗:" + e.getMessage(), e); retryKakfaMessage(retryMessage); } } } /** * kafka實例銷毀 */ public void close() { if (null != kafkaProducer) { kafkaProducer.flush(); kafkaProducer.close(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetry() { return retry; } public void setRetry(int retry) { this.retry = retry; } }

TestProducer

  

public class TestProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);  
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for(int i=0;i<3;i++){
            executor.submit(new Runnable() {
                @Override
                public void run() {
                     String topic = "2017-11-6-test";
                     Producer p = new Producer(topic);
                     for(int n=1;n<=5;n++){
                         String str = "hello world => "+n;
                         RecordMetadata message = p.sendKafkaMessage(str);
                         LOGGER.info("發送信息: "+message.topic()+"---"+message.partition()+"---"+message.offset());
                     }
                     p.close();
                }
            });
        }
        System.out.println("this is main");
        executor.shutdown();//這個表示 線程執行完之後自動退出
        System.out.println("hello world");
    }
}

kafka生產者java客戶端