專為實時而構建:使用Apache Kafka進行大資料訊息傳遞 第2部分
在Apache Kafka簡介 的前半 部分,您使用Kafka開發了幾個小規模的生產者/消費者應用程式。從這些練習中,您應該熟悉Apache Kafka訊息傳遞系統的基礎知識。在下半部分,您將學習如何使用分割槽來分佈負載並橫向擴充套件應用程式,每天處理多達數百萬條訊息。您還將瞭解Kafka如何使用訊息偏移來跟蹤和管理複雜的訊息處理,以及如何在消費者失敗時保護您的Apache Kafka訊息傳遞系統免於失敗。我們將從第1部分開發用於釋出 - 訂閱和點對點用例的 示例應用程式 。
Apache Kafka中的分割槽
Kafka中的topic可以細分為分割槽。例如,在建立名為Demo的topic時,您可以將其配置為具有三個分割槽。伺服器將建立三個日誌檔案,每個檔案分割槽一個。當生產者向topic釋出訊息時,它將為該訊息分配分割槽ID。然後,伺服器將訊息僅附加到該分割槽的日誌檔案中。
如果您隨後啟動了兩個消費者,則伺服器可能會將分割槽1和2分配給第一個消費者,將分割槽3分配給第二個消費者。每個消費者只能從其分配的分割槽中讀取。您可以在圖1中看到為三個分割槽配置的Demo的topic。

為了擴充套件這個場景,想象一下有兩個代理的Kafka叢集,它位於兩臺機器中。分割槽演示tpoic時,您將其配置為具有兩個分割槽和兩個副本。對於此類配置,Kafka伺服器會將兩個分割槽分配給群集中的兩個broker。每個broker都是其中一個分割槽的領導者。
當生產者釋出訊息時,它將轉到分割槽領導者。領導者將獲取訊息並將其附加到本地計算機上的日誌檔案中。第二個broker會被動地將該提交日誌複製到自己的機器上。如果分割槽負責人發生故障,第二個broker將成為新的領導者並開始提供客戶端請求。以同樣的方式,當消費者向分割槽傳送請求時,該請求將首先發送給分割槽領導者,分割槽領導者將返回所請求的訊息。
分割槽的好處
考慮分割槽基於Kafka的訊息傳遞系統的好處:
- 可伸縮性 :在只有一個分割槽的系統中,釋出到topic的訊息儲存在一個日誌檔案中,該檔案存在於一臺計算機上。topic的訊息數必須適合單個提交日誌檔案,並且儲存的訊息大小永遠不會超過該計算機的磁碟空間。通過對topic進行分割槽,您可以通過將訊息儲存在群集中的不同計算機上來擴充套件系統。例如,如果您想為演示主題儲存30千兆位元組(GB)的訊息,您可以構建一個由三臺計算機組成的Kafka叢集,每臺計算機具有10 GB的磁碟空間。然後,您將topic配置為具有三個分割槽。
- 伺服器負載平衡 :擁有多個分割槽可讓您在broker之間傳播訊息請求。例如,如果您的topic每秒處理100萬條訊息,則可以將其劃分為100個分割槽,並將100個broker新增到群集中。每個broker都是單個分割槽的領導者,負責每秒響應10,000個客戶端請求。
- 消費者負載平衡 :與伺服器 負載平衡 類似,在不同機器上託管多個消費者可以分散消費者負載。假設您希望從具有100個分割槽的topic每秒消耗100萬條訊息。您可以建立100個消費者並並行執行它們。Kafka伺服器將為每個消費者分配一個分割槽,每個消費者將並行處理10,000個訊息。由於Kafka僅將每個分割槽分配給一個消費者,因此在分割槽內將按順序使用每個訊息。
兩種分割槽方式
生產者負責決定訊息將進入的分割槽。生產者有兩種控制這種分配的選擇:
- 自定義分割槽程式 :您可以建立實現該
org.apache.kafka.clients.producer.Partitioner
介面的類。此自定義Partitioner
將實現業務邏輯以確定傳送訊息的位置。 - DefaultPartitioner :如果您不建立自定義分割槽程式類,則預設情況下將使用該類
org.apache.kafka.clients.producer.internals.DefaultPartitioner
。對於大多數情況,預設分割槽程式足夠好,提供三個選項: - 手動 :建立
ProducerRecord
時,使用過載的建構函式new ProducerRecord(topicName, partitionId,messageKey,message)
指定分割槽ID。 - 雜湊(區域性敏感) :建立
ProducerRecord
時,通過呼叫指定messageKey
的構造方法new ProducerRecord(topicName,messageKey,message)
。DefaultPartitioner
將使用messageKey
的雜湊來確保相同messageKey
的所有訊息都轉到同一個生產者。這是最簡單也是最常用的方法。 - 噴塗(隨機負載平衡) :如果您不想控制哪些分割槽訊息,只需呼叫
new ProducerRecord(topicName, message)
以建立您的ProducerRecord
。在這種情況下,分割槽程式將以迴圈方式向所有分割槽傳送訊息,從而確保平衡的伺服器負載。
對Apache Kafka應用程式進行分割槽
對於第1部分中的簡單生產者/消費者示例,我們使用了 DefaultPartitioner
。現在我們將嘗試建立自定義分割槽程式。對於此示例,我們假設我們有一個零售網站,消費者可以使用該網站在世界任何地方訂購產品。根據使用情況,我們知道大多數消費者都在美國或印度。我們希望對我們的應用程式進行分割槽,以便將來自美國或印度的訂單傳送給各自的消費者,而來自其他任何地方的訂單將轉發給第三個消費者。
首先,我們將建立一個實現 org.apache.kafka.clients.producer.Partitioner
介面的 CountryPartitioner
。我們必須實現以下方法:
- 當我們使用配置屬性初始化類時,Kafka將呼叫 configure() 。此方法初始化特定於應用程式業務邏輯的函式,例如連線到資料庫。在這種情況下,我們需要一個相當通用的分割槽器作為屬性。然後,我們可以使用將訊息流對映到分割槽。將來我們可以使用這種格式來改變哪些國家/地區獲得自己的分割槽。
PartitionerMapcountryNameconfigProperties.put("partitions.0","USA")
- 該
Producer
為每個訊息呼叫 partition()方法 。在這種情況下,我們將使用它來讀取訊息並從訊息中解析國家/地區的名稱。如果國家的名稱在countryToPartitionMap
,它將返回儲存在Map
的partitionId
如果沒有,它將雜湊國家的值並使用它來計算它應該去哪個分割槽。 - 我們呼叫 close() 來關閉分割槽程式。使用此方法可確保在關閉期間清除初始化期間獲取的任何資源。
請注意,當Kafka呼叫 configure()
時,Kafka生成器會將我們為生成器配置的所有屬性傳遞給 Partitioner
類。我們必須只讀取那些以 partitions.
開頭的屬性,解析它們以獲取 partitionId
並存儲ID到 countryToPartitionMap
。
以下是我們的 Partitioner
介面自定義實現。
清單1. CountryPartitioner
public class CountryPartitioner implements Partitioner { private static Map<String,Integer> countryToPartitionMap; public void configure(Map<String, ?> configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap<String, Integer>(); for(Map.Entry<String,?> entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); returnvalue.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} }
Producer
清單2(下面)中的類與 第1部分中 的簡單生成器非常相似,其中兩個更改以粗體標記:
- 我們使用等於值
ProducerConfig.PARTITIONER_CLASS_CONFIG
的鍵設定config屬性,該值匹配我們CountryPartitioner
類的完全限定名。我們還設定countryName
為partitionId
,從而映射了我們想要傳遞給CountryPartitioner
的屬性。 - 我們將實現
org.apache.kafka.clients.producer.Callback
介面的類的例項作為producer.send()
方法的第二個引數傳遞。一旦成功釋出訊息(附加了RecordMetadata
物件),Kafka客戶端將呼叫onCompletion()
其方法。我們將能夠使用此物件來找出傳送訊息的分割槽,以及分配給已釋出訊息的偏移量。
清單2.分割槽生產者
public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } }
為消費者分配分割槽
Kafka伺服器保證僅將分割槽分配給一個消費者,從而保證訊息的消耗順序。您可以手動分配分割槽或自動分配分割槽。
如果您的業務邏輯需要更多控制,那麼您將需要手動分配分割槽。在這種情況下,您將使用 KafkaConsumer.assign(<listOfPartitions>)
將每個消費者感興趣的分割槽列表傳遞給Kakfa伺服器。
自動分配分割槽是預設和最常見的選擇。在這種情況下,Kafka伺服器將為每個使用者分配一個分割槽,並將重新分配分割槽以擴充套件新的使用者。
假設您正在建立一個包含三個分割槽的新topic。當您為新topic啟動第一個消費者時,Kafka會將所有三個分割槽分配給同一個消費者。如果您隨後啟動第二個消費者,Kafka將重新分配所有分割槽,將一個分割槽分配給第一個下發者,將剩餘的兩個分割槽分配給第二個消費者。如果新增第三個消費者,Kafka將再次重新分配分割槽,以便為每個消費者分配一個分割槽。最後,如果您啟動第四個和第五個消費者,那麼三個消費者將擁有一個分配的分割槽,但其他消費者將不會收到任何訊息。如果最初的三個分割槽之一出現故障,Kafka將使用相同的分割槽邏輯將該消費者的分割槽重新分配給其他消費者。
我們將為示例應用程式使用自動分配。我們的大部分消費者程式碼都與第1部分中的簡單消費者程式碼相同。唯一的區別是我們將一個例項 ConsumerRebalanceListener
作為第二個引數傳遞給我們的 KafkaConsumer.subscribe()
方法。Kafka每次為此消費者分配或撤銷分割槽時都會呼叫此類的方法。我們將覆蓋 ConsumerRebalanceListener
的 onPartitionsRevoked()
和 onPartitionsAssigned()
方法,並列印從此訂閱者分配或撤消的分割槽列表。
清單3.分割槽的使用者
private static class ConsumerThread extends Thread { private String topicName; private String groupId; private KafkaConsumer<String, String> kafkaConsumer; public ConsumerThread(String topicName, String groupId) { this.topicName = topicName; this.groupId = groupId; } public void run() { Properties configProperties = new Properties(); configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //Figure out where to start processing messages from kafkaConsumer = new KafkaConsumer<String, String>(configProperties); kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() { public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray())); } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray())); } }); //Start processing messages try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println(record.value()); } } catch (WakeupException ex) { System.out.println("Exception caught " + ex.getMessage()); } finally { kafkaConsumer.close(); System.out.println("After closing KafkaConsumer"); } } public KafkaConsumer<String, String> getKafkaConsumer() { return this.kafkaConsumer; } }
測試您的Apache Kafka應用程式
我們已準備好執行並測試生產者/消費者應用程式的當前迭代。如前所述,您可以使用清單1到清單3中的程式碼,或者在GitHub上 下載完整的原始碼 。
- 通過呼叫:編譯並建立一個JAR
mvn compile assembly:single
。 - 建立一個以三個分割槽和一個複製因子命名的主題
part-demo
:<KAFKA_HOME>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo
- 啟動生產者:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
- 啟動三個消費者,然後觀察控制檯以檢視每次啟動使用者的新例項時如何分配和撤消分割槽:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
- 在生產者控制檯中鍵入一些訊息,並驗證訊息是否路由到正確的使用者:
USA: First order India: First order USA: Second order France: First order
圖2顯示了分割槽主題中的生產者/消費者輸出。

能夠將單個主題劃分為多個部分是Kafka可擴充套件性的關鍵。通過分割槽,您可以水平擴充套件訊息傳遞基礎結構,同時還可以維護每個分割槽內的順序 接下來,我們將瞭解Kafka如何使用訊息偏移來跟蹤和管理複雜的訊息傳遞方案。
管理message偏移
我在第1部分中提到,每當生產者釋出訊息時,Kafka伺服器就會為該訊息分配一個偏移量。消費者能夠通過設定或重置訊息偏移來控制它想要消費的訊息。在開發消費者時,您有兩種管理偏移的選項:自動和手動。
兩種型別的偏移
當您在Kafka客戶端中啟動使用者時,它將讀取您的 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG(auto.offset.reset)
配置值。如果該配置設定為 最早, 則消費者將以該topic可用的最小偏移量開始。在向Kafka提出的第一個請求中,消費者會說:給我這個分割槽中的所有訊息,其偏移量大於可用的最小值。它還將指定批量大小。Kafka伺服器將以指定大小的批量返回所有匹配的訊息。
消費者跟蹤它處理的最後一條訊息的偏移量,因此它將始終請求偏移量高於最後一個偏移量的訊息。當消費者正常執行時,此設定有效,但如果消費者崩潰,或者您想停止維護,會發生什麼?在這種情況下,您希望使用者記住上次處理的訊息的偏移量,以便它可以從第一個未處理的訊息開始。
為了確保訊息永續性,Kafka使用兩種型別的偏移: 當前偏移量 用於跟蹤消費者正常工作時消耗的訊息。該 偏移 還跟蹤最後的訊息抵消,但它傳送資訊到伺服器kafka永久儲存。
如果消費者由於某種原因而關閉或被關閉,它可以向Kafka伺服器查詢 最後提交的偏移量 並恢復訊息消費,就好像沒有丟失一樣。就其本身而言,Kafka broker將此資訊儲存在一個名為 __consumer_offsets
的topic中。此資料將複製到多個broker,以便broker不會丟失偏移量。
提交偏移資料
您可以選擇提交偏移資料的頻率。如果您經常提交,則會受到效能損失。另一方面,如果消費者確實失敗了,那麼重新處理和消費的訊息就會減少。您的另一個選擇是減少提交(以獲得更好的效能),但在發生故障時重新處理更多訊息。在任何一種情況下,消費者都有兩種提交偏移的選項:
- 自動提交 :您可以設定
auto.commit
為true並使用以毫秒為單位的值設定auto.commit.interval.ms
屬性。啟用此功能後,Kafka使用者將提交poll()
呼叫而收到的最後一條訊息的偏移量。該poll()
呼叫在auto.commit.interval.ms
後發出。 - 手動提交 :您可以隨時呼叫
KafkaConsumer
的commitSync()
或commitAsync()
方法。當您發出呼叫時,使用者將獲取在poll()
期間收到的最後一條訊息的偏移量並將其提交給Kafka伺服器。
手動偏移的三個用例
讓我們考慮三種使用情況,您不希望使用Kafka的預設偏移管理基礎架構。相反,您將手動確定要從哪個訊息開始。
- 從頭開始 :在此用例中,您將捕獲Kafka中的資料庫更改。第一份資料是完整資料; 此後,您只會獲得值已更改的列(更改的增量)。在這種情況下,您始終需要從頭開始閱讀topic中的所有訊息,以構建記錄的完整狀態。要解決這種情況,您可以將消費者配置為通過呼叫
kafkaConsumer.seekToBeginning(topicPartition)
方法從頭開始讀取。請記住,預設情況下,Kafka將刪除超過七天的訊息,因此您需要為此用例配置更高的log.retention.hours
值。 - 轉到最後 :現在讓我們假設您通過實時分析交易來構建股票推薦應用程式。最糟糕的情況發生,您的消費者應用程式崩潰。在這種情況下,你已經使用過了
kafkaConsumer.seekToEnd(topicPartition)
來配置偏移量以忽略停機期間的訊息。相反,消費者將開始處理重啟之時發生的訊息 - 從給定的偏移開始 :最後,假設您剛剛在生產環境中釋出了新版本的生產者。在觀看它產生一些訊息後,您意識到它正在生成錯誤訊息。你修復了生產者並重新開始。您不希望消費者使用這些錯誤訊息,因此您可以通過呼叫
kafkaConsumer.seek(topicPartition, startingOffset)
手動將偏移量設定為生成的第一個良好訊息。
消費者應用程式中的手動偏移
我們迄今為止開發的消費者程式碼每5秒自動提交一次記錄。現在讓我們更新消費者以獲取手動設定偏移消耗的第三個引數。
如果使用等於0的最後一個引數的值,則使用者將假定您要從頭開始,因此它將為每個分割槽呼叫一個 kafkaConsumer.seekToBeginning()
方法。如果傳遞值-1,則會假定您要忽略現有訊息,並且僅消費在重新啟動使用者後釋出的訊息。在這種情況下,它將為每個分割槽呼叫 kafkaConsumer.seekToEnd()
。最後,如果指定除0或-1以外的任何值,則會假定您已指定了消費者要從中開始的偏移量; 例如,如果您將第三個值傳遞為5,那麼在重新啟動時,使用者將使用偏移量大於5的訊息。為此,它將呼叫 kafkaConsumer.seek(<topicname>, <startingoffset>)
。
清單4.向使用者新增第三個引數
private static class ConsumerThread extends Thread{ private String topicName; private String groupId; private long startingOffset; private KafkaConsumer<String,String> kafkaConsumer; public ConsumerThread(String topicName, String groupId, long startingOffset){ this.topicName = topicName; this.groupId = groupId; this.startingOffset=startingOffset; } public void run() { Properties configProperties = new Properties(); configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123"); configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //Figure out where to start processing messages from kafkaConsumer = new KafkaConsumer<String, String>(configProperties); kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() { public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray())); } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray())); Iterator<TopicPartition> topicPartitionIterator = partitions.iterator(); while(topicPartitionIterator.hasNext()){ TopicPartition topicPartition = topicPartitionIterator.next(); System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) ); if(startingOffset ==0){ System.out.println("Setting offset to beginning"); kafkaConsumer.seekToBeginning(topicPartition); }else if(startingOffset == -1){ System.out.println("Setting it to the end "); kafkaConsumer.seekToEnd(topicPartition); }else { System.out.println("Resetting offset to " + startingOffset); kafkaConsumer.seek(topicPartition, startingOffset); } } } }); //Start processing messages try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } } }catch(WakeupException ex){ System.out.println("Exception caught " + ex.getMessage()); }finally{ kafkaConsumer.close(); System.out.println("After closing KafkaConsumer"); } } public KafkaConsumer<String,String> getKafkaConsumer(){ return this.kafkaConsumer; } }
程式碼準備就緒後,您可以通過執行以下命令對其進行測試:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0
Kafka客戶端應該列印偏移量為0的所有訊息,或者您可以更改最後一個引數的值以在訊息佇列中跳轉。
Apache Kafka中的消費者群體
傳統的訊息傳遞用例可以分為兩種主要型別:點對點和釋出 - 訂閱。在 點對點 場景中,一個消費者使用一條訊息。當訊息中繼銀行交易時,只有一個消費者應該通過更新銀行賬戶進行響應。在 釋出 - 訂閱 方案中,多個消費者將使用單個訊息但對其作出不同的響應。當Web伺服器出現故障時,您希望將警報傳送給程式設計為以不同方式響應的消費者。
佇列 是指點對點場景,其中訊息僅由一個消費者使用。 主題 是指釋出 - 訂閱方案,其中每個消費者都使用訊息。Kafka沒有為佇列和主題用例定義單獨的API; 相反,當您啟動消費者時,您需要指定 ConsumerConfig.GROUP_ID_CONFIG
屬性。
如果您對多個消費者使用相同的 GROUP_ID_CONFIG
訊息,Kafka將假設它們都是單個組的一部分,並且它將僅向一個消費者傳遞訊息。如果你在不同的 group.id
中啟動兩個消費者,Kafka將假設它們不相關,因此每個消費者將獲得它自己的訊息副本。
回想一下清單3中的分割槽使用者將 groupId
其作為第二個引數。現在我們將使用該 groupId
引數為消費者實現佇列和主題用例。
- 建立一個以
group-test
兩個分割槽命名的主題:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
- 啟動一個可用於將訊息釋出到
group-test
剛剛建立的主題的生產者:java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
- 啟動三個聽取釋出到
group-test
主題的訊息的消費者。使用group1
你的價值group id
。這將為您提供三個消費者group1
:java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
- 啟動第四個消費者,但這次改變了
group id
to 的值group2
。這將為您提供三個消費者group1
和一個消費者group2
:java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
- 返回生產者控制檯並開始鍵入訊息。您釋出的每條新訊息都應在
group2
消費者視窗中出現一次,並在三個group1
消費者視窗中出現一次,如圖3所示。

第2部分的結論
大資料訊息系統的早期用例需要批處理,例如執行夜間ETL過程或定期將資料從RDBMS移動到NoSQL資料儲存區。在過去幾年中,對實時處理的需求增加,特別是對於欺詐檢測和應急響應系統。Apache Kafka是為這些型別的實時場景而構建的。
Apache Kafka是一個很好的開源產品,但確實有一些限制; 例如,您無法在主題到達目標之前從主題內部查詢資料,也不能跨多個地理位置分散的群集複製資料。您可以將MapR Streams(商業產品)與Kafka API結合使用,以實現這些和其他更復雜的釋出 - 訂閱方案。
原文連結: https://www.javaworld.com/article/3066873/big-data-messaging-with-kafka-part-2.html