1. 程式人生 > >kafka 從分割槽任意位置、分割槽開頭、分割槽末尾開始消費資料

kafka 從分割槽任意位置、分割槽開頭、分割槽末尾開始消費資料

最近就kafka消費者消費資料時,消費者提交的offset與同事們有一些分歧和討論,這裡記錄一下自己的研究。

我們知道redis和kafka都可以作為訊息佇列使用,都可以完成釋出訂閱功能,但是kafka相較於redis可以實現訂閱訊息的儲存,可以實現訂閱訊息的任意位置消費,更重要的時kafka訂閱訊息是可以儲存到磁碟上的,而redis訂閱訊息是無法儲存磁碟的。

(1)消費者消費資料時加入一個消費者分組之後,可以通過 subscribe函式訂閱某個topic,這時這個消費者進入brokers的group management管理機制,同一個分片只能被一個分組中的消費者消費,如果同一個分片希望被多個消費者消費,需要將多個消費者放入到不同的消費者分組中。

 //訂閱指定的topic
consumer.subscribe(Arrays.asList(topic));
(2)還有一種消費資料的方式是可以通過assign函式指定要消費的分割槽資料,這種方式可以指定從分割槽的任意位置開始消費資料,當然這種
//消費者指定要消費的分割槽,指定分割槽之後消費者崩潰之後 不會引發分割槽reblance
consumer.assign(list);
消費資料的方式,如果消費者奔潰之後,不會引發分割槽reblance,也就是說assign的consumer不會擁有kafka的group management機制。

我們上面說過,同一個分片只能由消費者分組中的同一個消費者進行消費,假設當消費者A使用assign指定分割槽進行消費時,如果這時消費者A使用的分組group B,是通過subscribe訂閱了這個主題的分片時,由於消費者A不加入group management,它相當於一個獨立的臨時消費者,這時消費者A也是可以正常消費的,看起來就是一個分片被一個消費者組中的多個消費者消費一樣。

(3)我們還可以配置如下屬性auto.offset.reset來,設定消費者從分割槽的開頭或者末尾進行消費資料。當然這也是有條件的。

 //一般配置earliest 或者latest 值
props.put("auto.offset.reset", "latest");

我把上述三種情況的消費者不同使用方式下,消費者提交offset的情況進行了歸總和說明:


早在kafka0.8.2.2版本的時候,kafka已經支援訊息offset存在brokers中,只不過預設是將offset儲存到zookeeper中。kafka現在最新發布的版本都是預設將資料儲存到brokers中。我的程式碼示例是使用了kafka0.10.0.0版本,當我們這裡通過assign函式分配指定的分割槽時


下面是我的測試程式碼,有興趣的同學可以檢視和驗證上述結論:

/**
 * 
 * @author yujie.wang
 * kafka生產者示例程式碼
 */
public class Producer_Sample {
	//kafka叢集機器
	private static final String KAFKA_HOSTS = "10.4.30.151:9092,10.4.30.151:9093,10.4.30.151:9094";
	//topic名稱
	private static final String TOPIC = "my-replicated-topic_2";
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Producer_Sample producer = new Producer_Sample();
		producer.producer_send(TOPIC);
		System.out.println("end");
	}
	
	/**
	 * 生產者生產資料
	 * 傳送訊息是非同步進行,一旦訊息被儲存到分割槽快取中,send方法就返回
	 * 一旦訊息被接收 就會呼叫callBack
	 * @param topic
	 */
	public void producer_send(String topic){
		Properties props = new Properties();
		//kafka叢集機器
		props.put("bootstrap.servers", KAFKA_HOSTS);
		//生產者傳送的資料需要等待主分片和其副本都儲存才發回確認訊息
		props.put("acks", "all");
		//生產者傳送失敗後的確認訊息
		props.put("retries", 0);
		//生產者 每個分割槽快取大小 16K
		props.put("batch.size", 16384);
		//生產者傳送分割槽快取中資料前停留時間
		props.put("linger.ms", 1);
		//生產者可用快取總量大小 32M
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		 
		Producer<String, String> producer = new KafkaProducer<String,String>(props);
		for(int i = 220; i < 230; i++){
			//傳送訊息是非同步進行,一旦訊息被儲存到分割槽快取中,send方法就返回
		    // producer.send(new ProducerRecord<String, String>("my-replicated-topic_1", Integer.toString(i), Integer.toString(i)));
		    producer.send(new ProducerRecord<String, String>(topic, "call___"+Integer.toString(i+20), "call___"+Integer.toString(i)),
		    		new Call());
		    System.out.println("send return I: "+ i);
		}

		producer.close();
	}

	/**
	 *訊息被儲存之後的回撥方法
	 */
	class Call implements Callback{

		@Override
		public void onCompletion(RecordMetadata recordmetadata,
				Exception exception) {
			// TODO Auto-generated method stub
			System.out.println("callBack: "+ recordmetadata.checksum() + " recordmetadata content : "+recordmetadata.toString());
		}
		
	}
}

/**
 * @author yujie.wang
 * kafka消費者示例,包含隨機位置消費和最多一次消費方式
 * 消費者提交消費資料offset 分為自動提交和手動控制提交
 * 
 * 這份程式碼示例中包含了 多種從kafka的任意位置獲取資料的方式
 */
public class Consumer_Sample {

	//kafka叢集機器
	private static final String KAFKA_HOSTS = "10.4.30.151:9092,10.4.30.151:9093,10.4.30.151:9094";
	//topic名稱
	private static final String TOPIC = "my-replicated-topic_2";
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Consumer_Sample consumer = new Consumer_Sample();
		//從分割槽的末尾 或者已存在groupid的請情況下從未消費位置開始消費資料
		consumer.consumerSubscribe("true", TOPIC);
		// 通過實現ConsumerRebalanceListener介面 進而時間任意位置的消費
		consumer.consumerSubscribeImplListener("true", TOPIC);
		//從指定的分割槽  開始位置seekToBeginning 或者任意位置seek消費資料
		consumer.consumerAssin("true", TOPIC);
		//通過配置屬性auto.offset.reset 來設定消費者從分割槽開頭或者末尾進行消費,但是需要使用一定條件的group Id
		consumer.consumerAutoOffsetReset("true", TOPIC);
		System.out.println("consumer end");
	}
	
	
	/**
	 * 直接通過訂閱一個指定分割槽來消費資料
	 * (1)如果該groupId消費者分組下 有消費者提交過offset,則從 當前提交的offset位置開始消費資料
	 * (2)如果該groupId消費者分組下 沒有有消費者提交過offset,則從 當前log新增的最後位置(也就是資料的末尾)開始消費資料
	 * @param isAutoCommitBool
	 * @param topic
	 */
	public void consumerSubscribe(final String isAutoCommitBool, final String topic){
		 Properties props = new Properties();
		 //配置kafka叢集機器
		 props.put("bootstrap.servers", KAFKA_HOSTS);
		 //消費者分組
		 props.put("group.id", "yujie37");
		 //這裡設定 消費者自動提交已消費訊息的offset
		 props.put("enable.auto.commit", isAutoCommitBool);
		 // 設定自動提交的時間間隔為1000毫秒
		 props.put("auto.commit.interval.ms", "1000");
		 // 設定每次poll的最大資料個數
		 props.put("max.poll.records", 5);
		 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		 //訂閱topic
		 consumer.subscribe(Arrays.asList(topic));
		 List<PartitionInfo> parList = consumer.partitionsFor(topic);

		 //打印出分割槽資訊
		 printPartition(parList);

		 //消費資料
		 while (true) {
		     ConsumerRecords<String, String> records = consumer.poll(5000);
		     System.out.println("topic: "+topic + " pool return records size: "+ records.count());
		     for (ConsumerRecord<String, String> record : records){
			 	 System.out.println(record.toString());
			 	  //手動提交已消費資料的offset
			 	 if("false".equalsIgnoreCase(isAutoCommitBool)){
			 		consumer.commitSync();
			 	 }
			 	
		     }
		   
		 }
	}
	
	
	/**
	 * 
	 * @param isAutoCommitBool true 開啟自動提交offset;false 不開啟
	 * @param topic
	 * (1)如果該groupId消費者分組下 有消費者提交過offset,則從 當前提交的offset位置開始消費資料
	 * (2)如果該groupId消費者分組下 沒有有消費者提交過offset,則從 當前log新增的最後位置(也就是資料的末尾)開始消費資料
	 * 
	 * 注意如果enable.auto.commit 設定為false,如果消費完資料沒有提交已消費資料的offset,
	 * 則會出現重複消費資料的情況
	 * 
	 * 通過實現ConsumerRebalanceListener介面中的onPartitionsAssigned方法,並在其中呼叫消費者的seek或者seekToBeginning
	 * 方法定位分割槽的任意位置或者開頭位置
	 */
	public void consumerSubscribeImplListener(final String isAutoCommitBool, final String topic){
		 Properties props = new Properties();
		 //配置kafka叢集機器
		 props.put("bootstrap.servers", KAFKA_HOSTS);
		 //消費者分組
		 props.put("group.id", "yujie26");
		 //這裡設定 消費者自動提交已消費訊息的offset
		 props.put("enable.auto.commit", isAutoCommitBool);
		 // 設定自動提交的時間間隔為1000毫秒
		 props.put("auto.commit.interval.ms", "1000");
		 // 設定每次poll的最大資料個數
		 props.put("max.poll.records", 5);
		 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		 //訂閱topic,並實現ConsumerRebalanceListener
		 consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener(){
			@Override
			public void onPartitionsRevoked(//分割槽撤銷時,消費者可以向該分割槽提交自己當前的offset
					Collection<TopicPartition> collection) {
				// TODO Auto-generated method stub
				if("false".equalsIgnoreCase(isAutoCommitBool)){
					//consumer.commitSync();
				}
			}

			@Override
			public void onPartitionsAssigned(//當分割槽分配給消費者時,消費者可以通過該方法重新定位需要消費的資料位置
					Collection<TopicPartition> collection) {
				// TODO Auto-generated method stub
				//將消費者定位到各個分割槽的開始位置進行消費
		/*		consumer.seekToBeginning(collection);
				System.out.println("seek beg");*/
			
				Iterator it = collection.iterator();
				while(it.hasNext()){
					//將消費者定位到指定分割槽的指定位置7進行消費
					consumer.seek((TopicPartition)it.next(), 7);
				}
				
			}
		 });
		 while (true) {
		     ConsumerRecords<String, String> records = consumer.poll(5000);
		     System.out.println("topic: "+topic + "pool return records size: "+ records.count());
		     for (ConsumerRecord<String, String> record : records){
			 	 System.out.println(record.toString());
			 	  //手動提交已消費資料的offset
			 	 if("false".equalsIgnoreCase(isAutoCommitBool)){
			 		consumer.commitSync();
			 	 }
			 	
		     }
		   
		 }
	}
	
	
	/**
	 * 
	 * @param isAutoCommitBool true 開啟自動提交offset;false 不開啟
	 * @param topic
	 * 如果groupId之前存在 , 則從之前提交的最後消費資料的offset處繼續開始消費資料
	 * 如果groupId之前不存在,則從當前分割槽的最後位置開始消費
	 * 
	 * 注意如果enable.auto.commit 設定為false,如果消費完資料沒有提交已消費資料的offset,
	 * 則會出現重複消費資料的情況
	 */
	public void consumerAutoOffsetReset(final String isAutoCommitBool, final String topic){
		 Properties props = new Properties();
		 //配置kafka叢集機器
		 props.put("bootstrap.servers", KAFKA_HOSTS);
		 //消費者分組
		 props.put("group.id", "yujie32");
		 //這裡設定 消費者自動提交已消費訊息的offset
		 props.put("enable.auto.commit", isAutoCommitBool);
		 // 設定自動提交的時間間隔為1000毫秒
		 props.put("auto.commit.interval.ms", "1000");
		 // 設定每次poll的最大資料個數
		 props.put("max.poll.records", 5);
		 //設定使用最開始的offset偏移量為該group.id的最早。如果不設定,則會是latest即該topic最新一個訊息的offset
	     //如果採用latest,消費者只能得道其啟動後,生產者生產的訊息
		 //一般配置earliest 或者latest 值
	     props.put("auto.offset.reset", "latest");
		 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		 //訂閱topic,並實現ConsumerRebalanceListener
		 consumer.subscribe(Arrays.asList(topic));
		 while (true) {
		     ConsumerRecords<String, String> records = consumer.poll(5000);
		     System.out.println("topic: "+topic + "pool return records size: "+ records.count());
		     for (ConsumerRecord<String, String> record : records){
			 	 System.out.println(record.toString());
			 	  //手動提交已消費資料的offset
			 	 if("false".equalsIgnoreCase(isAutoCommitBool)){
			 		consumer.commitSync();
			 	 }
			 	
		     }
		   
		 }
	}

	
	/**
	 * 通過assign分配的分割槽,消費者發生故障 Server端不會觸發分割槽重平衡(即使該消費者共享某個已有的groupId),每個消費者都是獨立工作的
	 * 為了避免offset提交衝突,需要確保每個消費者都有唯一的groupId
	 * 從指定的分割槽的開頭開始消費資料
	 * @param isAutoCommitBool true 開啟自動提交offset;false 不開啟
	 * @param topic
	 */
	public void consumerAssin(String isAutoCommitBool,String topic){
		 Properties props = new Properties();
		 //配置kafka叢集機器
		 props.put("bootstrap.servers", KAFKA_HOSTS);
		 //消費者分組
		 props.put("group.id", "yujie35");
		 //這裡設定 消費者自動提交已消費訊息的offset
		 props.put("enable.auto.commit", isAutoCommitBool);
		 // 設定自動提交的時間間隔為1000毫秒
		 props.put("auto.commit.interval.ms", "1000");
		 // 設定每次poll的最大資料個數
		 props.put("max.poll.records", 5);
		 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		 //獲得topic的所有分割槽
		 List<PartitionInfo> parList = consumer.partitionsFor(topic);
		 //打印出分割槽資訊
		 printPartition(parList);
		 
		 List<TopicPartition> list = new ArrayList<TopicPartition>();
		 for(PartitionInfo par : parList){
			 TopicPartition partition = new TopicPartition(topic, par.partition());
			 list.add(partition);
		 }
		 //消費者指定要消費的分割槽,指定分割槽之後消費者崩潰之後 不會引發分割槽reblance
	     consumer.assign(list);

    	 //從list中所有分割槽的開頭開始消費資料,這個操作不改變已提交的消費資料的offset
 	    // consumer.seekToBeginning(list);
	 

	 /*    for(TopicPartition tpar:list ){
	    	 //consumer.seek(tpar, position);
	     } */
     

		 while (true) {
		     ConsumerRecords<String, String> records = consumer.poll(5000);
		     System.out.println("topic: "+topic + " pool return records size: "+ records.count());
		     for (ConsumerRecord<String, String> record : records){
			 	 System.out.println(record.toString());
			 	  //手動提交已消費資料的offset
			 	 if("false".equalsIgnoreCase(isAutoCommitBool)){
			 		consumer.commitSync();
			 	 }
			 	
		     }
		   
		 }
	}
	

	
	

	
	public void printPartition(List<PartitionInfo> parList){
		for(PartitionInfo p : parList){
			System.out.println(p.toString());
		}
	}
	
	/**
	 * 單獨處理每個分割槽中的資料,處理完了之後非同步提交offset,注意提交的offset是程式將要讀取的下一條訊息的offset
	 * @param consumer
	 */
	public void handlerData(KafkaConsumer<String, String> consumer){
		boolean running = true;
		try {
	         while(running) {
	             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
	             for (TopicPartition partition : records.partitions()) {
	                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
	                 for (ConsumerRecord<String, String> record : partitionRecords) {
	                     System.out.println(record.offset() + ": " + record.value());
	                 }
	                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
	                 //注意提交的offset是程式將要讀取的下一條訊息的offset
	                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
	             }
	         }
	     } finally {
	       consumer.close();
	     }
	}
	
	/**
	 * 關閉消費者
	 * @param consumer
	 */
	public void closeConsumer(KafkaConsumer<String, String> consumer){
		if(consumer != null){
			consumer.close();
		}
	}

}