1. 程式人生 > >Kafka 0.9 新版本consumer客戶端使用介紹

Kafka 0.9 新版本consumer客戶端使用介紹

kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一個simple consumer client提供全部的控制, 但是需要使用者管理故障轉移和負責錯誤處理. 為了支援一些舊client api很難或者無法做到的用例場景, 我們決定重新設計client, 提供一組能夠長期支援的新api.

第一步是在0.8.1重寫了Producer API, 0.9版本作為第二個步驟引入了新的Consumer API. 新的API基於kafka本身提供的一個新的group coordination 協議構建, 有以下的優點:

  • 清晰統一的API: 新的consumer結合了之前的”simple”和”high-level” consumer客戶端, 不僅提供group coordination還可以進行自己控制消費策略的lower level訪問

  • 減少了依賴: 新的consumer由純Java所寫, 不需要依賴Scala執行時環境, 也不需要Zookeeper, 意味著你的專案所需要的包會減少一些

  • 更好的安全性: Kafka 0.9版本實現的 security extensions在新的consumer中提供支援

  • 新的consumer還添加了一組用於管理容錯的consumer group的協議. 之前這個功能由一個重量級的Java client(與Zookeeper進行互動)來實現. 這種複雜的邏輯使得完整功能的客戶端難以由其他語言實現. 由於引進新的協議, 這變得遠比以前容易. 實際上已經還在C客戶端上轉向使用這個協議.

雖然新consumer使用了重新設計的API和一個新的協調協議, 但是和之前舊版本API概念並不是完全不同的, 所以對舊consumer熟悉的使用者理解新的API並不會有什麼困難. 然而還是有一些關於group管理和執行緒模型等的很微妙的細節值得特別注意. 這篇教程的目的就是覆蓋新consumer api的使用以及解釋所有的這些細節.

值得注意的是: 在重寫的過程中, 新的consumer在穩定性方面仍然被視為beta. 我們在0.9.0分支中以及修復了很多重要的bug, 如果你在使用0.9.0.0發行版時遇到了問題, 我們希望你能夠對那個分支進行測試. 如果仍然有問題, 請反饋到 mail lists

JIRA.

Getting Started

在開始編碼之前, 我們先回顧一下一些基本概念. 在Kafka中, 每個topic被分成一組稱為partitions的logs. Producer向這些logs的末尾寫入訊息, consumer則自己按自己的節奏讀取log. Kafka通過在一個consumer group中分配partitions來伸縮topic的消費, consumer group是一組有同樣group id的consumers. 下圖表示的是一個擁有3個分割槽的topic, 以及一個有兩個成員的consumer group. 每一個分割槽都只會分配給組內的一個成員.
Distributing Kafka Partitions among consumer groups

舊的consumer依賴於Zookeeper來進行group管理, 新的消費者使用kafka內部的group coordination協議. 對於每個group, brokers中的一個被選為group coordinator. 這個coordinator負責管理group的狀態, 它的主要工作是當新的成員加入, 或者原本的成員離開, 或者topic的元資料發生了改變時, 協調partition分配. 重新分配partition這個過程被稱為rebalancing the group.

當group第一次初始化時, consumer通常從分割槽的最開始或者最末尾開始讀取. 每個partition log中的訊息都是按順序讀取. 隨著consumer的讀取, 它將會commit它所成功處理的message的offset. 例如下圖中, consumer的位置位於offset 6, 上一次commit的offset為1.
consumer's position in the log

當一個partition被重分配給組內的另一個consumer, 其最初位置會被設定成上一次commit的offset. 如果上例中的consumer突然奔潰了, 那麼接管這個partition的另外一個組成員會從offset 1繼續消費. 在這種情況下,它會重新處理上一次提交位置到消費者奔潰的位置6的訊息.

這張圖還有兩個log中比較重要的位置. log end offset是最後一條寫入log的message的offset, 而high watermark是最後一條message成功複製到所有的log replica的message offset. 從consumer的角度來看, 所知道的最主要的事情就是其最多能夠讀到high watermark的message. 這能夠阻止consumer讀取到未被複制到其他broker的, 可能會丟失的message.

Configuration and Initialization 配置和初始化

開始使用consumer之前, 需要將kafka-clients依賴加入到你的專案中.

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.0-cp1</version>
</dependency>

consumer使用一個Properties file來進行構建. 以下提供了使用consumer group所需要的最小配置.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

和之前的consumer, producer一樣, 我們需要為consumer提供一個broker的初始化列表, 以用於發現叢集中其他的broker. 這個配置並不需要提供所有的broker, client會從給定的配置中發現所有的當前存活的broker. 這裡我們假設broker執行在本地, 同時consumer還需要知道如何反序列化message key和value. 最後, 為了加入consumer group, 我們需要配置一個group id. 隨著教程的繼續, 我們會介紹更多的配置.

* Topic訂閱*

為了開始消費, 你必須指定你的應用需要讀取的topics, 在下面的例子中, 我們訂閱了topic foo和bar.

consumer.subscribe(Arrays.asList(“foo”, “bar”));

訂閱之後, consumer會與組內其他consumer協調來獲取其分割槽分配. 這都是在你開始消費資料時自動處理的. 後面我們會說明如何使用assign api來手動分配分割槽, 但是要注意的是, 不能同時混合使用自動和手動的分配.

subscribe方法並不是遞增的: 你必須包含所有你想要消費的topics. 你可以在任何時刻改變你想消費的topic集, 當你呼叫subscribe時, 之前訂閱的topics列表會被新的列表取代.

基本的Poll事件迴圈

consumer需要能夠並行獲取資料, 在眾多brokers中獲取多個topic的許多分割槽訊息. 為了實現這個目的, consumer API的設計成風格類似於unix中的poll或者select呼叫: 一旦topic註冊了, 所有的coordination, rebalancing和data fetch都是由一個處於迴圈中的poll呼叫來驅動. 這樣就提供了一個能在一個執行緒裡處理所有的IO的簡單有效的實現.

在你訂閱一個topic之後, 你需要啟動這個event loop以獲得partition分配和開始獲取資料. 聽起來很複雜, 但是所有你需要做的就只有呼叫poll, 然後consumer客戶端本身負責處理其他的工作. 每一次poll呼叫都會返回從所分配的partition獲取的一組訊息(也許是空的). 下面的例子展示了一個基本的poll迴圈, 列印獲取的records的offset和value.

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
  }
} finally {
  consumer.close();
}

poll API根據當前的位置返回records, 當group第一次建立時, 消費開始的位置會被根據reset policy(一般設定成從每個分割槽的最早的offset或者最新的offset開始)來設定. 只要consumer開始提交offset, 那麼之後的rebalance都會重置消費開始位置到最新的被提交的offset. 傳遞給poll的引數是consumer其在當前位置等待有record返回時需要被阻塞的時間. 一旦有record時, consumer會立即返回, 如果沒有record, 它將會等待直到超時.

consumer被設計成只在自己的執行緒中執行, 在沒有外部同步措施的情況下, 在多執行緒中使用時不安全的, 同時也不建議這樣做. 在這個例子中, 我們使用了一個flag來使得當應用停止時能夠從迴圈中跳出. 當這個flag被另一個執行緒設定成false時, pool返回時迴圈會跳出, 無論返回什麼record, 處理過程都會結束.這個的例子使用了一個相對較少的超時時間, 以使得關閉consumer並不會有太大的延時.

你還可以設定一個較長的timeout, 並且使用wakeup API來使得其從迴圈中跳出.

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + “: ” + record.value());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  consumer.close();
}

我們將timeout改為了Long.MAX_VALUE, 意味著consumer會一直阻塞直到有record返回. 和前面設定flag不同, 用於觸發shutdown的執行緒可以呼叫consumer.wakeup()來中斷一次poll, 使其丟擲WakeupExection. 這個API是執行緒安全的. 注意如果當前沒有活躍的poll, 那麼異常會在下一次poll呼叫時丟擲. 在這個例子中, 我們捕捉這個異常, 阻止其繼續傳播.

完整程式碼

在接下來的例子中, 我們將所有的程式碼塊放到一起來構建一個task, 初始化consumer, 訂閱一個topic列表, 並且執行poll呼叫直到外部關閉它.

public class ConsumerLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final int id;

  public ConsumerLoop(int id,
                      String groupId, 
                      List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(“group.id”, groupId);
    props.put(“key.deserializer”, StringDeserializer.class.getName());
    props.put(“value.deserializer”, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out.println(this.id + ": " + data);
        }
      }
    } catch (WakeupException e) {
      // ignore for shutdown 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}

為了測試這個例子, 你需要一個執行0.9.0.0的kafka發行版的broker, 以及一個提供一些string資料進行消費的topic. 寫入一些string資料的最簡單的方法就是通過kafka-verifiable-producer.sh指令碼. 你需要確保topic有超過一個partition. 例如對於只有一個執行在localhost的kafka broker和Zookeeper, 你可以在kafka發行包根目錄下執行以下的命令:
# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181

# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092

然後我們可以用以下程式碼來設定一個有3個成員的consumer group, 所有的成員都訂閱我們剛剛建立的topic.

public static void main(String[] args) {
  int numConsumers = 3;
  String groupId = "consumer-tutorial-group"
  List<String> topics = Arrays.asList("consumer-tutorial");
  ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

  final List<ConsumerLoop> consumers = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
    consumers.add(consumer);
    executor.submit(consumer);
  }

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (ConsumerLoop consumer : consumers) {
        consumer.shutdown();
      } 
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace;
      }
    }
  });
}

這個例子將三個Runnable consumer提交到一個executor. 每一個執行緒都有一個單獨的id, 所以你可以看出來哪個執行緒正在接收資料. 當你準備結束程序時, shutdown hook會被呼叫, 它會呼叫wakeup來終止三個執行緒, 並且等待它們關閉. 以下是程式執行的資料:

2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}

消費者存活

每個consumer都是被分配它所訂閱的topics中所有partitions的一部分partitions. 就像是對這些partitions的group lock. 只要這個鎖被持有, 組內其他的成員就無法從這些分割槽中讀取資料. 當consumer還存活著, 這就是你想要的, 這是能夠避免重複消費的方法(加鎖). 但是如果消費者因為機器或者應用的故障掛掉了, 那麼你希望鎖能夠釋放, 並且分割槽能夠分配給其他存活的consumer.

Kafka group coordination協議通過心跳機制來解決這個問題. 在每次重分配之後, 當前所有的成員會週期性傳送心跳給group coordinator. 只要coordinator接收心跳, 它就會假設成員是存活的. 每次接收到心跳, coordinator都會開始(或重置)一個timer. 當timer過期時, 如果還沒有心跳收到, coordinator會將這個成員標記為掛掉, 然後通知其他組內其他成員需要重新分配分割槽. timer的時間被稱為session timeout, 由客戶端的session.timeout.ms進行設定.

props.put(“session.timeout.ms”, “60000”);

session timeout能夠保證, 當機器或者應用奔潰, 或者網路分割槽導致consumer與coordinator分隔開來時, 鎖會被釋放. 然而應用故障判斷卻有點討巧, 因為有可能consumer一直在傳送心跳給coordinator, 但並意味著應用的狀態是正常的.

consumer的poll api就是設計用於解決這個問題的(傳送心跳), 當你呼叫poll或者其他阻塞API時所有的網路IO都是在前臺處理的, consumer並不使用任何後臺執行緒. 意味著傳送給coordinator的心跳只會在poll呼叫時被髮送( 譯者注: 這個在0.10.1.0版本改成後臺傳送心跳), 如果應用停止poll( 無論是處理record的程式碼丟擲了異常或者系統奔潰了),那麼心跳將會停止傳送, 那麼session timeout就會過期, 組內的分割槽就會重新分配.

唯一一個你需要注意的問題就是: 如果你處理訊息的時間比session timeout要長的話, 那麼會使得coordinator認為consumer已經掛了, 導致partition重新分配. 你應該將session timeout設定的足夠大, 以使得這種情況不會發生. 預設session timeout是30s, 但是將它設定成幾分鐘是不可行的, 這樣就會導致coordinator需要更長的時間來檢測真正的consumer奔潰.

訊息傳遞語義

當一個consumer group被建立時, 最開始消費的offset位置是由auto.offset.reset配置來控制的. 一旦consumer開始消費, 它會根據應用的需要自動提交offset. 每次rebalance, 消費開始的position都會被設定成分割槽最後提交的offset. 如果consumer在為已經成功處理的message提交offset之前奔潰了, 那麼重新分配到這個分割槽的consumer會重複消費這些訊息. 所以, 你提交訊息的頻率越頻繁, 那麼因為奔潰而造成的重複消費就會越少.

到現在,我們都假定開啟了自動提交. 當我們將enable.auto.commit設定成true(預設是true), consumer會根據auto.commit.interval.ms的設定, 來週期性自動觸發offset提交. 通過減少commit間隔, 你可以減少在奔潰之後消費者需要重新處理的message數量.

為了使用consumer的commit API, 你應該首先通過在consumer的配置中設定enable.auto.commit為false來禁止自動commit.

props.put(“enable.auto.commit”, “false”);

commit API本身用起來不難, 但是重要的是你如何把它放入到程式碼中. 下面使用同步commit API是手動commit最簡單的方法.

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

使用無參的commitSync方法會提交上次poll返回的最新的offset. 這個呼叫是阻塞式的, 直到commit成功或者因為不可恢復的錯誤而失敗. 你需要擔心的最大的問題就是訊息處理時間可能比session timeout長. 當這種情況發生時, coordinator會把這個consumer踢出group, 那麼consumer就會丟擲CommitFailedException. 你的應用應該處理這個錯誤, 回滾自從上次成功提交offset以來的消費的message造成的改變.

一般你應該在訊息被成功處理完後才去提交offset. 如果consumer在commit傳送之前就奔潰了, 那麼message將會被重新處理. 如果commit策略保證最後提交的offset不會超過當前的消費position, 那麼你就會獲得”at least once(至少一次)”的訊息分發語義.
consumer-atmostonce-1.png

*Figure 3: 提交的offset超前於當前消費位置*

通過改變commit策略來保證當前的消費位置絕對不超過最後一次committed offset, 如上圖所示, 那麼你會獲得”at most once(至多一次)”的語義. 如果消費者在處理到最後一次committed offset之前就奔潰了, 那麼所有在這個間隔之間的資料都會丟失, 但是你可以確保沒有訊息會被處理超過一遍. 為了實現這種策略, 我們只需要改變commit和訊息處理的順序.
try {
  while (running) {
  ConsumerRecords<String, String> records = consumer.poll(1000);

  try {
    consumer.commitSync();
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

使用自動提交為你提供”at least once”的語義, 因為consumer保證只commit返回給應用的message的offset. 你可能會重複處理的message就被限定在, 兩次commit時間間隔(由auto.commit.interval.ms設定)之間你的應用處理的message.

通過使用commit API, 你對可以接受多少重複處理有更好的控制. 在最極端的情況下, 你可以在每次處理完訊息都提交offset. 如下面所示:

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);

    try {
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
        consumer.commitSync(Collections.singletonMap(record.partition(), new OffsetAndMetadata(record.offset() + 1)));
      }
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

在這個例子中, 我們在commitSync方法中顯式的傳遞了我們想要提交的offset. 提交的offset應該是你的應用將要讀取的下一條message的offset. 當呼叫無參的commitSync方法時, 就會提交返回給應用的最後一條message的offset(再加上1), 但是我們不能這樣做, 因為這樣會使得提交的位置超過實際處理的進度.

明顯每消費一條訊息就commit offset並不合適大多數場景, 因為處理執行緒必須阻塞, 直到commit請求從伺服器返回. 這樣會大大減少吞吐量. 一個更加合理的方式就是每N條記錄就提交一次, N可以進行調整以獲得更好的效能.

commitSync方法的引數是一個鍵是topic partition, 值是offsetAndMetadata例項的map. commit API允許在每次commit時包含一些Metadata, 這些資料可以是記錄commit的時間, 傳送commit的host地址, 或者任何你的應用需要的資訊. 在這個例子中, 我們讓它為空.

相對每次接收到訊息都提交offset, 更加合理的方法就是處理完一個分割槽的訊息後再commit offset. ConsumerRecords Collection提供訪問其中包含的partitions集合的方法, 以及每個partition的messages的方法. 下面的例子就展示了用法.

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
    for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
      for (ConsumerRecord<String, String> record : partitionRecords)
        System.out.println(record.offset() + ": " + record.value());

      long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));
    }
  }
} finally {
  consumer.close();
}

到目前為止, 所有的例子都是關注於同步commit API, 但是consumer同樣也有一個非同步的API, commitAsync, 使用非同步的commit可以使得你的應用擁有更高的吞吐量, 因為你可以不用等待commit返回, 開始處理下一批message. 需要權衡的就是你可能隨後發現, commit失敗了. 以下的例子展示了基本用法:

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
                             Exception exception) {
        if (exception != null) {
          // application specific failure handling
        }
      }
    });
  }
} finally {
  consumer.close();
}

注意到對於commitAsync我們提供了一個callback, 這個方法在consumer commit結束時被呼叫(無論commit成功與否), 如果你不需要callback, 那麼你可以呼叫無參的commitAsync.

檢視Consumer Group資訊

當一個consumer group處於活躍時, 你可以在命令列中使用位於Kafka發行包bin目錄下的consumer-groups.sh指令碼獲取partition assignment和消費進度.

# bin/kafka-consumer-groups.sh –new-consumer –describe –group consumer-tutorial-group –bootstrap-server localhost:9092 

例子輸出結果如下所示:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.1

上面顯示了consumer group中所有的partition assignment, 哪個消費者例項擁有哪個分割槽, 以及最新的committed offset(這裡的current offset). lag表示log end offset與最新的committed offset的差值. 管理員可以使用這個命令來監控以保證consumer group跟上了producer的進度.

使用手動分割槽分配

在教程前面提到的, 新的consumer為一些並不需要consumer group的應用場景實現了更底層的訪問. 推薦使用這個API的理由就是它的易用性. 舊的”simple” consumer同樣也提供了這個功能, 但是需要你自己進行一大堆的異常處理. 使用新的consumer API, 你只需要分配你想要讀取資料的partition, 然後開始poll data就可以了.
下面的例子展示瞭如何好使用partitionFor API來將一個topic的所有分割槽分配給一個consumer.

List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partition : consumer.partitionsFor(topic))
  partitions.add(new TopicPartition(topic, partition.partition()));
consumer.assign(partitions);

subscribe方法類似, assign方法需要傳遞你想要讀取的partition的列表. 一旦partition被分配了, poll的工作方式就和之前講述的一致.

需要注意的是, 無論是使用一個simple consumer或者是consumer group, 所有的offset commit都會經過group coordinator. 所以如果你想要commit offsets, 你仍然需要設定group.id, 以防止與其他consumer發生衝突. 如果一個simple consumer試著使用和一個活躍的consumer group相同的group id來commit offset, 那麼coordinator會拒絕這個commit(提交的結果就是丟擲CommitFailedException). 有相同的group id的simple consumer卻不會有錯誤.

Conclusion

新的consumer為Kafka社群帶來很多的好處, 包括一個清晰的API, 更好的安全性以及更少的依賴. 這篇教程介紹了基本的應用, 集中於poll的語義,以及使用commit API來控制訊息傳遞語義. 雖然還有很多細節沒有涉及到, 但是這對於開始使用kafka consumer來說已經足夠了. 儘管新的consumer還在開發中, 但是我們鼓勵你試一試. 如果有出現問題, 請在mailing list上告訴我們.

相關推薦

Kafka 0.9 版本consumer客戶使用介紹

kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一

Kafka 0.11版本釋出:主要的功能變更介紹:支援 EOS, 事務和冪等producer

Apache Kafka近日推出0.11版本。這是一個里程碑式的大版本,特別是Kafka從這個版本開始支援“exactly-once”語義(下稱EOS, exactly-once semantics

Kafka 0.9 消費者API

kafka誕生之初,它自帶一個基於scala的生產者和消費者客戶端。但是慢慢的我們認識到這些API有很多限制。比如,消費者有一個“高階”API支援分組和異常控制,但是不支援很多更復雜的應用場景;它也有一個“低階”API,支援對細節的完全控制,但是要求碼農自己控制失敗和異常

java在線聊天項目0.9版 實現把服務接收到的信息返回給每一個客戶窗口中顯示功能之客戶接收

nec 一個 out for tex ava 添加 implement com 客戶端要不斷接收服務端發來的信息 與服務端不斷接收客戶端發來信息相同,使用線程的方法,在線程中循環接收 客戶端修改後代碼如下: package com.swift; import java.

基於Kafka 0.9版本 使用ACL進行許可權控制

kafka附帶一個可插拔的認證,並使用zookeeper來儲存所有的acl。kafka的acl在一般格式定義"Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以閱讀更多關於KIP-11的結構,為了新增,刪除或列

kafka 0.9版本堆外記憶體溢位

1、背景線上kafka是0.9版本,最大堆記憶體1G。從server.log看到,java.lang.OutOfMemoryError: Direct buffer memory,是堆外記憶體溢位了。加大堆外記憶體,過一段時間還是堆外記憶體溢位。2、原因分析猜測應該是禁用了手

Storm作為消費者對接Kafka 0.10.x+版本

Storm應用場景—作為新消費者對接Kafka 0.10.x+版本(一) 00 背景 隨著Kafka版本的升級,Storm作為消費者對接Kafka 0.10.x+版本的方式上,與之前存在差異,現將新的方式記錄下來,為以後遇到使用Storm實時處理新版Kafka資

Kafka 0.8 Producer (0.9以前版本適用)

Kafka舊版本producer由scala編寫,0.9以後已經廢除 示例程式碼如下: import kafka.producer

Kafka 2.3 Producer (0.9以後版本適用)

kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。 這裡直接使用最新2.3版本,0.9

Kafka 0.11功能介紹:空消費組延遲rebalance

延時處理 max 詳細 方便 聲明 qrcode 狀態機 分享圖片 -c Kafka 0.11新功能介紹:空消費組延遲rebalance 在0.11之前的版本中,多個consumer實例加入到一個空消費組將導致多次的rebalance,這是由於每個consumer i

Javascript中獲取瀏覽器類型和操作系統版本客戶信息常用代碼

cin nav coo temp undefined light safari macintosh else /** * @author hechen */ var gs = { /**獲得屏幕寬度**/ ScreenWidth: function () {

CAS統一登入認證(9): 非典型php客戶

根據《 CAS統一登入認證(7): 非典型.net客戶端 》上篇文章的思路,重寫了一個簡單驗證的cas php驗證客戶端  本文無需設定攔截器,只是靜默的簡單通過cas驗證使用者,訪問caslogin.php 地址才會進行連線驗證。 訪問url如:&nbs

微信小程式 基礎庫版本客戶版本對應關係

iOS 客戶端版本 基礎庫版本 6.7.2 2.3.0 6.7.0 2.2.5 6.6.7 2.1.3 6.6.6

redis-5.0 cluster帶認證及客戶連線

      Redis在3.0版正式引入redis-cluster叢集這個特性。Redis叢集是一個提供在多個Redis間節點間共享資料的程式集。Redis叢集是一個分散式(distributed)、容錯(fault-tolerant)的Redis記憶體K/V服務,叢集可以使

SpringCloud 2.0(二)——Eureka客戶搭建

上一篇:SpringCloud 2.0(一)——註冊中心Eureka搭建 這一節,我們基於SpringBoot搭建一個服務的提供方,然後註冊到上一節中我們搭建的Eureka註冊中心。還是跟上一篇一樣,去Spring的官網搜尋對應的Eureka Discovery依賴,如下圖:因

Redis Desktop Manager 0.9.3 版本下載(官方最新版需要訂閱,好像要給錢才行)

下載地址:https://pan.baidu.com/s/1P856NPusJLUSFwQjjPdltA 密碼: 12d3   版本是兩三個月前,我從官網下載的,然後順便存到了我的行動硬碟上。0.9.3.817.exe github 上有 redis destop

9.redis學習筆記-客戶&伺服器.md

12. 客戶端 12.1. 客戶端屬性 客戶端狀態包含的屬性可以分為兩類: 通用屬性,很少與特定功能相關,無論客戶端執行什麼工作,都要用到這些屬性 與特定功能相關的屬性,比如操作資料庫要用到的db屬性和dictid屬性 12.1.1. 套接字描述 客戶端狀

PEACHPIE 0.9.11 版本釋出,可以上生產了

0.9.11是第一個非預覽版本,也就是說可以用於生產了,編譯本身快速且使用者友好(更好的錯誤訊息),有一個重大改進的文件(https://docs.peachpie.io/)和新的.NET Core 的 PeachPie Project ,可以和 Visual Studio(> = 2017更新6)

事無鉅細 Apache Kafka 0.9.0.1 叢集環境搭建

Kafka叢集環境依賴於Zookeeper環境。因此我們的環境搭建實際分為兩部分。Zookeeper環境搭建和Kafka環境搭建。 Zookeeper 3.4.8叢集搭建 部署安裝包 下載 wget http://mirrors.cn

MySQL資料庫驅動mysql-connector-java的8.0.9-rc版本連線MySQL資料庫

  之前我的MySQL資料庫驅動mysql-connector-java版本號為5.1.34,在升級成8.0.9-rc版本後,發現原來的連線方式報錯了。故在這裡記錄一下新版本的MySQL資料庫驅動的連線使用方式。   先貼出來以前舊版本(5.1.34)的連線方