
Kafka-JavaAPI(Producer And Consumer)



Kafka version: 2.11-0.9.0.0 (Kafka 0.9.0.0 built for Scala 2.11)

Producer

package com.yzy.spark.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Producer
 */
public class KafkaProducer extends Thread{
    private String topic;
    //--1
    private Producer<String, String> producer;
    public KafkaProducer(String topic){
        this.topic=topic;

        Properties properties = new Properties(); //--2
        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.require.acks","1");
//        properties.put("partitioner.class","com.yzy.spark.kafka.MyPartition"); //--3
        ProducerConfig config=new ProducerConfig(properties);
        producer=new Producer<>(config);
    }

    @Override
    public void run() {
        int messageNo=1;
        while (true){
            String message="message"+ messageNo;
            // Send to the topic passed to the constructor rather than a hard-coded name.
            producer.send(new KeyedMessage<String,String>(topic,String.valueOf(messageNo),message)); //--4
            System.out.println("Sent:"+message);
            messageNo++;
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
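
The producer above (and the consumer further down) read their connection settings from a KafkaProperties class that this post does not show. Here is a minimal sketch of what it might look like, assuming a single local broker and a single local ZooKeeper node. The ZooKeeper address matches the localhost:2181 used in the kafka-topics.sh command later in the post. The broker address localhost:9092 is only Kafka's default port and is an assumption, as is the TOPIC constant.

```java
// Hypothetical constants holder; the original class is not shown in the post.
// Put it in the same package as the producer and consumer (com.yzy.spark.kafka).
final class KafkaProperties {
    /** ZooKeeper connect string, read by the consumer as "zookeeper.connect". */
    static final String ZK = "localhost:2181";

    /** Comma-separated broker list, read by the producer as "metadata.broker.list" (assumed value). */
    static final String BROKER_LIST = "localhost:9092";

    /** Topic name used in the examples (assumed constant, not referenced by the original code). */
    static final String TOPIC = "test2";

    private KafkaProperties() {
        // constants only, no instances
    }
}
```

With this in place, new KafkaProducer(KafkaProperties.TOPIC).start() would be one way to launch the producer thread.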

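1. Declare the Producer<K,V> object. Pay attention to the generic types: every KeyedMessage<K,V> sent later must use the same type parameters as the Producer<K,V>.

2. Creating a Producer<K,V> requires a ProducerConfig, which in turn is built from a Properties object. The available properties are listed in the ProducerConfig source, which is well commented (a few of them are defined in its parent traits, AsyncProducerConfig and SyncProducerConfigShared).

3. The partitioner.class property (commented out in the code above) selects the partitioner. If it is not set, kafka.producer.DefaultPartitioner is used.

4. Here is the KeyedMessage source: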

case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")
  
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  
  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }
  
  def hasKey = key != null
}

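The constructor takes four parameters. topic is required and message is the payload; in most cases these two are all you need to send a message. key and partKey are used for partitioning: partKey takes precedence over key, but it only affects the partition chosen for the current message. In the declaration, key has type K and partKey has type Any; in this example, both the Producer<String, String> type parameters and the StringEncoder serializer require the key to be a String. The partitioning strategy and the role of the key are covered next.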
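
The precedence rule in partitionKey can be checked without a broker. The sketch below is a plain-Java mirror of that one method, not a Kafka class; the key values "user-1" and "region-eu" are made up for illustration.

```java
// Plain-Java mirror of KeyedMessage.partitionKey, for illustration only.
final class PartitionKeyDemo {
    /** Returns partKey if it is set, otherwise key, otherwise null. */
    static Object partitionKey(Object key, Object partKey) {
        if (partKey != null) {
            return partKey;
        }
        return key; // null when no key was supplied either
    }

    public static void main(String[] args) {
        // KeyedMessage(topic, message): neither key nor partKey is set
        System.out.println(partitionKey(null, null));
        // KeyedMessage(topic, key, message): partKey is initialised to key
        System.out.println(partitionKey("user-1", "user-1"));
        // KeyedMessage(topic, key, partKey, message): partKey wins over key
        System.out.println(partitionKey("user-1", "region-eu"));
    }
}
```

Running it prints null, user-1 and region-eu, one per line.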

Partition

First, on the server side, set the number of partitions of topic test2 to 3:

kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test2 

Then, back on the client side, look at the kafka.producer.DefaultPartitioner source:


package kafka.producer


import kafka.utils._
import org.apache.kafka.common.utils.Utils

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  private val random = new java.util.Random
  
  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

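The class has a single method, def partition(key: Any, numPartitions: Int). The first argument is the key or partKey described above. The second is the number of partitions, which is the value set on the server (3 here). The absolute value of the key's hashCode modulo numPartitions gives the number of the partition the message goes to.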
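
To see which partitions the producer's keys end up in, the same arithmetic can be run in plain Java. This is a sketch, not Kafka code: the abs helper is a stand-in for Utils.abs, which as far as I know differs from Math.abs only in mapping Integer.MIN_VALUE to 0, so treat that detail as an assumption.

```java
// Re-creates the DefaultPartitioner arithmetic outside Kafka, for illustration only.
final class DefaultPartitionDemo {
    /** Stand-in for Kafka's Utils.abs (assumed behaviour: Integer.MIN_VALUE maps to 0). */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** abs(hashCode) modulo the partition count, as in DefaultPartitioner.partition. */
    static int partition(Object key, int numPartitions) {
        return abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // the value set with kafka-topics.sh above
        for (int messageNo = 1; messageNo <= 6; messageNo++) {
            // The producer uses String.valueOf(messageNo) as the message key.
            String key = String.valueOf(messageNo);
            System.out.println("key=" + key + " -> partition " + partition(key, numPartitions));
        }
    }
}
```

For the keys "1" to "6" this yields partitions 1, 2, 0, 1, 2, 0, because the hash of a single-digit string is just its character code (49 for "1", 50 for "2", and so on).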

We can also write our own partitioner and register it through the partitioner.class property. Here is a simple example:

package com.yzy.spark.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class MyPartition implements Partitioner {
    public MyPartition(VerifiableProperties properties){

    }
    @Override
    public int partition(Object key, int numPartitions) {
        System.out.println("numPartitions:"+numPartitions);
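        // Mask the sign bit first: a negative hashCode would otherwise give a negative partition.
        return (key.hashCode() & 0x7fffffff) % numPartitions;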

    }
}
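
A custom partitioner has to return a value in the range 0 to numPartitions - 1. Java's % operator keeps the sign of the dividend, so applying it directly to hashCode() can produce a negative number for keys whose hash is negative. The sketch below compares the two approaches; the Integer key -7 is just a convenient hypothetical value whose hashCode is negative.

```java
// Compares a naive modulo with a sign-safe one; illustration only, no Kafka classes involved.
final class SignSafePartitionDemo {
    /** Can return a negative number when the key's hashCode is negative. */
    static int naive(Object key, int numPartitions) {
        return key.hashCode() % numPartitions;
    }

    /** Clears the sign bit first, so the result is always in [0, numPartitions). */
    static int signSafe(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        Object key = Integer.valueOf(-7); // Integer.hashCode() is the value itself
        System.out.println("naive:    " + naive(key, 3));
        System.out.println("signSafe: " + signSafe(key, 3));
    }
}
```

Here naive returns -1, which is not a valid partition number, while signSafe returns 1.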

Consumer

package com.yzy.spark.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer extends Thread{
    private String topic;
    private String groupId;

    public KafkaConsumer(String topic,String groupId){
        this.topic=topic;
        this.groupId=groupId;
        
    }

    private ConsumerConnector createConnector(){
        Properties properties=new Properties();//--1
        properties.put("zookeeper.connect",KafkaProperties.ZK);
        properties.put("group.id",groupId);
        properties.put("auto.offset.reset", "largest");//--2
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        return Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        ConsumerConnector consumerConnector=this.createConnector();
        Map<String,Integer> topicCountMap=new HashMap<>();
        topicCountMap.put(topic,1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

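        while(iterator.hasNext()){
            String message=new String(iterator.next().message());
            // Print the message so that consumption is visible; it was previously discarded.
            System.out.println("Received:"+message);
        }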
    }
}

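The consumer involves more concepts, including the group and partition mechanisms; treat the official documentation as the authority on those.

1. As with the producer, look at the ConsumerConfig source for the available properties.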

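2. This property corresponds to the --from-beginning option of the console consumer shell command. It accepts smallest (read from the earliest available offset) and largest (the default: start at the latest offset, so only messages produced after the consumer starts are read). Note that auto.offset.reset only takes effect when the consumer's group has no committed offset (or the committed offset is out of range); a consumer that joins a group with committed offsets resumes from those offsets. To get from-beginning behaviour in code, set auto.offset.reset to smallest and give each consumer a fresh group.id, for example properties.put("group.id", UUID.randomUUID().toString());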
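
As a sketch of that last point, the helper below builds consumer properties that pair a random group.id with auto.offset.reset=smallest, since the reset policy only applies to a group that has no committed offsets. The class and method names are made up for illustration; only the three property keys come from the consumer code above.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: builds old-consumer properties that always start from the earliest offset.
final class FromBeginningConfig {
    static Properties fromBeginning(String zkConnect) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", zkConnect);
        // A group.id that has never been used has no committed offsets...
        properties.put("group.id", UUID.randomUUID().toString());
        // ...so the reset policy applies, and "smallest" means the earliest offset.
        properties.put("auto.offset.reset", "smallest");
        return properties;
    }

    public static void main(String[] args) {
        // localhost:2181 is the ZooKeeper address used elsewhere in this post.
        System.out.println(fromBeginning("localhost:2181"));
    }
}
```

The returned Properties can be passed to new ConsumerConfig(...) in createConnector in place of the ones built there. Keep in mind that every run creates a new consumer group, so this suits testing better than long-running services.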
