1. 程式人生 > >(二)Kafka0.8.2官方文檔中文版系列-API

(二)Kafka0.8.2官方文檔中文版系列-API

black mat color cte 需要 self 兼容 lec 取數

2. API

我們正在為Kafka重寫JVM客戶端。在Kafka0.8.2中,包含一個新重寫的Java producer。下一個版本將包含一個等效的Java consumer。這些新客戶端旨在取代現有的Scala客戶端,但為了兼容性,它們將共存一段時間。這些客戶端可以在一個獨立的jar中使用,並且具有最小的依賴性,而舊的Scala客戶端仍然與服務器打包在一起。

2.1 Producer API

在kafka0.8.2版本中,我們鼓勵你使用新的java producer。這個客戶端經過生產環境的測試,相比之前的scala客戶端該客戶端更快、有更多的特性。你可以通過添加如下maven依賴使用它:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

在javadoc中可以查看如何使用producer。

對於那些對遺留Scala生產者api感興趣的人,可以在這裏找到相關信息。

2.2 High Level Consumer API

class Consumer {
  
/** * Create a ConsumerConnector 創建一個消費者連接器 * * @param config at the minimum, need to specify the groupid of the consumer and the zookeeper * connection string zookeeper.connect.
  * 參數解釋:基於一個最小的配置,你只需要指定消費者組,zookeeper的連接
*/ public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config); }
/** * V: type of the message 消息的類型 * K: type of the optional key assciated with the message 與消息相關的可選的配置 */ public interface kafka.javaapi.consumer.ConsumerConnector { /** * Create a list of message streams of type T for each topic. * 為每個topic創建一個T類型的消息流列表
  * *
@param topicCountMap a map of (topic, #streams) pair * @param decoder a decoder that converts from Message to T * @return a map of (topic, list of KafkaStream) pairs. * The number of items in the list is #streams. Each stream supports * an iterator over message/metadata pairs. */ public <K,V> Map<String, List<KafkaStream<K,V>>> createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder); /** * Create a list of message streams of type T for each topic, using the default decoder.
  * 為每個topic創建一個T類型的消息流列表,使用默認的解碼器
*/ public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap); /** * Create a list of message streams for topics matching a wildcard. * * @param topicFilter a TopicFilter that specifies which topics to * subscribe to (encapsulates a whitelist or a blacklist). * @param numStreams the number of message streams to return. * @param keyDecoder a decoder that decodes the message key * @param valueDecoder a decoder that decodes the message itself * @return a list of KafkaStream. Each stream supports an * iterator over its MessageAndMetadata elements. */ public <K,V> List<KafkaStream<K,V>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder); /** * Create a list of message streams for topics matching a wildcard, using the default decoder.
* 為與通配符匹配的消息流創建一個消息流列表,使用默認的解碼器
*/ public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams); /** * Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
* 為與通配符匹配的消息流創建一個消息流列表,使用默認的解碼器
*/ public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter); /** * Commit the offsets of all topic/partitions connected by this connector.
* 提交通過該連接器關聯的所有的topic和分區的偏移量
*/ public void commitOffsets(); /** * Shut down the connector 關閉連接器 */ public void shutdown(); }

你可以參考這個示例去學習如何使用high level 消費者API。

2.3 Simple Consumer API

class kafka.javaapi.consumer.SimpleConsumer {
  /**
   *  Fetch a set of messages from a topic.
* 從一個topic拉取消息 * *
@param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* 請求需要指定主題的名稱、主題分區、起始偏移量、拉取數據的最大字節數 *
@return a set of fetched messages
* 拉取回來的消息集合
*/ public FetchResponse fetch(kafka.javaapi.FetchRequest request); /** * Fetch metadata for a sequence of topics.
* 獲取一系列主題的元數據 * *
@param request specifies the versionId, clientId, sequence of topics. 需要指定版本號、客戶端ID、主題 * @return metadata for each topic in the request. 每個請求主題的元數據 */ public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request); /** * Get a list of valid offsets (up to maxSize) before the given time.
* 在給定的時間之前,得到一個有效的偏移量列表(偏移量可以取到給定時間之前的最大值) * *
@param request a [[kafka.javaapi.OffsetRequest]] object. * @return a [[kafka.javaapi.OffsetResponse]] object. */ public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request); /** * Close the SimpleConsumer. 關閉SimpleConsumer */ public void close(); }

對大多數應用來說,the high level API是完全夠用的。一些應用想要使用一些high level API沒有暴露的特性(比如說,當重啟消費者時指定初始的offset,即偏移量)。他們可以使用我們的low level

SimpleConsumer API。但是這個邏輯會有點復雜,你可以參考這個例子。

2.4 Kafka Hadoop Consumer API

我們的一個基本用例就是,為數據聚合和加載數據到hadoop提供一個水平擴展的解決方案。為了支持這個用戶用例,我們提供了一個基於hadoop的消費者,它生成了許多map任務,以並行地從Kafka集群中拉取數據。這可以非常快速的將kafka的數據加載到hadoop中(我們只用了一些Kafka服務器就完全飽和了網絡,意思就是基於hadoop的consumer拉取速度很快)。

使用hadoop consumer的信息,可以在這裏找到。

(二)Kafka0.8.2官方文檔中文版系列-API