Kafka 學習筆記(3)——kafka java API
阿新 • • 發佈:2018-12-12
1 新建maven 工程
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tzb.cn</groupId> <artifactId>tzbkafka</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> </dependency> </dependencies> </project>
2 生產者原始碼
KafkaProducerSimple.java
package demo;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import java.util.UUID;
/**
 * Simple Kafka producer demo using the legacy 0.8 Scala-client API.
 *
 * <p>Renamed from {@code KafkaConsumerSimple}: the original name did not match the
 * file name KafkaProducerSimple.java (a compile error for a public class) and
 * collided with the consumer demo class of the same name.
 */
public class KafkaProducerSimple {
    public static void main(String[] args) {
        // 1. Destination topic for the produced messages.
        String TOPIC = "orderMq";

        // 2. Producer configuration.
        Properties props = new Properties();
        // key.serializer.class defaults to serializer.class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Kafka broker host list
        props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        /*
         * request.required.acks controls broker acknowledgement; three values:
         *   0  - the producer never waits for an ack from the broker (the 0.7
         *        behaviour). Lowest latency but weakest durability: data is
         *        lost if the server dies.
         *   1  - the producer gets an ack once the leader replica has received
         *        the data; better durability, but a message just written to
         *        the leader can still be lost if the leader dies before
         *        replication.
         *   -1 - the producer gets an ack only after all ISR replicas have the
         *        data; best durability — no loss while at least one replica
         *        survives.
         */
        props.put("request.required.acks", "1");
        /*
         * Optional: custom partitioner via partitioner.class.
         * Default: kafka.producer.DefaultPartitioner, which assigns a
         * partition by hashing the key.
         */
        props.put("partitioner.class", "demo.MyLogPartitioner");
        //props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

        // 3. Build the producer from the configuration.
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // 4. Produce messages in a loop.
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + " ", " appid -- " + UUID.randomUUID() + "consumertest"));
        }
    }
}
MyLogPartitioner.java
package demo;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;
/**
 * Custom partitioner for the producer demo.
 *
 * <p>Fix: the original unconditionally returned partition 1, which is an
 * invalid partition id when the topic has a single partition (only 0 exists)
 * and otherwise funnels every message into one partition. Restored hash-based
 * routing (the intent of the commented-out line), hardened against null keys
 * and a non-positive partition count. hashCode() is used instead of
 * Integer.parseInt because the demo producer's keys ("1 ", "2 ", ...) carry a
 * trailing space that parseInt rejects.
 */
public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
        // No configurable properties; this constructor signature is required
        // by Kafka's partitioner instantiation.
    }

    /**
     * Maps a message key to a partition id in [0, numPartitions).
     *
     * @param obj           the message key (may be null)
     * @param numPartitions number of partitions for the topic
     * @return the target partition id
     */
    public int partition(Object obj, int numPartitions) {
        if (obj == null || numPartitions <= 0) {
            return 0;
        }
        // Mask off the sign bit so the modulus is never negative.
        return (obj.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
3 消費者
package demo;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Simple Kafka consumer demo using the legacy 0.8 high-level consumer API.
 * Each instance drains one {@link KafkaStream} on its own thread.
 */
public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }

    /**
     * Reads messages from the stream until the connector shuts down, printing
     * topic/partition/offset/payload for each message.
     */
    public void run() {
        System.out.println("開始執行" + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /*
         * Keep pulling newly arrived messages from the stream. hasNext()
         * blocks while waiting for a new message and returns false once
         * ConsumerConnector#shutdown has been called.
         */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.out.println(String.format("consumer:[%s] exiting....", title));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "myconsumer");
        // Fixed typo: host was misspelled "ndoe1" in the original.
        props.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        // Fixed typo: the config key was misspelled "partition.assigment.strategy",
        // so the round-robin setting was silently ignored.
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);
        String topic = "orderMq";
        // As long as the ConsumerConnector is alive, the consumers keep
        // waiting for new messages instead of exiting.
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        // Request 4 streams (threads' worth of partitions) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 4);
        // Map key is the topic; value is the list of its KafkaStreams.
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        // Take the streams for our topic.
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
        // Pool sized to the stream count. The original hard-coded 3 threads
        // for 4 streams, leaving one stream permanently queued and unread.
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        // One consumer runnable per stream.
        for (int i = 0; i < streams.size(); i++) {
            executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i)));
        }
    }
}