
Kafka Study Notes (3): Kafka Java API

1 Create a Maven project

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tzb.cn</groupId>
    <artifactId>tzbkafka</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
        </dependency>
    </dependencies>

</project>

2 Producer source code

KafkaProducerSimple.java

package demo;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;

public class KafkaProducerSimple {

    public static void main(String[] args) {

        /*
         * 1. Specify the topic this producer writes to
         */
        String TOPIC = "orderMq";

        /*
         * 2. Build the producer configuration
         */
        Properties props = new Properties();
        // key.serializer.class defaults to serializer.class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // The Kafka broker list
        props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        /*
         * request.required.acks controls whether a send waits for acknowledgement
         * from the broker. It takes three values: 0, 1, -1.
         * 0:  the producer never waits for an ack from the broker (the 0.7 behavior).
         *     Lowest latency, but weakest durability: messages are lost if the
         *     server dies.
         * 1:  the producer gets an ack once the leader replica has received the data.
         *     Better durability, since the client returns only after the server
         *     confirms the request. A message can still be lost if the leader dies
         *     right after the write, before the data is replicated.
         * -1: the producer gets an ack only after every replica in the ISR has
         *     received the data. Best durability: no data is lost as long as at
         *     least one replica survives.
         */
        props.put("request.required.acks", "1");
        /*
         * Optional. If unset, the default partitioner is used:
         * partitioner.class defaults to kafka.producer.DefaultPartitioner,
         * which hashes the message key to pick a partition.
         */
        props.put("partitioner.class", "demo.MyLogPartitioner");
        //props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

        /*
         * 3. Create the producer from the configuration
         */
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        /*
         * 4. Produce data in a loop
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            producer.send(new KeyedMessage<String, String>(TOPIC,
                    messageNo + " ", " appid -- " + UUID.randomUUID() + "consumertest"));
        }

        // Release network resources when done
        producer.close();
    }
}

MyLogPartitioner.java

package demo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;


public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    public int partition(Object obj, int numPartitions) {
        // For demonstration, route every message to partition 1.
        // A key-hash alternative would be:
        //return Integer.parseInt(obj.toString())%numPartitions;
        return 1;
    }

}
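Pinning every message to partition 1 only makes the routing easy to observe; it serializes all traffic onto a single partition. Here is a sketch of a key-hash partitioner against the same kafka.producer.Partitioner interface; HashKeyPartitioner is a hypothetical name, not part of the original project.

package demo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Hypothetical example: spread messages across partitions by hashing the key.
public class HashKeyPartitioner implements Partitioner {

    public HashKeyPartitioner(VerifiableProperties props) {
        // The 0.8 producer instantiates partitioners through this constructor.
    }

    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        // hashCode() may be negative; mask the sign bit to keep the result in range.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}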

3 Consumer

package demo;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumerSimple implements Runnable {

    public String title;
    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }


    public void run() {
        System.out.println("開始執行" + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        /**
         * Keep reading newly arriving messages from the stream; hasNext() blocks
         * while waiting for new messages.
         * Once `ConsumerConnector#shutdown` is called, `hasNext` returns false.
         * */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }

        System.out.println(String.format("Consumer [%s] exiting...", title));

    }


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "myconsumer");
        props.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);

        String topic = "orderMq";

        //As long as the ConsumerConnector is still alive, the consumer keeps waiting for new messages and does not exit
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 4);

        //In Map<String, List<KafkaStream<byte[], byte[]>>>, the String key is the topic and the List holds the streams created for it
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //Fetch the streams for the `orderMq` topic
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
        //Create a thread pool with one thread per stream (4 streams were requested above)
        ExecutorService executor = Executors.newFixedThreadPool(4);
        //Start one consumer thread per stream
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("Consumer " + (i + 1), streams.get(i)));
    }
}
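The main method above never stops the consumer: once the topic is drained, every worker simply blocks in hasNext(). Below is a minimal shutdown sketch meant to be appended at the end of main, reusing the consumerConn and executor variables from above; the 30-second run window is an arbitrary illustration. ConsumerConnector#shutdown makes hasNext() return false on every stream, so each worker falls out of its loop and the pool can drain.

        // Hypothetical addition at the end of main(): stop consuming after a while.
        try {
            Thread.sleep(30 * 1000); // let the consumers run for 30 s (arbitrary)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumerConn.shutdown(); // hasNext() now returns false on every stream
        executor.shutdown();     // workers finish their current message and exit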