1. 程式人生 > >kafka基本原理介紹,以及重新選舉,replica複製機制,isr等。

kafka基本原理介紹,以及重新選舉,replica複製機制,isr等。

最近做的專案,通過資料庫的log日誌將資料庫某些千萬量級的表(這些表需要聯表查詢)資料同步到elasticsearch中,以減輕資料庫的查詢壓力,其中以kafka作為訊息中介軟體,以下是做該專案過程中對kafka的一些整理。

一、中介軟體

中介軟體,用於業務對於資料的時效性要求並不是特別高,有削峰填谷、解耦之功效。特別是中介軟體可以實現傳送端和消費端的解耦,讓訊息的傳送端非同步傳送訊息,並迅速返回,可以極大提高系統整體的效能。中介軟體本質,就是傳送、儲存、消費,其實可以自己封裝一箇中間件,比如用併發包中的ArrayBlockingQueue,LinkedBlockingQueue,用put(T t)寫入資訊,用take(T t)拉取資訊,這兩個方法都有阻塞功能。FIFO佇列,其中LinkedBlockingQueue效率更高。

選取中介軟體時,根據業務對於功能、效能、可靠性、可用性、生態性的需要,選用流行中介軟體,可以藉助搜尋引擎很快解決Bug,並且流行中介軟體的版本更迭及時,可以更快的獲取更高的效能,和更多的功能。

二、kafka基本介紹

1、基本概念

需要了解producer,consumer,groupId,broker,topic,partition,segment的概念,如下圖。

2、版本名

kafka_2.10-0.8.2.jar,2.10是指Scala版本,0.8.2是指kafka版本。

3、核心功能

  • Producer API允許程式釋出資料流到一個到多個Kafka topic。
  • Consumer API允許程式訂閱一個到多個topic,並且進行消費。
  • Streams API允許程式作為一個數據流處理,將一個或多個topic中輸入的資料進行消費,並生產資料流到一個或多個topics中。
  • Connector API,可以通過Connector管理Kafka和另一個系統之間的資料複製,比如去捕獲關係型資料庫中的任意改變到一個表中。

4、topic介紹

topic(不同的業務資料,分流到不同的topic進行處理)

                                   \\

                                     \\

topic是基於zk建立的,實 \\ 際上同一topic下的partition是按如下分佈在各個伺服器上的(可以設定replicas的個數,此圖partition黑色為leader,紅色為 \\ 副本folower)。

                                         \    \\    /

                                              \||/

kafka對與zookeeper是強依賴的,是以zookeeper作為基礎的,即使不做叢集,也需要zk的支援。以下是kafka中必須要填寫的配置檔案,id為在zk中註冊的brokerid,後者為要註冊到的zookeeper的host和port。

broker.id=0
zookeeper.connect=localhost:2181

zk說白了,就是一個節點服務系統,至於用這個節點做什麼,做單活、開關鎖還是做檢測伺服器存活狀態,都是業務程式碼根據這個節點做的一些邏輯處理。以下是kafka預設在zk中的節點層級結構:

5、partition介紹

partion可以看作一個有序的佇列,裡面的資料是儲存在硬碟中的,追加式的。partition的作用就是提供分散式的擴充套件,一個topic可以有許多partions,多個partition可以並行處理資料,所以可以處理相當量的資料。只有partition的leader才會進行讀寫操作,folower僅進行復制,客戶端是感知不到的。下圖把kafka叢集看成一個kakfa服務,僅顯示leader。

1)offset概念

每一條資料都有一個offset,是每一條資料在該partition中的唯一標識。各個consumer控制和設定其在該partition下消費到offset位置,這樣下次可以以該offset位置開始進行消費。

各個consumer的offset位置預設是在某一個broker當中的topic中儲存的(為防止該broker宕掉無法獲取offset資訊,可以配置在每個broker中都進行儲存,配置檔案中配置)

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

2)replicas的同步時機。

假如有N個replicas,其中一個replica為leader,其他都為follower,leader處理partition的所有讀寫請求,於此同時,follower會被動定期的去複製leader上的資料。

3)ISR介紹

leader會追蹤和維護ISR中所有follower的滯後狀態。如果滯後太多(數量滯後和時間滯後兩個維度,replica.lag.time.max.ms和replica.lag.max.message可配置),leader會把該replica從ISR中移除。被移除ISR的replica一直在追趕leader。如下圖,leader寫入資料後並不會commit,只有ISR列表中的所有folower同步之後才會commit,把滯後的follower移除ISR主要是避免寫訊息延遲。設定ISR主要是為了broker宕掉之後,重新選舉partition的leader從ISR列表中選擇。

滯後情況:新增副本,GC掛起,follower失效,I/O瓶頸。

6、producer介紹

send(String topic, Integer partition, Long timestamp, K key, V data)
  • producer在傳送訊息的時候,必須指定topic和data,可以選擇指定partion、key、timestamp,其中時間戳有兩種方式,CreateTime和LogAppendTime,前者是客戶端設定時間,後者是broker在訊息寫入log時設定的時間。如果為null,用的是System.currentTimeMillis()。如果同時不指定partition和key,那麼就用round-bin決定傳送到哪個partition。
  • 客戶端會定時的取Broker的topic、partition、replicas等元資料資訊,producer持有kafka節點的metadata資訊,通過該資訊建立ProducerPool,每次傳送資訊會根據要傳送哪個Partition,來選擇相應的Producer例項,Rpc連線。

7、consumer介紹

以下針對springBoot整合的kafka

@KafkaListener(topics = {"cache-music-user"},groupId="zwhUser",containerFactory = "batchAbleFactory")

public void consumeBatch(List<ConsumerRecord<String,String>> recordList, Acknowledgment acknowledgment) throws InterruptedException {

        ...

        方法體

        ...

}
  • consumer如何知道自己應該拉取哪一個partition。cordinator(某一個Kafka的broker)在分配consumer的時候,會選舉consumer leader,後者分配每一個consumer要連線的broker,topic,partition,然後上報cordinator。然後consumer會根據自己被分配的partion去拉取資料。
  • 批量讀取和單資料讀取,ack機制。
  • 如果poll()時間超時,那麼broker會認為consumer掛掉了,會踢掉該consumer。cordinator重新分配consumer。有時超時會拋異常,不過也會重新分配consumer。
  • consumer的groupId機制。對於一個groupId中的consumer來說,一個partition只能由一個consumer來消費。即不可能多個consumer消費1個partition。如下:

consumer可以在不同的機器中。

三、延伸

1、kafka重新選舉

  • KafkaController的作用。Kafka叢集中多個broker,有一個會被選舉為controller leader,負責管理整個叢集中分割槽和副本的狀態,比如partition的leader 副本故障,由controller 負責為該partition重新選舉新的leader 副本;當檢測到ISR列表發生變化,有controller通知叢集中所有broker更新其MetadataCache資訊;或者增加某個topic分割槽的時候也會由controller管理分割槽的重新分配工作
  • KafkaController建立節點的方式去選舉,作為leader,任何follower掛了,zk會感知到並通過Controller註冊的Wather去通知Controller去重新選舉。而leader掛了,zk會感知到,會通過Wather機制通知每一個broker去競爭Master。而ReplicaManager每個broker都有,是接受Contrloller的請求,對本服務上的partition進行管理的。

2、效率高的原因

因為kafka的資料都是儲存在硬碟中,甚至有的公司將kafka其作為資料庫使用,既然資料是基於硬碟的,那麼為何kafka還是能夠擁有如此高的吞吐量呢?

1)硬碟的索引功能。二分查詢法。

分割槽:找到響應的分割槽

分段:根據檔案segment的命名可以確認要查詢的offset或timestamp在哪個檔案中。

稀疏索引:快速確定要找的offset在哪個記憶體地址的附近。

2)I/O優化

普通程式I/O需要把Disk中的資訊複製到系統環境記憶體(步驟1),再複製到kafka應用環境記憶體(步驟2),然後步驟3,步驟4到Socket通過網路發出,重複複製文字,I/O消耗大。

kafka則不一樣:

3、kafka和rabbitMq的對比。

kafka是一種高吞吐量的分散式釋出訂閱訊息系統。和rabbitMq各佔半臂江山。以下是對比:

kafka

rabbitMq

書寫語言

Java和Scala

Erlang

訊息協議

自定義通訊協議

AMQP/MQTT/STOMP等協議

訊息過濾

topic和partition進行

交換機路由

訊息堆積

磁碟式堆積

記憶體式堆積

訊息傳遞模式

典型的Pub/Sub模式

典型的P2P模式

消費模式

典型的Pull

Push+Pull

訊息回朔

通過offset和timestamp

消費即刪除

流量控制

對producer和consumer進行主動設定

Credit—Based演算法,作用producer

訊息順序

支援單分割槽級別的順序性

單執行緒傳送、消費

QPs

單機維持數十萬,甚至達到百萬

單機萬級別

效能來說,kafka的吞吐量較大。kafka易於向外擴充套件,所有的producer、broker、consumer無需停機都可以即刻擴充套件機器。資訊將全部的資訊持久化到硬碟上,生產和消費互不影響,很靈活。功能來說,kafka適用於日誌,事實上kafka本身就是LinkIn公司開發用於日誌系統的,所以其檔案叫做log。使用者跟蹤管道,對使用者網頁行為的記錄跟蹤,用於離線資料分析或者做報表。大資料分析。