1. 程式人生 > >kafka文件(3)----0.8.2-kafka API(java版本)

kafka文件(3)----0.8.2-kafka API(java版本)

    Apache Kafka包含新的java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了相容性,它們仍將存在一段時間。可以通過一些單獨的jar包呼叫這些客戶端,這些包的依賴性都比較小,同時老的Scala客戶端仍會存在。

一、Producer   API

  我們鼓勵所有新開發都使用新的java版本producer。這個客戶端是經過生產環境測試的,並且一般情況下會比先前的Scala客戶端要更快而且具有更多的特性。你可以通過新增對客戶端jar包的依賴來呼叫這個客戶端,如下所示,使用maven配置:

        <dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.10.0.0</version>
	</dependency>
可以通過javadoc檔案檢視如何使用producer。
二、Consumer API
在0.9.0釋出版本中,增加了新的java版本的consumer,用來替代已有的high-level的基於zookeeper的consumer,以及low-level的consumer APIs。
這個客戶端認為是beta版本。為了保證使用者獲得平穩的升級,我們會繼續維護0.8版本的consumer客戶端,此版本客戶端會在0.9版本的kafka叢集中
依然生效。下面的章節中,我們會介紹老的0.8版本的consumer APIs(包括high-level的ConusmerConnector以及low-level SimpleConsumer)以及
新的Java版本的consumer API。
1、Old High  Level Consumer  API
class   Consumer{
  /**
   *  Create a ConsumerConnector:建立consumer connector
   *
   *  @param config at the minimum, need to specify the groupid of the consumer and the zookeeper connection string zookeeper.connect.config引數作用:需要置頂consumer的groupid以及zookeeper連線字串zookeeper.connect
   */
    public static kafka.javaapi.consumer.ConsumerConnector  createJavaConsumerConnector(ConsumerConfig  config);
  }
  /**
   *  V: type of the message: 訊息型別
   *  K: type of the optional key assciated with the message: 訊息攜帶的可選關鍵字型別
   */
   public interface kafka.javaapi.consumer.ConsumerConnector {
     /**
      *  Create a list of message streams of type T for each topic.:為每個topic建立T型別的訊息流的列表
      * 
      *  @param topicCountMap a map of (topic, #streams) pair   : topic與streams的鍵值對
      *  @param decoder a decoder that converts from Message to T  : 轉換Message到T的解碼器
      *  @return  a map of (topic, list of KafakStream) pairs.   : topic與KafkaStream列表的鍵值對
      *           The number of items in the list is #streams . Each stream supports
      *           an iterator over message/metadata pairs .:列表中專案的數量是#streams。每個stream都支援基於message/metadata 對的迭代器
      */
      public <K,V> Map<String, List<KafkaStream<K,V> > >
        createMessageStreams( Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
     *  Create a list of message streams of type T for each topic, using the default decoder.為每個topic建立T型別的訊息列表。使用預設解碼器
     */
     public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

    /**
   *  Create a list of message streams for topics matching a wildcard.為匹配wildcard的topics建立訊息流的列表
   *
   *  @param topicFilter a TopicFilter that specifies which topics to
   *                    subscribe to (encapsulates a whitelist or a blacklist).指定將要訂閱的topics的TopicFilter(封裝了whitelist或者黑名單)
   *  @param numStreams the number of message streams to return.將要返回的流的數量
   *  @param keyDecoder a decoder that decodes the message key  可以解碼關鍵字key的解碼器
   *  @param valueDecoder a decoder that decodes the message itself  可以解碼訊息本身的解碼器
   *  @return a list of KafkaStream. Each stream supports an
   *          iterator over its MessageAndMetadata elements.  返回KafkaStream的列表。每個流都支援基於MessagesAndMetadata 元素的迭代器。
   */
 public <K,V> List<KafkaStream<K,V>>
    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder.使用預設解碼器,為匹配wildcard的topics建立訊息流列表
   */
  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);


  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.使用預設解碼器,為匹配wildcard的topics建立訊息流列表
   */
  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);


  /**
   *  Commit the offsets of all topic/partitions connected by this connector.通過connector提交所有topic/partitions的offsets
   */
  public void commitOffsets();


  /**
   *  Shut down the connector: 關閉connector
   */
  public void shutdown();
}
你可以根據這個例子學習怎樣使用high level consumer api。

2、Old Simple Consumer  API

class kafka.javaapi.consumer.SimpleConsumer {
  /**
   *  Fetch a set of messages from a topic.從topis抓取訊息序列
   *
   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.指定topic 名字,topic partition,開始的位元組offset,抓取的最大位元組數
   *  @return a set of fetched messages
   */
  public FetchResponse fetch(kafka.javaapi.FetchRequest request);


  /**
   *  Fetch metadata for a sequence of topics.抓取一系列topics的metadata
   *
   *  @param request specifies the versionId, clientId, sequence of topics.指定versionId,clientId,topics
   *  @return metadata for each topic in the request.返回此要求中每個topic的元素據
   */
  public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);


  /**
   *  Get a list of valid offsets (up to maxSize) before the given time.在給定的時間內返回正確偏移的列表
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object. 
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */
  public kafak.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);


  /**
   * Close the SimpleConsumer.關閉
   */
  public void close();
}
對大多數應用來說, high  level consumer  Api已經足夠了,一些應用要求的一些特徵還沒有出現high level consumer介面(例如,
當重啟consumer時,設定初始offset)。他們可以使用low level SimpleConsumer  Api。邏輯可能會有些複雜,你可以根據這個例子學習一下。
3、New Consumer API
新consumer API統一了標準,原來存在於0.8版本的high-level以及low-level consumer APIs之間差異不存在了。你可以通過使用下面maven配置方式,
指明客戶端依賴的jar包,這樣就可以使用新的consumer API。
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.10.0.0</version>
	</dependency>
Examples showing how to use the consumer are given in the javadocs.
三、Streams API
在0.10.0 release版本中,我們增加了新的客戶端呼叫庫Kafka Streams,用來支援流式處理應用。Kafka Streams庫認為是
alpha版本質量的,同時它的公共呼叫APIs在將來有可能會修改。你可以像下面maven配置模式一樣,指明Kafka Streams的
依賴關係來呼叫Kafka Streams。
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-streams</artifactId>
	    <version>0.10.0.0</version>
	</dependency>

Javadocs中展示瞭如何呼叫這個庫(注意這些類都是不穩定的,表明以後的版本中可能會修改)。

相關推薦

kafka30.8.2-kafka APIjava版本

    Apache Kafka包含新的java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了相容性,它們仍將存在一段時間。可以通過一些單獨的jar包呼叫這些客戶端,這些包的依賴性都比較小,同時老的Scala客戶端仍會存在。 一、Producer

Kafka20.8.2- 基本介紹Getting Start

來源: 說明: 原文中某些專有名詞不做翻譯: kafka topic partition consumer producer server client high-level 1、開始 1.1 介紹kafka可提供分散式、分割槽的、可備份的日誌提交服務,同時也是設計

kafka4 0.8.2Configuration配置選項翻譯

來源:http://kafka.apache.org/documentation.html#configuration 3.     Configuration Kafka在配置檔案中使用key-value方式進行屬性配置。這些values可以通過檔案或者程式設計方式提

kafka130.10.1Document5configuresconsumer配置資訊

In 0.9.0.0 we introduced the new Java consumer as a replacement for the older Scala-based simple and high-level consumers. The configs f

kafka-kafka APIjava版本

spring mvc+my batis dubbo+zookeerper kafka restful redis分布式緩存 Apache Kafka包含新的Java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了兼容性,它們仍將存在一段時間。可以通過一些單獨的jar包調用這些客

Linux 管理命令語法、參數、實例全匯總

rwx 界面 endif 群組 new 才有 func {} ans 命令:cat cat 命令用於連接文件並打印到標準輸出設備上。 使用權限 所有使用者 語法格式 cat [-AbeEnstTuv] [--help] [--version] fileName 參數

vs2010單中新增對話方塊並在對話方塊中新增屬性框標籤框

1.建立單文件Demo 在資源檢視Dialog中插入兩個Dialoge,Style設定為child,Border設定為chill。為兩個對話方塊分別新增類,基類為CPropertyPage,類名CP1,CP2。在P1的標頭檔案新增 #include "resource.h" 2.在類檢視中在De

根據介面書寫介面,並在前端呼叫介面返回顯示出資料加下載

---恢復內容開始---  1.首先來看介面文件(其中一個介面): 介面的編寫: 1 /** 2 * 7.11 餘額明細查詢介面 3 * 4 * @param token 5 * @param pageNum 6

通過swagger2markup+asciidoctorj生成html和pdf並解決asciidoctorj生成的pdf檔案中文顯示不全問題maven方式及java程式碼方式

通過swagger2markup+asciidoctorj生成html和pdf文件(maven方式及java程式碼方式) 任務:通過同事的json檔案生成相應的html和pdf文件 前言 開始時swagger2markup和asciidocto

Kafka存儲機制那些事

kafka 方便 成對出現 讀者 開源項目 sock 位置 通過 刪除 點評一下先:kafka的存儲主要有幾個特點: 1. 多級索引(名義上是1級索引,但是這級索引依賴了文件列表,相當於文件列表是第一級索引,所以是二級索引),二級索引文件和數據文件一一對應。 相比只有1

python 修改內容3種方法

bak ram code param post img span clas 正則表達 一、修改原文件方式 1 def alter(file,old_str,new_str): 2 """ 3 替換文件中的字符串 4 :param f

[py]處理3個方法

效果 字符 python生成器 pen mark str http 方法 lines file處理的3個方法: f和f.readlines效果一樣 # f.read() 所有行 -> 字符串 # f.readline 讀取一行 -> 字符串 # f.

kafka

介紹 Kafka是一個分散式的、可分割槽的、可複製的訊息系統。它提供了普通訊息系統的功能,但具有自己獨特的設計。   1、1 術語: Topic:可以理解為一個MQ訊息佇列的名字 Producers:將向Kafka topic釋出訊息的程式 Consume

【pySerial3.4官方3、pySerial API

pySerial API  類 本地埠 類serial.Serial __init__(port = None,baudrate = 9600,bytesize = EIGHTBITS,parity = PARITY_NONE,stopbits = STOPBITS_ONE

UI5--4.3-Controls

現在是時候構建我們的第一個小UI了,將HTML主體中的“Hello World”文字替換為SAPUI5控制元件sap.m.Text。首先,我們將使用JavaScript控制元件介面來設定UI,然後將控制元件例項放入HTML體中。 Preview   The "Hello World" text

Debezium翻譯02:啟動Docker,Debezium,Zookeeper,Kafka

使用Docker執行Debezium 執行Debezium涉及三個主要服務:Zookeeper、Kafka和Debezium的聯結器服務。 本教程將指導您使用Docker和Debezium的Docker映像啟動這些服務的單個例項。 另一方面,生產環境需要執行每個服務的多個例項,以保證

《Apache Zookeeper 官方》-3 快速指南:使用zookeeper來協調分散式應用

原文連結  譯者:softliumin  校對:方騰飛 本節內容讓你快速入門zookeeper。它主要針對想嘗試使用zookeeper的開發者,幷包含一個ZooKeeper單機伺服器的安裝說明,你可以用一些命令來驗證它的執行,以及簡單的程式設計例項。最後,為了考慮到方便性,有一些複雜的安裝部分

[網路開發]RakNet翻譯(3)

如何將你的資料編碼到一個數據包中?執行RakNet的系統通過人們所熟知的資料包進行通訊,實際上所有在Internet上執行的系統都如此。更準確的說,在UDP協議下,它用的是資料報。每一個通過RakNet建立的資料報中都包含了一條或者多條資訊。訊息可以是通過你建立的,例如位置資訊,血量資訊,或者其他通過RakN

《Spring 5官方3 IOC容器 3.11-3.16

原文連結 譯者:maxam0128 3. IOC 3.11 使用JSR 330標準註解 從Spring3.0開始,Spring提供了對JSR-330標準註解(依賴注入)的支援。這些註解和Spring的註解以相同的方式進行掃描。你只需要在你的classpath中新增有關的jar包。 如果你使用

《Spring Cloud Netflix官方3.熔斷器:Hystrix Clients

原文連結 Netfilix建立了一個名為Hystrix的庫,實現了熔斷器模式。在微服務架構中,它通常有多個服務呼叫層。 圖3.1 微服務圖 一個底層服務的故障會引發直至使用者互動層的連鎖故障。在一個設定時長為“metrics.rollingStats.timeInMilliseconds”