1. 程式人生 > kafka系列五、kafka常用java API

kafka系列五、kafka常用java API

引入maven包

<dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.10.0.0</version>
</dependency>

一、同步傳送訊息

package com.example.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Demo of synchronous message sending: every record is sent and the caller
 * blocks on the returned {@link Future} before sending the next one, printing
 * the topic, partition and offset reported for each record.
 */
public class SynProducer {

    /** Topic every demo record is sent to. */
    private static final String TOPIC = "test-1";

    /** Number of records sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 1000;

    /**
     * Builds the producer configuration used by this demo.
     *
     * @return a fresh {@link Properties} holding the broker address, ack/retry
     *         and batching settings, the client id, and String serializers for
     *         both key and value
     */
    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        // NOTE(review): main() blocks on every single send, so a 1000 ms linger
        // presumably adds close to one second of latency per record. Value kept
        // as in the original; confirm against the Kafka producer config docs.
        props.put("linger.ms", 1000);
        props.put("buffer.memory", 33554432);
        props.put("client.id", "producer-syn-1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Sends {@code MESSAGE_COUNT} records to {@code TOPIC} one at a time,
     * waiting for each send to complete before issuing the next.
     *
     * <p>A failed send is reported and the loop moves on to the next record.
     * If the waiting thread is interrupted, the loop stops, the producer is
     * closed, and the thread's interrupt status is restored.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        boolean interrupted = false;
        // try-with-resources guarantees the producer is closed on every exit path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProps())) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Key is "topic_<i>", value is "test-<i>".
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, "topic_" + i, "test-" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Blocking here is what makes the send synchronous.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("傳送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (ExecutionException e) {
                    System.out.println("傳送失敗!");
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    System.out.println("傳送失敗!");
                    e.printStackTrace();
                    // Stop sending; the interrupt flag is restored only after
                    // the producer has been closed so that the close itself is
                    // not cut short by a pending interrupt.
                    interrupted = true;
                    break;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}