1. 程式人生 > >真的,Kafka 入門一篇文章就夠了

真的,Kafka 入門一篇文章就夠了

初識 Kafka

什麼是 Kafka

Kafka 是由 Linkedin 公司開發的,它是一個分散式的,支援多分割槽、多副本,基於 Zookeeper 的分散式訊息流平臺,它同時也是一款開源的基於釋出訂閱模式的訊息引擎系統。

Kafka 的基本術語

訊息:Kafka 中的資料單元被稱為訊息,也被稱為記錄,可以把它看作資料庫表中某一行的記錄。

批次:為了提高效率, 訊息會分批次寫入 Kafka,批次就代指的是一組訊息。

主題:訊息的種類稱為 主題(Topic),可以說一個主題代表了一類訊息。相當於是對訊息進行分類。主題就像是資料庫中的表。

分割槽:主題可以被分為若干個分割槽(partition),同一個主題中的分割槽可以不在一個機器上,有可能會部署在多個機器上,由此來實現 kafka 的伸縮性

,單一主題中的分割槽有序,但是無法保證主題中所有的分割槽有序

生產者: 向主題釋出訊息的客戶端應用程式稱為生產者(Producer),生產者用於持續不斷的向某個主題傳送訊息。

消費者:訂閱主題訊息的客戶端程式稱為消費者(Consumer),消費者用於處理生產者產生的訊息。

消費者群組:生產者與消費者的關係就如同餐廳中的廚師和顧客之間的關係一樣,一個廚師對應多個顧客,也就是一個生產者對應多個消費者,消費者群組(Consumer Group)指的就是由一個或多個消費者組成的群體。

偏移量:偏移量(Consumer Offset)是一種元資料,它是一個不斷遞增的整數值,用來記錄消費者發生重平衡時的位置,以便用來恢復資料。

broker: 一個獨立的 Kafka 伺服器就被稱為 broker,broker 接收來自生產者的訊息,為訊息設定偏移量,並提交訊息到磁碟儲存。

broker 叢集:broker 是叢集 的組成部分,broker 叢集由一個或多個 broker 組成,每個叢集都有一個 broker 同時充當了叢集控制器的角色(自動從叢集的活躍成員中選舉出來)。

副本:Kafka 中訊息的備份又叫做 副本(Replica),副本的數量是可以配置的,Kafka 定義了兩類副本:領導者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對外提供服務,後者只是被動跟隨。

重平衡:Rebalance。消費者組內某個消費者例項掛掉後,其他消費者例項自動重新分配訂閱主題分割槽的過程。Rebalance 是 Kafka 消費者端實現高可用的重要手段。

Kafka 的特性(設計原則)

  • 高吞吐、低延遲:kakfa 最大的特點就是收發訊息非常快,kafka 每秒可以處理幾十萬條訊息,它的最低延遲只有幾毫秒。
  • 高伸縮性: 每個主題(topic) 包含多個分割槽(partition),主題中的分割槽可以分佈在不同的主機(broker)中。
  • 永續性、可靠性: Kafka 能夠允許資料的持久化儲存,訊息被持久化到磁碟,並支援資料備份防止資料丟失,Kafka 底層的資料儲存是基於 Zookeeper 儲存的,Zookeeper 我們知道它的資料能夠持久儲存。
  • 容錯性: 允許叢集中的節點失敗,某個節點宕機,Kafka 叢集能夠正常工作
  • 高併發: 支援數千個客戶端同時讀寫

Kafka 的使用場景

  • 活動跟蹤:Kafka 可以用來跟蹤使用者行為,比如我們經常回去淘寶購物,你開啟淘寶的那一刻,你的登陸資訊,登陸次數都會作為訊息傳輸到 Kafka ,當你瀏覽購物的時候,你的瀏覽資訊,你的搜尋指數,你的購物愛好都會作為一個個訊息傳遞給 Kafka ,這樣就可以生成報告,可以做智慧推薦,購買喜好等。
  • 傳遞訊息:Kafka 另外一個基本用途是傳遞訊息,應用程式向用戶傳送通知就是通過傳遞訊息來實現的,這些應用元件可以生成訊息,而不需要關心訊息的格式,也不需要關心訊息是如何傳送的。
  • 度量指標:Kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告。
  • 日誌記錄:Kafka 的基本概念來源於提交日誌,比如我們可以把資料庫的更新發送到 Kafka 上,用來記錄資料庫的更新時間,通過kafka以統一介面服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 流式處理:流式處理是有一個能夠提供多種應用程式的領域。
  • 限流削峰:Kafka 多用於網際網路領域某一時刻請求特別多的情況下,可以把請求寫入Kafka 中,避免直接請求後端程式導致服務崩潰。

Kafka 的訊息佇列

Kafka 的訊息佇列一般分為兩種模式:點對點模式和釋出訂閱模式

Kafka 是支援消費者群組的,也就是說 Kafka 中會有一個或者多個消費者,如果一個生產者生產的訊息由一個消費者進行消費的話,那麼這種模式就是點對點模式

如果一個生產者或者多個生產者產生的訊息能夠被多個消費者同時消費的情況,這樣的訊息佇列成為釋出訂閱模式的訊息佇列

Kafka 系統架構

如上圖所示,一個典型的 Kafka 叢集中包含若干Producer(可以是web前端產生的Page View,或者是伺服器日誌,系統CPU、Memory等),若干broker(Kafka支援水平擴充套件,一般broker數量越多,叢集吞吐率越高),若干Consumer Group,以及一個Zookeeper叢集。Kafka通過Zookeeper管理叢集配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將訊息釋出到broker,Consumer使用pull模式從broker訂閱並消費訊息。

核心 API

Kafka 有四個核心API,它們分別是

  • Producer API,它允許應用程式向一個或多個 topics 上傳送訊息記錄
  • Consumer API,允許應用程式訂閱一個或多個 topics 並處理為其生成的記錄流
  • Streams API,它允許應用程式作為流處理器,從一個或多個主題中消費輸入流併為其生成輸出流,有效的將輸入流轉換為輸出流。
  • Connector API,它允許構建和執行將 Kafka 主題連線到現有應用程式或資料系統的可用生產者和消費者。例如,關係資料庫的聯結器可能會捕獲對錶的所有更改

Kafka 為何如此之快

Kafka 實現了零拷貝原理來快速移動資料,避免了核心之間的切換。Kafka 可以將資料記錄分批發送,從生產者到檔案系統(Kafka 主題日誌)到消費者,可以端到端的檢視這些批次的資料。

批處理能夠進行更有效的資料壓縮並減少 I/O 延遲,Kafka 採取順序寫入磁碟的方式,避免了隨機磁碟定址的浪費,更多關於磁碟定址的瞭解,請參閱 程式設計師需要了解的硬核知識之磁碟 。

總結一下其實就是四個要點

  • 順序讀寫
  • 零拷貝
  • 訊息壓縮
  • 分批發送

Kafka 安裝和重要配置

Kafka 安裝我在 Kafka 系列第一篇應該比較詳細了,詳情見帶你漲姿勢的認識一下kafka 這篇文章。

那我們還是主要來說一下 Kafka 中的重要引數配置吧,這些引數對 Kafka 來說是非常重要的。

broker 端配置

  • broker.id

每個 kafka broker 都有一個唯一的標識來表示,這個唯一的識別符號即是 broker.id,它的預設值是 0。這個值在 kafka 叢集中必須是唯一的,這個值可以任意設定,

  • port

如果使用配置樣本來啟動 kafka,它會監聽 9092 埠。修改 port 配置引數可以把它設定成任意的埠。要注意,如果使用 1024 以下的埠,需要使用 root 許可權啟動 kakfa。

  • zookeeper.connect

用於儲存 broker 元資料的 Zookeeper 地址是通過 zookeeper.connect 來指定的。比如我可以這麼指定 localhost:2181 表示這個 Zookeeper 是執行在本地 2181 埠上的。我們也可以通過 比如我們可以通過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個引數值。該配置引數是用冒號分割的一組 hostname:port/path 列表,其含義如下

hostname 是 Zookeeper 伺服器的機器名或者 ip 地址。

port 是 Zookeeper 客戶端的埠號

/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了 chroot 環境,如果不指定預設使用跟路徑。

如果你有兩套 Kafka 叢集,假設分別叫它們 kafka1 和 kafka2,那麼兩套叢集的zookeeper.connect引數可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2

  • log.dirs

Kafka 把所有的訊息都儲存到磁碟上,存放這些日誌片段的目錄是通過 log.dirs 來制定的,它是用一組逗號來分割的本地系統路徑,log.dirs 是沒有預設值的,你必須手動指定他的預設值。其實還有一個引數是 log.dir,如你所知,這個配置是沒有 s 的,預設情況下只用配置 log.dirs 就好了,比如你可以通過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個引數的值。

  • num.recovery.threads.per.data.dir

對於如下3種情況,Kafka 會使用可配置的執行緒池來處理日誌片段。

伺服器正常啟動,用於開啟每個分割槽的日誌片段;

伺服器崩潰後重啟,用於檢查和截斷每個分割槽的日誌片段;

伺服器正常關閉,用於關閉日誌片段。

預設情況下,每個日誌目錄只使用一個執行緒。因為這些執行緒只是在伺服器啟動和關閉時會用到,所以完全可以設定大量的執行緒來達到井行操作的目的。特別是對於包含大量分割槽的伺服器來說,一旦發生崩憤,在進行恢復時使用井行操作可能會省下數小時的時間。設定此引數時需要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,如果 num.recovery.threads.per.data.dir 被設為 8,並且 log.dir 指定了 3 個路徑,那麼總共需要 24 個執行緒。

  • auto.create.topics.enable

預設情況下,kafka 會使用三種方式來自動建立主題,下面是三種情況:

當一個生產者開始往主題寫入訊息時

當一個消費者開始從主題讀取訊息時

當任意一個客戶端向主題傳送元資料請求時

auto.create.topics.enable引數我建議最好設定成 false,即不允許自動建立 Topic。在我們的線上環境裡面有很多名字稀奇古怪的 Topic,我想大概都是因為該引數被設定成了 true 的緣故。

主題預設配置

Kafka 為新建立的主題提供了很多預設配置引數,下面就來一起認識一下這些引數

  • num.partitions

num.partitions 引數指定了新建立的主題需要包含多少個分割槽。如果啟用了主題自動建立功能(該功能是預設啟用的),主題分割槽的個數就是該引數指定的值。該引數的預設值是 1。要注意,我們可以增加主題分割槽的個數,但不能減少分割槽的個數。

  • default.replication.factor

這個引數比較簡單,它表示 kafka儲存訊息的副本數,如果一個副本失效了,另一個還可以繼續提供服務default.replication.factor 的預設值為1,這個引數在你啟用了主題自動建立功能後有效。

  • log.retention.ms

Kafka 通常根據時間來決定資料可以保留多久。預設使用 log.retention.hours 引數來配置時間,預設是 168 個小時,也就是一週。除此之外,還有兩個引數 log.retention.minutes 和 log.retentiion.ms 。這三個引數作用是一樣的,都是決定訊息多久以後被刪除,推薦使用 log.retention.ms。

  • log.retention.bytes

另一種保留訊息的方式是判斷訊息是否過期。它的值通過引數 log.retention.bytes 來指定,作用在每一個分割槽上。也就是說,如果有一個包含 8 個分割槽的主題,並且 log.retention.bytes 被設定為 1GB,那麼這個主題最多可以保留 8GB 資料。所以,當主題的分割槽個數增加時,整個主題可以保留的資料也隨之增加。

  • log.segment.bytes

上述的日誌都是作用在日誌片段上,而不是作用在單個訊息上。當訊息到達 broker 時,它們被追加到分割槽的當前日誌片段上,當日志片段大小到達 log.segment.bytes 指定上限(預設為 1GB)時,當前日誌片段就會被關閉,一個新的日誌片段被開啟。如果一個日誌片段被關閉,就開始等待過期。這個引數的值越小,就越會頻繁的關閉和分配新檔案,從而降低磁碟寫入的整體效率。

  • log.segment.ms

上面提到日誌片段經關閉後需等待過期,那麼 log.segment.ms 這個引數就是指定日誌多長時間被關閉的引數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片段會在大小或時間到達上限時被關閉,就看哪個條件先得到滿足。

  • message.max.bytes

broker 通過設定 message.max.bytes 引數來限制單個訊息的大小,預設是 1000 000, 也就是 1MB,如果生產者嘗試傳送的訊息超過這個大小,不僅訊息不會被接收,還會收到 broker 返回的錯誤訊息。跟其他與位元組相關的配置引數一樣,該引數指的是壓縮後的訊息大小,也就是說,只要壓縮後的訊息小於 mesage.max.bytes,那麼訊息的實際大小可以大於這個值

這個值對效能有顯著的影響。值越大,那麼負責處理網路連線和請求的執行緒就需要花越多的時間來處理這些請求。它還會增加磁碟寫入塊的大小,從而影響 IO 吞吐量。

  • retention.ms

規定了該主題訊息被儲存的時常,預設是7天,即該主題只能儲存7天的訊息,一旦設定了這個值,它會覆蓋掉 Broker 端的全域性引數值。

  • retention.bytes

retention.bytes:規定了要為該 Topic 預留多大的磁碟空間。和全域性引數作用相似,這個值通常在多租戶的 Kafka 叢集中會有用武之地。當前預設值是 -1,表示可以無限使用磁碟空間。

JVM 引數配置

JDK 版本一般推薦直接使用 JDK1.8,這個版本也是現在中國大部分程式設計師的首選版本。

說到 JVM 端設定,就繞不開這個話題,業界最推崇的一種設定方式就是直接將 JVM 堆大小設定為 6GB,這樣會避免很多 Bug 出現。

JVM 端配置的另一個重要引數就是垃圾回收器的設定,也就是平時常說的 GC 設定。如果你依然在使用 Java 7,那麼可以根據以下法則選擇合適的垃圾回收器:

  • 如果 Broker 所在機器的 CPU 資源非常充裕,建議使用 CMS 收集器。啟用方法是指定-XX:+UseCurrentMarkSweepGC
  • 否則,使用吞吐量收集器。開啟方法是指定-XX:+UseParallelGC

當然了,如果你已經在使用 Java 8 了,那麼就用預設的 G1 收集器就好了。在沒有任何調優的情況下,G1 表現得要比 CMS 出色,主要體現在更少的 Full GC,需要調整的引數更少等,所以使用 G1 就好了。

一般 G1 的調整隻需要這兩個引數即可

  • MaxGCPauseMillis

該引數指定每次垃圾回收預設的停頓時間。該值不是固定的,G1可以根據需要使用更長的時間。它的預設值是 200ms,也就是說,每一輪垃圾回收大概需要200 ms 的時間。

  • InitiatingHeapOccupancyPercent

該引數指定了 G1 啟動新一輪垃圾回收之前可以使用的堆記憶體百分比,預設值是45,這就表明G1在堆使用率到達45之前不會啟用垃圾回收。這個百分比包括新生代和老年代。

Kafka Producer

在 Kafka 中,我們把產生訊息的那一方稱為生產者,比如我們經常回去淘寶購物,你開啟淘寶的那一刻,你的登陸資訊,登陸次數都會作為訊息傳輸到 Kafka 後臺,當你瀏覽購物的時候,你的瀏覽資訊,你的搜尋指數,你的購物愛好都會作為一個個訊息傳遞給 Kafka 後臺,然後淘寶會根據你的愛好做智慧推薦,致使你的錢包從來都禁不住誘惑,那麼這些生產者產生的訊息是怎麼傳到 Kafka 應用程式的呢?傳送過程是怎麼樣的呢?

儘管訊息的產生非常簡單,但是訊息的傳送過程還是比較複雜的,如圖

我們從建立一個ProducerRecord 物件開始,ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要傳送的 key/value 鍵值對,它由記錄要傳送到的主題名稱(Topic Name),可選的分割槽號(Partition Number)以及可選的鍵值對構成。

在傳送 ProducerRecord 時,我們需要將鍵值對物件由序列化器轉換為位元組陣列,這樣它們才能夠在網路上傳輸。然後訊息到達了分割槽器。

如果傳送過程中指定了有效的分割槽號,那麼在傳送記錄時將使用該分割槽。如果傳送過程中未指定分割槽,則將使用key 的 hash 函式對映指定一個分割槽。如果傳送的過程中既沒有分割槽號也沒有,則將以迴圈的方式分配一個分割槽。選好分割槽後,生產者就知道向哪個主題和分割槽傳送資料了。

ProducerRecord 還有關聯的時間戳,如果使用者沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間作為時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳型別。

  • 如果將主題配置為使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 如果將主題配置為使用LogAppendTime,則生產者記錄中的時間戳在將訊息新增到其日誌中時,將由 broker 重寫。

然後,這條訊息被存放在一個記錄批次裡,這個批次裡的所有訊息會被髮送到相同的主題和分割槽上。由一個獨立的執行緒負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到訊息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 物件,它包含了主題和分割槽資訊,以及記錄在分割槽裡的偏移量,上面兩種的時間戳型別也會返回給使用者。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送訊息,幾次之後如果還是失敗的話,就返回錯誤訊息。

建立 Kafka 生產者

要向 Kafka 寫入訊息,首先需要建立一個生產者物件,並設定一些屬性。Kafka 生產者有3個必選的屬性

  • bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式為 host:port。清單裡不需要包含所有的 broker 地址,生產者會從給定的 broker 裡查詢到其他的 broker 資訊。不過建議至少要提供兩個 broker 資訊,一旦其中一個宕機,生產者仍然能夠連線到叢集上。

  • key.serializer

broker 需要接收到序列化之後的 key/value值,所以生產者傳送的訊息需要經過序列化之後才傳遞給 Kafka Broker。生產者需要知道採用何種方式把 Java 物件轉換為位元組陣列。key.serializer 必須被設定為一個實現了org.apache.kafka.common.serialization.Serializer 介面的類,生產者會使用這個類把鍵物件序列化為位元組陣列。這裡拓展一下 Serializer 類

Serializer 是一個介面,它表示類將會採用何種方式序列化,它的作用是把物件轉換為位元組,實現了 Serializer 介面的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 預設使用的序列化器,其他的序列化器還有很多,你可以通過 這裡 檢視其他序列化器。要注意的一點:key.serializer 是必須要設定的,即使你打算只發送值的內容。

  • value.serializer

與 key.serializer 一樣,value.serializer 指定的類會將值序列化。

下面程式碼演示瞭如何建立一個 Kafka 生產者,這裡只指定了必要的屬性,其他使用預設的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段程式碼

  • 首先建立了一個 Properties 物件
  • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
  • 在這裡我們建立了一個新的生產者物件,併為鍵值設定了恰當的型別,然後把 Properties 物件傳遞給他。

Kafka 訊息傳送

例項化生產者物件後,接下來就可以開始傳送訊息了,傳送訊息主要由下面幾種方式

簡單訊息傳送

Kafka 最簡單的訊息傳送如下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

程式碼中生產者(producer)的 send() 方法需要把 ProducerRecord 的物件作為引數進行傳送,ProducerRecord 有很多建構函式,這個我們下面討論,這裡呼叫的是

public ProducerRecord(String topic, K key, V value) {}

這個建構函式,需要傳遞的是 topic主題,key 和 value。

把對應的引數傳遞完成後,生產者呼叫 send() 方法傳送訊息(ProducerRecord物件)。我們可以從生產者的架構圖中看出,訊息是先被寫入分割槽中的緩衝區中,然後分批次傳送給 Kafka Broker。

傳送成功後,send() 方法會返回一個 Future(java.util.concurrent) 物件,Future 物件的型別是 RecordMetadata 型別,我們上面這段程式碼沒有考慮返回值,所以沒有生成對應的 Future 物件,所以沒有辦法知道訊息是否傳送成功。如果不是很重要的資訊或者對結果不會產生影響的資訊,可以使用這種方式進行傳送。

我們可以忽略傳送訊息時可能發生的錯誤或者在伺服器端可能發生的錯誤,但在訊息傳送之前,生產者還可能發生其他的異常。這些異常有可能是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送執行緒被中斷)

同步傳送訊息

第二種訊息傳送機制如下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

這種傳送訊息的方式較上面的傳送方式有了改進,首先呼叫 send() 方法,然後再呼叫 get() 方法等待 Kafka 響應。如果伺服器返回錯誤,get() 方法會丟擲異常,如果沒有發生錯誤,我們會得到 RecordMetadata 物件,可以用它來檢視訊息記錄。

生產者(KafkaProducer)在傳送的過程中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤可以通過重發訊息來解決。比如連線的錯誤,可以通過再次建立連線來解決;無錯誤則可以通過重新為分割槽選舉首領來解決。KafkaProducer 被配置為自動重試,如果多次重試後仍無法解決問題,則會丟擲重試異常。另一類錯誤是無法通過重試來解決的,比如訊息過大對於這類錯誤,KafkaProducer 不會進行重試,直接丟擲異常。

非同步傳送訊息

同步傳送訊息都有個問題,那就是同一時間只能有一個訊息在傳送,這會造成許多訊息無法直接傳送,造成訊息滯後,無法發揮效益最大化。

比如訊息在應用程式和 Kafka 叢集之間一個來回需要 10ms。如果傳送完每個訊息後都等待響應的話,那麼傳送100個訊息需要 1 秒,但是如果是非同步方式的話,傳送 100 條訊息所需要的時間就會少很多很多。大多數時候,雖然Kafka 會返回 RecordMetadata 訊息,但是我們並不需要等待響應。

為了在非同步傳送訊息的同時能夠對異常情況進行處理,生產者提供了回掉支援。下面是回撥的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回撥需要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個介面只有一個 onCompletion方法。如果 kafka 返回一個錯誤,onCompletion 方法會丟擲一個非空(non null)異常,這裡我們只是簡單的把它打印出來,如果是生產環境需要更詳細的處理,然後在 send() 方法傳送的時候傳遞一個 Callback 回撥的物件。

生產者分割槽機制

Kafka 對於資料的讀寫是以分割槽為粒度的,分割槽可以分佈在多個主機(Broker)中,這樣每個節點能夠實現獨立的資料寫入和讀取,並且能夠通過增加新的節點來增加 Kafka 叢集的吞吐量,通過分割槽部署在多個 Broker 來實現負載均衡的效果。

上面我們介紹了生產者的傳送方式有三種:不管結果如何直接傳送傳送並返回結果傳送並回調。由於訊息是存在主題(topic)的分割槽(partition)中的,所以當 Producer 生產者傳送產生一條訊息發給 topic 的時候,你如何判斷這條訊息會存在哪個分割槽中呢?

這其實就設計到 Kafka 的分割槽機制了。

分割槽策略

Kafka 的分割槽策略指的就是將生產者傳送到哪個分割槽的演算法。Kafka 為我們提供了預設的分割槽策略,同時它也支援你自定義分割槽策略。

如果要自定義分割槽策略的話,你需要顯示配置生產者端的引數 Partitioner.class,我們可以看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個引數: topic,表示需要傳遞的主題;key 表示訊息中的鍵值;keyBytes表示分割槽中序列化過後的key,byte陣列的形式傳遞;value 表示訊息的 value 值;valueBytes 表示分割槽中序列化後的值陣列;cluster表示當前叢集的原資料。Kafka 給你這麼多資訊,就是希望讓你能夠充分地利用這些資訊對訊息進行分割槽,計算出它要被髮送到哪個分割槽中。
  • close() : 繼承了 Closeable 介面能夠實現 close() 方法,在分割槽關閉時呼叫。
  • onNewBatch(): 表示通知分割槽程式用來建立新的批次

其中與分割槽策略息息相關的就是 partition() 方法了,分割槽策略有下面這幾種

順序輪詢

順序分配,訊息是均勻的分配給每個 partition,即每個分割槽儲存一次訊息。就像下面這樣

上圖表示的就是輪詢策略,輪訓策略是 Kafka Producer 提供的預設策略,如果你不使用指定的輪訓策略的話,Kafka 預設會使用順序輪訓策略的方式。

隨機輪詢

隨機輪詢簡而言之就是隨機的向 partition 中儲存訊息,如下圖所示

實現隨機分配的程式碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分割槽數,然後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將資料均勻地打散到各個分割槽,但從實際表現來看,它要遜於輪詢策略,所以如果追求資料的均勻分佈,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分割槽策略,在新版本中已經改為輪詢了。

按照 key 進行訊息儲存

這個策略也叫做 key-ordering 策略,Kafka 中每條訊息都會有自己的key,一旦訊息被定義了 Key,那麼你就可以保證同一個 Key 的所有訊息都進入到相同的分割槽裡面,由於每個分割槽下的訊息處理都是有順序的,故這個策略被稱為按訊息鍵保序策略,如下圖所示

實現這個策略的 partition 方法同樣簡單,只需要下面兩行程式碼即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分割槽策略都是比較基礎的策略,除此之外,你還可以自定義分割槽策略。

生產者壓縮機制

壓縮一詞簡單來講就是一種互換思想,它是一種經典的用 CPU 時間去換磁碟空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁碟佔用或更少的網路 I/O 傳輸。如果你還不瞭解的話我希望你先讀完這篇文章 程式設計師需要了解的硬核知識之壓縮演算法,然後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的訊息分為兩層:訊息集合 和 訊息。一個訊息集合中包含若干條日誌項,而日誌項才是真正封裝訊息的地方。Kafka 底層的訊息日誌由一系列訊息集合日誌項組成。Kafka 通常不會直接操作具體的一條條訊息,它總是在訊息集合這個層面上進行寫入操作。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,為什麼啟用壓縮?說白了就是訊息太大,需要變小一點 來使訊息發的更快一些。

Kafka Producer 中使用 compression.type 來開啟壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面程式碼表明該 Producer 的壓縮演算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮演算法壓縮訊息後併發送給伺服器後,由 Consumer 消費者進行解壓縮,因為採用的何種壓縮演算法是隨著 key、value 一起傳送過去的,所以消費者知道採用何種壓縮演算法。

Kafka 重要引數配置

在上一篇文章 帶你漲姿勢的認識一下kafka中,我們主要介紹了一下 kafka 叢集搭建的引數,本篇文章我們來介紹一下 Kafka 生產者重要的配置,生產者有很多可配置的引數,在文件裡(http://kafka.apache.org/documentation/#producerconfigs)都有說明,我們介紹幾個在記憶體使用、效能和可靠性方面對生產者影響比較大的引數進行說明

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 介面

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 介面

acks

acks 引數指定了要有多少個分割槽副本接收訊息,生產者才認為訊息是寫入成功的。此引數對訊息丟失的影響較大

  • 如果 acks = 0,就表示生產者也不知道自己產生的訊息是否被伺服器接收了,它才知道它寫成功了。如果傳送的途中產生了錯誤,生產者也不知道,它也比較懵逼,因為沒有返回任何訊息。這就類似於 UDP 的運輸層協議,只管發,伺服器接受不接受它也不關心。
  • 如果 acks = 1,只要叢集的 Leader 接收到訊息,就會給生產者返回一條訊息,告訴它寫入成功。如果傳送途中造成了網路異常或者 Leader 還沒選舉出來等其他情況導致訊息寫入失敗,生產者會受到錯誤訊息,這時候生產者往往會再次重發資料。因為訊息的傳送也分為 同步非同步,Kafka 為了保證訊息的高效傳輸會決定是同步傳送還是非同步傳送。如果讓客戶端等待伺服器的響應(通過呼叫 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回撥,就會解決這個問題。
  • 如果 acks = all,這種情況下是隻有當所有參與複製的節點都收到訊息時,生產者才會接收到一個來自伺服器的訊息。不過,它的延遲比 acks =1 時更高,因為我們要等待不只一個伺服器節點接收訊息。

buffer.memory

此引數用來設定生產者記憶體緩衝區的大小,生產者用它緩衝要傳送到伺服器的訊息。如果應用程式傳送訊息的速度超過傳送到伺服器的速度,會導致生產者空間不足。這個時候,send() 方法呼叫要麼被阻塞,要麼丟擲異常,具體取決於 block.on.buffer.null 引數的設定。

compression.type

此引數來表示生產者啟用何種壓縮演算法,預設情況下,訊息傳送時不會被壓縮。該引數可以設定為 snappy、gzip 和 lz4,它指定了訊息傳送給 broker 之前使用哪一種壓縮演算法進行壓縮。下面是各壓縮演算法的對比

retries

生產者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分割槽找不到首領),在這種情況下,reteis 引數的值決定了生產者可以重發的訊息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。預設情況下,生產者在每次重試之間等待 100ms,這個等待引數可以通過 retry.backoff.ms 進行修改。

batch.size

當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在同一個批次裡。該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算。當批次被填滿,批次裡的所有訊息會被髮送出去。不過生產者井不一定都會等到批次被填滿才傳送,任意條數的訊息都可能被髮送。

client.id

此引數可以是任意的字串,伺服器會用它來識別訊息的來源,一般配置在日誌裡

max.in.flight.requests.per.connection

此引數指定了生產者在收到伺服器響應之前可以傳送多少訊息,它的值越高,就會佔用越多的記憶體,不過也會提高吞吐量。把它設為1 可以保證訊息是按照發送的順序寫入伺服器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在傳送資料時等待伺服器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元資料(比如目標分割槽的首領是誰)時等待伺服器返回響應的時間。如果等待時間超時,生產者要麼重試傳送資料,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回訊息確認的時間,與 asks 的配置相匹配----如果在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此引數指定了在呼叫 send() 方法或使用 partitionFor() 方法獲取元資料時生產者的阻塞時間當生產者的傳送緩衝區已捕,或者沒有可用的元資料時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會丟擲超時異常。

max.request.size

該引數用於控制生產者傳送的請求大小。它可以指能傳送的單個訊息的最大值,也可以指單個請求裡所有訊息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,為了保證可靠的訊息傳輸,這兩個引數分別指定了 TCP Socket 接收和傳送資料包的緩衝區的大小。如果它們被設定為 -1,就使用作業系統的預設值。如果生產者或消費者與 broker 處於不同的資料中心,那麼可以適當增大這些值。

Kafka Consumer

應用程式使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的訊息,然後再把他們儲存起來。應用程式首先需要建立一個 KafkaConsumer 物件,訂閱主題並開始接受訊息,驗證訊息並儲存結果。一段時間後,生產者往主題寫入的速度超過了應用程式驗證資料的速度,這時候該如何處理?如果只使用單個消費者的話,應用程式會跟不上訊息生成的速度,就像多個生產者像相同的主題寫入訊息一樣,這時候就需要多個消費者共同參與消費主題中的訊息,對訊息進行分流處理。

Kafka 消費者從屬於消費者群組。一個群組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分割槽的訊息。下面是一個 Kafka 分割槽消費示意圖

上圖中的主題 T1 有四個分割槽,分別是分割槽0、分割槽1、分割槽2、分割槽3,我們建立一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部訊息。由於一個消費者處理四個生產者傳送到分割槽的訊息,壓力有些大,需要幫手來幫忙分擔任務,於是就演變為下圖

這樣一來,消費者的消費能力就大大提高了,但是在某些環境下比如使用者產生訊息特別多的時候,生產者產生的訊息仍舊讓消費者吃不消,那就繼續增加消費者。

如上圖所示,每個分割槽所產生的訊息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那麼多餘的消費者將會閒置,如下圖所示

向群組中增加消費者是橫向伸縮消費能力的主要方式。總而言之,我們可以通過增加消費組的消費者來進行水平擴充套件提升消費能力。這也是為什麼建議建立主題時使用比較多的分割槽數,這樣可以在消費負載高的情況下增加消費者來提升效能。另外,消費者的數量不應該比分割槽數多,因為多出來的消費者是空閒的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次訊息,可以支援任意多的應用讀取這個訊息。換句話說,每個應用都可以讀到全量的訊息。為了使得每個應用都能讀到全量訊息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變為下圖這樣

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量訊息,在邏輯意義上來說它們屬於不同的應用。

總結起來就是如果應用需要讀取全量訊息,那麼請為該應用設定一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者。

消費者組和分割槽重平衡

消費者組是什麼

消費者組(Consumer Group)是由一個或多個消費者例項(Consumer Instance)組成的群組,具有可擴充套件性和可容錯性的一種機制。消費者組內的消費者共享一個消費者組ID,這個ID 也叫做 Group ID,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分割槽的訊息,多餘的消費者會閒置,派不上用場。

我們在上面提到了兩種消費方式

  • 一個消費者群組消費一個主題中的訊息,這種消費模式又稱為點對點的消費方式,點對點的消費方式又被稱為訊息佇列
  • 一個主題中的訊息被多個消費者群組共同消費,這種消費模式又稱為釋出-訂閱模式

消費者重平衡

我們從上面的消費者演變圖中可以知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其全部分割槽的訊息,後來有一個消費者加入群組,隨後又有更多的消費者加入群組,而新加入的消費者例項分攤了最初消費者的部分訊息,這種把分割槽的所有權通過一個消費者轉到其他消費者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它為消費者群組帶來了高可用性伸縮性,我們可以放心的新增消費者或移除消費者,不過在正常情況下我們並不希望發生這樣的行為。在重平衡期間,消費者無法讀取訊息,造成整個消費者組在重平衡的期間都不可用。另外,當分割槽被重新分配給另一個消費者時,訊息當前的讀取狀態會丟失,它有可能還需要去重新整理快取,在它重新恢復狀態之前會拖慢應用程式。

消費者通過向組織協調者(Kafka Broker)傳送心跳來維護自己是消費者組的一員並確認其擁有的分割槽。對於不同不的消費群體來說,其組織協調者可以是不同的。只要消費者定期傳送心跳,就會認為消費者是存活的並處理其分割槽中的訊息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

如果過了一段時間 Kafka 停止傳送心跳了,會話(Session)就會過期,組織協調者就會認為這個 Consumer 已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止傳送訊息,組織協調者會等待幾秒鐘,確認它死亡了才會觸發重平衡。在這段時間裡,死亡的消費者將不處理任何訊息。在清理消費者時,消費者將通知協調者它要離開群組,組織協調者會觸發一次重平衡,儘量降低處理停頓。

重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現在社群還無法修改。

重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機器》中 p76 關於 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作執行緒。直到它收集結束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機器在後臺自動發起並完成的,在使用者不可見的情況下把使用者正常工作的執行緒全部停掉,這對很多應用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者例項都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......

建立消費者

上面的理論說的有點多,下面就通過程式碼來講解一下消費者是如何消費的

在讀取訊息之前,需要先建立一個 KafkaConsumer 物件。建立 KafkaConsumer 物件與建立 KafkaProducer 物件十分相似 --- 把需要傳遞給消費者的屬性放在 properties 物件中,後面我們會著重討論 Kafka 的一些配置,這裡我們先簡單的建立一下,使用3個屬性就足矣,分別是 bootstrap.serverkey.deserializervalue.deserializer

這三個屬性我們已經用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢是認識一下Kafka Producer

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪個消費者群組。建立不屬於任何一個群組的消費者也是可以的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

建立好消費者之後,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為引數,使用起來比較簡單

consumer.subscribe(Collections.singletonList("customerTopic"));

為了簡單我們只訂閱了一個主題 customerTopic,引數傳入的是一個正則表示式,正則表示式可以匹配多個主題,如果有人建立了新的主題,並且主題的名字與正則表示式相匹配,那麼會立即觸發一次重平衡,消費者就可以讀取新的主題。

要訂閱所有與 test 相關的主題,可以這樣做

consumer.subscribe("test.*");

輪詢

我們知道,Kafka 是支援訂閱/釋出模式的,生產者傳送資料給 Kafka Broker,那麼消費者是如何知道生產者傳送了資料呢?其實生產者產生的資料消費者是不知道的,KafkaConsumer 採用輪詢的方式定期去 Kafka Broker 中進行資料的檢索,如果有資料就用來消費,如果沒有就再繼續輪詢等待,下面是輪詢等待的具體實現

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}
  • 這是一個無限迴圈。消費者實際上是一個長期執行的應用程式,它通過輪詢的方式向 Kafka 請求資料。
  • 第三行程式碼非常重要,Kafka 必須定期迴圈請求資料,否則就會認為該 Consumer 已經掛了,會觸發重平衡,它的分割槽會移交給群組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該引數被設定為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數內一直等待 broker 返回資料。
  • poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的資訊,記錄所在分割槽的資訊、記錄在分割槽中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
  • 在退出應用程式之前使用 close() 方法關閉消費者。網路連線和 socket 也會隨之關閉,並立即觸發一次重平衡,而不是等待群組協調器發現它不再發送心跳並認定它已經死亡。

執行緒安全性

在同一個群組中,我們無法讓一個執行緒執行多個消費者,也無法讓多個執行緒安全的共享一個消費者。按照規則,一個消費者使用一個執行緒,如果一個消費者群組中多個消費者都想要執行的話,那麼必須讓每個消費者在自己的執行緒中執行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。

消費者配置

到目前為止,我們學習瞭如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文件列出了所有與消費者相關的配置說明。大部分引數都有合理的預設值,一般不需要修改它們,下面我們就來介紹一下這些引數。

  • fetch.min.bytes

該屬性指定了消費者從伺服器獲取記錄的最小位元組數。broker 在收到消費者的資料請求時,如果可用的資料量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用資料時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理訊息。如果沒有很多可用資料,但消費者的 CPU 使用率很高,那麼就需要把該屬性的值設得比預設值大。如果消費者的數量比較多,把該屬性的值調大可以降低 broker 的工作負載。

  • fetch.max.wait.ms

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的資料時才會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,預設是 500 毫秒。如果沒有足夠的資料流入 kafka 的話,消費者獲取的最小資料量要求就得不到滿足,最終導致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把引數值設定的小一些。如果 fetch.max.wait.ms 被設定為 100 毫秒的延遲,而 fetch.min.bytes 的值設定為 1MB,那麼 Kafka 在收到消費者請求後,要麼返回 1MB 的資料,要麼在 100 ms 後返回所有可用的資料。就看哪個條件首先被滿足。

  • max.partition.fetch.bytes

該屬性指定了伺服器從每個分割槽裡返回給消費者的最大位元組數。它的預設值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分割槽裡返回的記錄最多不超過 max.partition.fetch.bytes 指定的位元組。如果一個主題有20個分割槽和5個消費者,那麼每個消費者需要至少4 MB的可用記憶體來接收記錄。在為消費者分配記憶體時,可以給它們多分配一些,因為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分割槽。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大訊息的位元組數(通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些訊息,導致消費者一直掛起重試。 在設定該屬性時,另外一個考量的因素是消費者處理資料的時間。消費者需要頻繁的呼叫 poll() 方法來避免會話過期和發生分割槽再平衡,如果單次呼叫poll() 返回的資料太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  • session.timeout.ms

這個屬性指定了消費者在被認為死亡之前可以與伺服器斷開連線的時間,預設是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內傳送心跳給群組協調器,就會被認定為死亡,協調器就會觸發重平衡。把它的分割槽分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向群組協調器傳送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不傳送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設定的比預設值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的重平衡。把該屬性的值設定得大一些,可以減少意外的重平衡,不過檢測節點崩潰需要更長的時間。

  • auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分割槽或者偏移量無效的情況下的該如何處理。它的預設值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取資料。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分割槽的記錄。

  • enable.auto.commit

我們稍後將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,預設值是 true,為了儘量避免出現重複資料和資料丟失,可以把它設定為 false,由自己控制何時提交偏移量。如果把它設定為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率

  • partition.assignment.strategy

我們知道,分割槽會分配給群組中的消費者。PartitionAssignor 會根據給定的消費者和主題,決定哪些分割槽應該被分配給哪個消費者,Kafka 有兩個預設的分配策略RangeRoundRobin

  • client.id

該屬性可以是任意字串,broker 用他來標識從客戶端傳送過來的訊息,通常被用在日誌、度量指標和配額中

  • max.poll.records

該屬性用於控制單次呼叫 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢中需要處理的資料量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫資料時用到的 TCP 緩衝區也可以設定大小。如果它們被設定為 -1,就使用作業系統預設值。如果生產者或消費者與 broker 處於不同的資料中心內,可以適當增大這些值,因為跨資料中心的網路一般都有比較高的延遲和比較低的頻寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費者在每次呼叫poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組裡的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤訊息在分割槽中的位置(偏移量)

消費者會向一個叫做 _consumer_offset 的特殊主題中傳送訊息,這個主題會儲存每次所傳送訊息中的分割槽偏移量,這個主題的主要作用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題傳送訊息,正常情況下不觸發重平衡,這個主題是不起作用的,當觸發重平衡後,消費者停止工作,每個消費者可能會分到對應的分割槽,這個主題就是讓消費者能夠繼續處理訊息所設定的。

如果提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的訊息就會被重複處理

如果提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的訊息將會丟失

既然_consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面我們就來說一下####提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設定為true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,預設是 5s。與消費者裡的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那麼就會提交從上一次輪詢中返回的偏移量。

提交當前偏移量

auto.commit.offset 設定為 false,可以讓應用程式決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後馬上返回,如果提交失敗就丟擲異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄後要確保呼叫了 commitSync(),否則還是會有丟失訊息的風險,如果發生了在均衡,從最近一批訊息到發生在均衡之間的所有訊息都將被重複處理。

非同步提交

非同步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於非同步提交不會進行重試,同步提交會一致進行重試。

同步和非同步組合提交

一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。

因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量。

提交特定的偏移量

消費者API允許呼叫 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

下面為自己做個宣傳,歡迎關注公眾號 Java建設者,號主是Java技術棧,熱愛技術,喜歡閱讀,熱衷於分享和總結,希望能把每一篇好文章分享給成長道路上的你。
關注公眾號回覆 002 領取為你特意準備的大禮包,你一定會喜歡並收藏的。

文章參考:

Kafka史上最詳細原理總結

《Kafka 權威指南》

https://kafka.apache.org/

http://kafka.apache.org/documentation/

https://www.tutorialkart.com/apache-kafka-tutorial/

https://dzone.com/articles/what-is-kafka

《極客時間 - Kafka 核心技術與實戰》