1. 程式人生 > >kafka----kafka API(java版本)

kafka----kafka API(java版本)

spring mvc+my batis dubbo+zookeerper kafka restful redis分布式緩存

Apache Kafka包含新的Java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了兼容性,它們仍將存在一段時間。可以通過一些單獨的jar包調用這些客戶端,這些包的依賴性都比較小,同時老的Scala客戶端仍會存在。

一、Producer API

我們鼓勵所有新開發都使用新的java版本producer。這個客戶端是經過生產環境測試的,並且一般情況下會比先前的Scala客戶端要更快而且具有更多的特性。你可以通過添加對客戶端jar包的依賴來調用這個客戶端,如下所示,使用maven配置:

<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.10.0.0</version>
	</dependency>
 
可以通過javadoc文件查看如何使用producer。
 
二、Consumer API
在0.9.0發布版本中,增加了新的java版本的consumer,用來替代已有的high-level的基於zookeeper的consumer,以及low-level的consumer APIs。
這個客戶端認為是beta版本。為了保證用戶獲得平穩的升級,我們會繼續維護0.8版本的consumer客戶端,此版本客戶端會在0.9版本的kafka集群中
依然生效。下面的章節中,我們會介紹老的0.8版本的consumer APIs(包括high-level的ConusmerConnector以及low-level SimpleConsumer)以及
新的Java版本的consumer API。
 
1、Old High  Level Consumer  API
class   Consumer{
/**
*  Create a ConsumerConnector:創建consumer connector
*
*  @param config at the minimum, need to specify the groupid of the consumer and the zookeeper connection string zookeeper.connect.config參數作用:需要置頂consumer的groupid以及zookeeper連接字符串zookeeper.connect
*/
 
public static kafka.javaapi.consumer.ConsumerConnector  createJavaConsumerConnector(ConsumerConfig  config);
}
 
/**
*  V: type of the message: 消息類型
*  K: type of the optional key assciated with the message: 消息攜帶的可選關鍵字類型
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
*  Create a list of message streams of type T for each topic.:為每個topic創建T類型的消息流的列表
*
*  @param topicCountMap a map of (topic, #streams) pair   : topic與streams的鍵值對
*  @param decoder a decoder that converts from Message to T  : 轉換Message到T的解碼器
*  @return  a map of (topic, list of KafakStream) pairs.   : topic與KafkaStream列表的鍵值對
*           The number of items in the list is #streams . Each stream supports
*           an iterator over message/metadata pairs .:列表中項目的數量是#streams。每個stream都支持基於message/metadata 對的叠代器
*/
public <K,V> Map<String, List<KafkaStream<K,V> > >
createMessageStreams( Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
 
 
/** * Create a list of message streams of type T for each topic, using the default decoder.為每個topic創建T類型的消息列表。使用默認解碼器 */
 
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
   *  Create a list of message streams for topics matching a wildcard.為匹配wildcard的topics創建消息流的列表
   *
   *  @param topicFilter a TopicFilter that specifies which topics to
   *                    subscribe to (encapsulates a whitelist or a blacklist).指定將要訂閱的topics的TopicFilter(封裝了whitelist或者黑名單)
   *  @param numStreams the number of message streams to return.將要返回的流的數量
   *  @param keyDecoder a decoder that decodes the message key  可以解碼關鍵字key的解碼器
   *  @param valueDecoder a decoder that decodes the message itself  可以解碼消息本身的解碼器
   *  @return a list of KafkaStream. Each stream supports an
   *          iterator over its MessageAndMetadata elements.  返回KafkaStream的列表。每個流都支持基於MessagesAndMetadata 元素的叠代器。
   */
 public <K,V> List<KafkaStream<K,V>>
    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
 
/**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder.使用默認解碼器,為匹配wildcard的topics創建消息流列表
   */  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);


  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.使用默認解碼器,為匹配wildcard的topics創建消息流列表
   */  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);


  /**
   *  Commit the offsets of all topic/partitions connected by this connector.通過connector提交所有topic/partitions的offsets
   */  public void commitOffsets();


  /**
   *  Shut down the connector: 關閉connector
   */  public void shutdown();
}
 
 

你可以根據這個例子學習怎樣使用high level consumer api。

2、Old Simple Consumer  API
class kafka.javaapi.consumer.SimpleConsumer {
  /**
   *  Fetch a set of messages from a topic.從topis抓取消息序列
   *
   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.指定topic 名字,topic partition,開始的字節offset,抓取的最大字節數
   *  @return a set of fetched messages
   */  public FetchResponse fetch(kafka.javaapi.FetchRequest request);


  /**
   *  Fetch metadata for a sequence of topics.抓取一系列topics的metadata
   *
   *  @param request specifies the versionId, clientId, sequence of topics.指定versionId,clientId,topics
   *  @return metadata for each topic in the request.返回此要求中每個topic的元素據
   */  public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);


  /**
   *  Get a list of valid offsets (up to maxSize) before the given time.在給定的時間內返回正確偏移的列表
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object. 
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */  public kafak.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);


  /**
   * Close the SimpleConsumer.關閉
   */  public void close();
}
 
對大多數應用來說, high  level consumer  Api已經足夠了,一些應用要求的一些特征還沒有出現high level consumer接口(例如,
當重啟consumer時,設置初始offset)。他們可以使用low level SimpleConsumer  Api。邏輯可能會有些復雜,你可以根據這個例子學習一下。
 
3、New Consumer API
 
新consumer API統一了標準,原來存在於0.8版本的high-level以及low-level consumer APIs之間差異不存在了。你可以通過使用下面maven配置方式,
指明客戶端依賴的jar包,這樣就可以使用新的consumer API。
 
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>

Examples showing how to use the consumer are given in the javadocs.

 
三、Streams API
在0.10.0 release版本中,我們增加了新的客戶端調用庫Kafka Streams,用來支持流式處理應用。Kafka Streams庫認為是
alpha版本質量的,同時它的公共調用APIs在將來有可能會修改。你可以像下面maven配置模式一樣,指明Kafka Streams的
依賴關系來調用Kafka Streams。
 
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency>

在Javadocs中展示了如何調用這個庫(註意這些類都是不穩定的,表明以後的版本中可能會修改)。

源碼地址獲取mingli

有興趣的朋友們可以前往球球哦~一起分享學習技術:2042849237


kafka----kafka API(java版本)