1. 程式人生 > >kafka partition(分割槽)與 group kafka partition(分割槽)與 group

kafka partition(分割槽)與 group kafka partition(分割槽)與 group

kafka partition(分割槽)與 group

 

 一、

1、原理圖

2、原理描述

一個topic 可以配置幾個partition,produce傳送的訊息分發到不同的partition中,consumer接受資料的時候是按照group來接受,kafka確保每個partition只能同一個group中的同一個consumer消費,如果想要重複消費,那麼需要其他的組來消費。Zookeerper中儲存這每個topic下的每個partition在每個group中消費的offset 
新版kafka把這個offsert儲存到了一個__consumer_offsert的topic下 
這個__consumer_offsert 有50個分割槽,通過將group的id雜湊值%50的值來確定要儲存到那一個分割槽.  這樣也是為了考慮到zookeeper不擅長大量讀寫的原因。
所以,如果要一個group用幾個consumer來同時讀取的話,需要多執行緒來讀取,一個執行緒相當於一個consumer例項。當consumer的數量大於分割槽的數量的時候,有的consumer執行緒會讀取不到資料。 
假設一個topic test 被groupA消費了,現在啟動另外一個新的groupB來消費test,預設test-groupB的offset不是0,而是沒有新建立,除非當test有資料的時候,groupB會收到該資料,該條資料也是第一條資料,groupB的offset也是剛初始化的ofsert, 除非用顯式的用–from-beginnging 來獲取從0開始資料 

3、檢視topic-group的offsert 

位置:zookeeper 
路徑:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions 
在zookeeper的topic中有一個特殊的topic __consumer_offserts 
計算方法:(放入哪個partitions)

int hashCode = Math.abs("ttt".hashCode());

int partition = hashCode % 50;

先計算group的hashCode,再除以分割槽數(50),可以得到partition的值 

使用命令檢視: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

4.引數 
auto.offset.reset:預設值為largest,代表最新的訊息,smallest代表從最早的訊息開始讀取,當consumer剛開始建立的時候沒有offset這種情況,如果設定了largest,則為當收到最新的一條訊息的時候開始記錄offsert,若設定為smalert,那麼會從頭開始讀partition

  二、 1、Topic      Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條訊息放進哪個queue裡。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個資料夾,該資料夾下儲存這個Partition的所有訊息和索引檔案。若建立topic1和topic2兩個topic,且分別有13個和19個分割槽,則整個叢集上會相應會生成共32個資料夾(本文所用叢集共8個節點,此處topic1和topic2 replication-factor均為1),如下圖所示。 2、對於傳統的message queue而言,一般會刪除已經被消費的訊息,而Kafka叢集會保留所有的訊息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有資料(實際上也沒必要),      因此Kafka提供兩種策略刪除舊資料。一是基於時間,二是基於Partition檔案大小。      例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一週前的資料,也可在Partition檔案超過1GB時刪除舊資料,配置如下所示。    這裡要注意,因為Kafka讀取特定訊息的時間複雜度為O(1),即與檔案大小無關,所以這裡刪除過期檔案與提高Kafka效能無關。選擇怎樣的刪除策略只與磁碟以及具體的需求有關。另外,Kafka會為每一個Consumer Group保留一些metadata資訊——當前消費的訊息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條訊息後遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些訊息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些訊息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條訊息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。    3、producer Producer傳送訊息到broker時,會根據Paritition機制選擇將其儲存到哪一個Partition。如果Partition機制設定合理,所有訊息可以均勻分佈到不同的Partition裡,這樣就實現了負載均衡。如果一個Topic對應一個檔案,那這個檔案所在的機器I/O將會成為這個Topic的效能瓶頸,而有了Partition後,不同的訊息可以並行寫入不同broker的不同Partition裡,極大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的預設Partition數量,也可在建立Topic時通過引數指定,同時也可以在Topic建立之後通過Kafka提供的工具修改。   在傳送一條訊息時,可以指定這條訊息的key,Producer根據這個key和Partition機制來判斷應該將這條訊息傳送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一引數來指定,該class必須實現kafka.producer.Partitioner介面。本例中如果key可以被解析為整數則將對應的整數與Partition總數取餘,該訊息會被髮送到該數對應的Partition。(每個Parition都會有個序號,序號從0開始)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import  kafka.producer.Partitioner; import  kafka.utils.VerifiableProperties;   public  class  JasonPartitioner<T>  implements  Partitioner {        public  JasonPartitioner(VerifiableProperties verifiableProperties) {}        @Override      public  int  partition(Object key,  int  numPartitions) {          try  {              int  partitionNum = Integer.parseInt((String) key);              return  Math.abs(Integer.parseInt((String) key) % numPartitions);          catch  (Exception e) {              return  Math.abs(key.hashCode() % numPartitions);          }      } }

  如果將上例中的類作為partition.class,並通過如下程式碼傳送20條訊息(key分別為0,1,2,3)至topic3(包含4個Partition)。

1 2 3 4 5 6 7 8 9 10 public  void  sendMessage()  throws  InterruptedException{    for ( int  i =  1 ; i <=  5 ; i++){         List messageList =  new  ArrayList<KeyedMessage<String, String>>();          for ( int  j =  0 ; j <  4 ; j++){             messageList.add( new  KeyedMessage<String, String>( "topic2" , j+ "" "The "  + i +  " message for key "  + j));         }         producer.send(messageList);      }   producer.close(); }

  則key相同的訊息會被髮送並存儲到同一個partition裡,而且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是通過Java程式呼叫Consumer後打印出的訊息列表。

4、consumer group   (本節所有描述都是基於Consumer hight level API而非low level API)。

     使用Consumer high level API時,同一Topic的一條訊息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一訊息。

這是Kafka用來實現一個Topic訊息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group裡。用Consumer Group還可以將Consumer進行自由的分組而不需要多次傳送訊息到不同的Topic。

實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對訊息進行實時線上處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將資料實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬於不同的Consumer Group即可。

下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先建立一個Topic (名為topic1,包含3個Partition),然後建立一個屬於group1的Consumer例項,並建立三個屬於group2的Consumer例項,最後通過Producer向topic1傳送key分別為1,2,3的訊息。結果發現屬於group1的Consumer收到了所有的這三條訊息,同時group2中的3個Consumer分別收到了key為1,2,3的訊息。

 一、

1、原理圖

2、原理描述

一個topic 可以配置幾個partition,produce傳送的訊息分發到不同的partition中,consumer接受資料的時候是按照group來接受,kafka確保每個partition只能同一個group中的同一個consumer消費,如果想要重複消費,那麼需要其他的組來消費。Zookeerper中儲存這每個topic下的每個partition在每個group中消費的offset 
新版kafka把這個offsert儲存到了一個__consumer_offsert的topic下 
這個__consumer_offsert 有50個分割槽,通過將group的id雜湊值%50的值來確定要儲存到那一個分割槽.  這樣也是為了考慮到zookeeper不擅長大量讀寫的原因。
所以,如果要一個group用幾個consumer來同時讀取的話,需要多執行緒來讀取,一個執行緒相當於一個consumer例項。當consumer的數量大於分割槽的數量的時候,有的consumer執行緒會讀取不到資料。 
假設一個topic test 被groupA消費了,現在啟動另外一個新的groupB來消費test,預設test-groupB的offset不是0,而是沒有新建立,除非當test有資料的時候,groupB會收到該資料,該條資料也是第一條資料,groupB的offset也是剛初始化的ofsert, 除非用顯式的用–from-beginnging 來獲取從0開始資料 

3、檢視topic-group的offsert 

位置:zookeeper 
路徑:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions 
在zookeeper的topic中有一個特殊的topic __consumer_offserts 
計算方法:(放入哪個partitions)

int hashCode = Math.abs("ttt".hashCode());

int partition = hashCode % 50;

先計算group的hashCode,再除以分割槽數(50),可以得到partition的值 

使用命令檢視: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

4.引數 
auto.offset.reset:預設值為largest,代表最新的訊息,smallest代表從最早的訊息開始讀取,當consumer剛開始建立的時候沒有offset這種情況,如果設定了largest,則為當收到最新的一條訊息的時候開始記錄offsert,若設定為smalert,那麼會從頭開始讀partition

  二、 1、Topic      Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條訊息放進哪個queue裡。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個資料夾,該資料夾下儲存這個Partition的所有訊息和索引檔案。若建立topic1和topic2兩個topic,且分別有13個和19個分割槽,則整個叢集上會相應會生成共32個資料夾(本文所用叢集共8個節點,此處topic1和topic2 replication-factor均為1),如下圖所示。 2、對於傳統的message queue而言,一般會刪除已經被消費的訊息,而Kafka叢集會保留所有的訊息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有資料(實際上也沒必要),      因此Kafka提供兩種策略刪除舊資料。一是基於時間,二是基於Partition檔案大小。      例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一週前的資料,也可在Partition檔案超過1GB時刪除舊資料,配置如下所示。    這裡要注意,因為Kafka讀取特定訊息的時間複雜度為O(1),即與檔案大小無關,所以這裡刪除過期檔案與提高Kafka效能無關。選擇怎樣的刪除策略只與磁碟以及具體的需求有關。另外,Kafka會為每一個Consumer Group保留一些metadata資訊——當前消費的訊息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條訊息後遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些訊息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些訊息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條訊息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。    3、producer Producer傳送訊息到broker時,會根據Paritition機制選擇將其儲存到哪一個Partition。如果Partition機制設定合理,所有訊息可以均勻分佈到不同的Partition裡,這樣就實現了負載均衡。如果一個Topic對應一個檔案,那這個檔案所在的機器I/O將會成為這個Topic的效能瓶頸,而有了Partition後,不同的訊息可以並行寫入不同broker的不同Partition裡,極大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的預設Partition數量,也可在建立Topic時通過引數指定,同時也可以在Topic建立之後通過Kafka提供的工具修改。   在傳送一條訊息時,可以指定這條訊息的key,Producer根據這個key和Partition機制來判斷應該將這條訊息傳送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一引數來指定,該class必須實現kafka.producer.Partitioner介面。本例中如果key可以被解析為整數則將對應的整數與Partition總數取餘,該訊息會被髮送到該數對應的Partition。(每個Parition都會有個序號,序號從0開始)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import  kafka.producer.Partitioner; import  kafka.utils.VerifiableProperties;   public  class  JasonPartitioner<T>  implements  Partitioner {        public  JasonPartitioner(VerifiableProperties verifiableProperties) {}        @Override      public  int  partition(Object key,  int  numPartitions) {          try  {              int  partitionNum = Integer.parseInt((String) key);              return  Math.abs(Integer.parseInt((String) key) % numPartitions);          catch  (Exception e) {              return  Math.abs(key.hashCode() % numPartitions);          }      } }

  如果將上例中的類作為partition.class,並通過如下程式碼傳送20條訊息(key分別為0,1,2,3)至topic3(包含4個Partition)。

1 2 3 4 5 6 7 8 9 10 public  void  sendMessage()  throws  InterruptedException{    for ( int  i =  1 ; i <=  5 ; i++){         List messageList =  new  ArrayList<KeyedMessage<String, String>>();          for ( int  j =  0 ; j <  4 ; j++){             messageList.add( new  KeyedMessage<String, String>( "topic2" , j+ "" "The "  + i +  " message for key "  + j));         }         producer.send(messageList);      }   producer.close(); }

  則key相同的訊息會被髮送並存儲到同一個partition裡,而且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是通過Java程式呼叫Consumer後打印出的訊息列表。

4、consumer group   (本節所有描述都是基於Consumer hight level API而非low level API)。

     使用Consumer high level API時,同一Topic的一條訊息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一訊息。

這是Kafka用來實現一個Topic訊息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group裡。用Consumer Group還可以將Consumer進行自由的分組而不需要多次傳送訊息到不同的Topic。

實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對訊息進行實時線上處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將資料實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬於不同的Consumer Group即可。

下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先建立一個Topic (名為topic1,包含3個Partition),然後建立一個屬於group1的Consumer例項,並建立三個屬於group2的Consumer例項,最後通過Producer向topic1傳送key分別為1,2,3的訊息。結果發現屬於group1的Consumer收到了所有的這三條訊息,同時group2中的3個Consumer分別收到了key為1,2,3的訊息。