1. 程式人生 > >kafka的topic多分割槽的情況,如何保證跨區的訊息消費的順序性

kafka的topic多分割槽的情況,如何保證跨區的訊息消費的順序性

這個問題嚴格來說是肯定有的,kafka只能保證分割槽內的有序性。

下面是kafka作者Jay Kreps的blog中介紹kafka設計思想的一段話。

Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps some wall-clock time you might include in your messages). The assignment of the messages to a particular partition is controllable by the writer, with most users choosing to partition by some kind of key (e.g. user id). Partitioning allows log appends to occur without co-ordination between shards and allows the throughput of the system to scale linearly with the Kafka cluster size.

針對部分訊息有序(message.key相同的message要保證消費順序)場景,可以在producer往kafka插入資料時控制,同一key分發到同一partition上面。

kafka原始碼如下,支援該方式

private[kafka]classDefaultPartitioner[T]extendsPartitioner[T]{
  privateval random = newjava.util.Random
  def partition(key: T, numPartitions: Int): Int = {
    if(key== null){
        println("key is null")
        random.nextInt(numPartitions)
    }
    else{
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

在kafka-storm中,如果one partition -> one consumer instance 的話,就沒這樣的問題,但失去了並行。

如果N1 partitions -> N2 consumer instances的話 ,

1)N1<N2,這種情況會造成部分consumer空轉,資源浪費。

2)N1>N2(N2>1),這種情況,每個kafka-spout例項會消費固定的1個或者幾個partition,msg不會被不同consumer重複消費。

3)N1=N2,這種情況,實際操作發現,1個consumer instance都對應消費1個partition。1個partition只會有1個consumer例項,否則需要加鎖等操作,這樣減少了消費控制的複雜性。

具體應用場景:

計算使用者在某個位置的滯留時間,日誌內容可以抽象成使用者ID、時間點、位置。

應用系統-》日誌檔案sftp伺服器-》資料採集層-》kafka-》storm實時資料清洗處理層-》Redis、Hbase-》定時任務、mapreduce

在整合測試期間,由於沒有實際的日誌,所以在採集層模擬往kafka插入資料(特別在傳送頻率模擬的很粗糙),發現在實時處理層,計算出來使用者在某個位置滯留時間計算出來為負數,原因如下,

1)採集層模擬不真實(同一使用者往kafka插入的位置的時間是隨機生成),但要考慮目前的日誌檔案sftp伺服器 或者 採集層 是否會有這種情況,如果有,可以從業務層面規避,過濾掉該條無效資料。

2)就是storm中tuple處理失敗,重發,kafka-storm中就使offset回到失敗的那個位置,但之前位置資訊可能已經快取到了redis(為了減少hbase訪問次數,使用者的最近一條位置資訊放在了redis中),這樣offset之後的所有訊息會重新被消費,這樣以來滯留時間為負數,可以過濾掉該條記錄,不存到redis中。 

  真實資料:U1 T1 A1->U1 T2 A2

  fail重發  :U1 T1 A1->U1 T2 A2 ->  前兩條都失敗,重發 -> U1 T1 A1(負數的滯留時間) -> U1 T2 A2

由於採用的是失敗重發,是at least once,如果是only once的話,就會沒有這樣的情況,

PS:一些原理性問題,可以參考“kafka消費原理”介紹。