1. 程式人生 > >【筆記】kafka權威指南-常用配置和要點記錄

【筆記】kafka權威指南-常用配置和要點記錄

Kafka 的應用場景

  1. 訊息佇列

    Kafka有更好的吞吐量,內建的分割槽,冗餘及容錯性,這讓Kafka成為了一個很好的大規模訊息處理應用的解決方案。

  2. 行為跟蹤和日誌收集。

    敏感操作和日誌,都可以寫到 kafka 裡進行統一,分情況的監控、和分析

  3. 流式處理,比如,kafka 和 storm 的對接,來實現流式處理

    還有其他的需要面向業務需求和場景來具體分析,主要就是利用 kafka 應對高併發的能力,還有實時處理的快速響應,資料實時落地磁碟的可靠性。

名詞定義

  • 主題和分割槽:kafka訊息通過主題進行分類。一個主題可以設定分為若干個分割槽,一個分割槽就是一個提交日誌。

    一個主題包含幾個分割槽,所以無法在整個主題範圍內保證訊息有序,但可以保證訊息在單個分割槽內的順序。

  • 一個獨立的 kafka 伺服器被稱為 broker

    broker 是叢集的組成部分,每個叢集都有一個 broker 充當管理角色。

  • 多叢集:隨著 kafka 叢集數量的增加最好使用多叢集。

    • 資料型別分離
    • 安全需求隔離
    • 多資料中心(容災)
  • MirrorMaker:kafka提供的一個用於在叢集之間進行訊息複製的工具。

常用配置

  • Spring @KafkaListener max.poll.records 預設 int 最大值 ,所以最好設定一下,否則積壓資料的情況下,一次拉取過多,處理時間長會導致超時,連線斷開、offset提交失敗、重複消費。
  • broker.id 整數 broker 間唯一
  • zookeeper.connect zookeeper 配置,格式: hostname:prot/path;…

    最好指定 path 這是在 zookeeper 儲存資料節點的根目錄

  • log.dirs 訊息儲存位置, ‘,’ 分割多個本地FS路徑

    broker 在增加分割槽的時候會優先選擇分割槽數少的路徑,而不考慮佔用的磁碟大小

  • num.recovery.threads.per.data.dir 處理日誌片段的執行緒數

# 三種情況會用到
1. 啟動時檢查每個日誌片段
2. 關閉時關閉日誌片段
3. 崩潰後重啟,檢查和截短日誌片段
# 因為只在啟動和關閉時用以並行操作,所以可以調大一點
# 如果設定為8個,並且log.dirs 配置了三個路徑,則會啟動 3*8 = 24個執行緒。
  • num.partitions 預設建立分割槽數量
  • log.retention.ms 資訊保留時間,預設 168 小時,以片段的最後修改時間計算。若存在 log.retention.minutes 則以最小的為準。
  • log.retention.bytes 每個分割槽的最大資料量 ,若設定為 1G 則三個分割槽的topic最多儲存3G的資料量。

    儲存時間和資料量以先到為準,即只要達到一個條件就刪除。

  • log.segment.bytes 每個日誌片段的大小

    如果太小則會頻繁關閉檔案,降低效率,一個片段的所有資料都達到超時時間才會刪除整個片段,所以太大 則管理的粒度太粗,同時也會降低使用時間戳獲取偏移量的精確度。

  • log.segment.ms 指定多長時間後關閉日誌片段

    與上面的 byte 引數依舊不衝突,先達到哪個,就先關閉。but 這個引數可能引起很多小的(達不到bytes 上限的) segment 在達到ms數時,同時關閉。會影響磁碟效能。

  • message.max.bytes 單個訊息大小,預設 1 000 000 就是大約1M。

硬體

  1. 磁碟吞吐量 在broker 和 partition 數量不變的前提下,生產者客戶端的效能直接受到服務端磁碟吞吐量的影響。
  2. 記憶體 磁碟效能影響生產者,而記憶體影響消費者。
  3. 網路和磁碟容量

生產者

#### 部分引數
1. bootstrap.servers 不需要指定所有的但至少兩個以上,以防其中一個宕機。客戶端會自己去borker上獲取服務端列表。
2. key/value.serializer 序列化工具
3. acks 指定多少個分割槽副本收到訊息才認為成功。(0,1,all) 0 則不等待任何回覆。
4. buffer.memory 緩衝要傳送到伺服器的訊息。若伺服器寫入速度不夠,緩衝區大於改配置則send方法會別阻塞,超過 max.block.ms 則會丟擲異常。
5. compression.type 壓縮方式,預設不壓縮

snappy,gzip,lz4 相對來說 snappy 佔用cpu較小,壓縮率可觀。gzip 會佔用更多cpu 同時也會提供更高的壓縮率。

  1. retries 收到臨時性錯誤的重試次數,重試等待時間預設100ms 除非配置 retry.backoff.ms

    對於找不到 leader 等錯誤會進行重試,比如訊息太大則無法通過重試解決,需要在邏輯中手動處理。

  2. batch.size 多條發向同一個分割槽的訊息會合併為一個批次,該引數指定批次大小。生產者並不會等他填滿才會傳送,所以不用擔心該引數過大會造成延遲。

  3. linger.ms 批次傳送的等待時間,與上一條件不衝突,到達一個限制就傳送,預設為 0 不延時。
  4. max.in.flight.requests.per.connection 指定了生產者在收到伺服器響應前可以再發送多少條訊息。過大會佔用記憶體,設為1 可以在發生重試的情況下也能保證順序寫入(當前生產者下)。但會嚴重影響生產者效能。
  5. timeout.ms 指定了 broker 等待同步副本返回訊息確認的時間,與ack機制匹配,指定時間內沒有收到副本確認,就會返回錯誤。
  6. reuqest.timeout.ms 生產者在傳送資料時等待伺服器返回響應的時間。
  7. metadata.fetch.timeout.ms 生產者在獲取元資料(比如目標分割槽首領是誰)時等待伺服器響應的時間。
  8. max.request.size 控制生產者傳送的請求大小(一個批次)。

    borker 對大小也有限制, message.max.bytes 單條訊息超過該大小會被broker拒絕。

  9. receive.buffer.bytes 和 send.buffer.bytes TCP socket 接受和傳送資料包的緩衝區大小 -1 則使用作業系統預設值。一般跨資料中心網路延遲較高可以適當增大。

傳送方式

  1. 只發送而不接收訊息 send(record)
  2. 同步傳送 send(record).get()
  3. 非同步傳送 send(record,new Callback(){…})

關於序列化器

不建議自己定義,建議使用通用序列化器,除了String 等還有 avro 、 JSON 。。

record 的組成

  1. Topic
  2. [Partition]
  3. [key]
  4. value

分割槽

  1. key 有兩個作用
    • 作為訊息的附加資訊
    • 決定訊息被寫道主題的那個分割槽
  2. 如果 key 為null 則預設使用輪詢演算法將訊息均衡分佈到各個分割槽上
  3. 如果key不為空並且使用了預設分割槽器 則 會 hash 分佈,(使用kafka自己的hash演算法,即使升級java版本,並不會影響hash值)
  4. 如果使用 key 來對映分割槽,則在建立主題時就設計好分割槽,不要再增加新分割槽,因為如果增加了,則會導致hash值改變,資料寫入不同的分割槽。
  5. 重寫 Partitioner 自己實現分割槽演算法。
  6. key 只能是字串

消費者

  • 每個 group 裡的多個消費者共同消費同一主題時,會各自消費一部分
  • 具體來說,每個同一個 group 內的多個消費者會分配有一個或多個分割槽,各自負責消費各自負責的分割槽內的訊息
  • 所以,如果組內消費者數量大於分割槽數量,則對於增加吞吐量是無益的,因為多出來的消費者是不會擁有分割槽,就會閒置,也就不會有消費的
  • 當消費者的消費頻次大於 session.timeout.ms 時就會認為該消費者下線,觸發事件為每個消費者重新分配分割槽,這個行為稱為-再均衡
  • 再均衡 分割槽所有權從一個消費者轉移到另一個消費者的行為,在再均衡期間,消費者無法讀取訊息,整個群組短時間內不可用。
  • 群組協調器 每個群組會有一個指定 broker 作為群組協調器(不同的群組可以不同),消費者通過向協調器傳送心跳來維持從屬關係和分割槽所有權(消費者會在輪詢訊息和提交偏移量的時候來發送心跳)。

常用方法

  • consumer.subscribe(**); 訂閱主題,可以是list或String或正則
  • poll(ms) 輪詢,引數 阻塞時間,返回 ConsumerRecords ,在第一次呼叫 poll 方法時,他會負責查詢 GroupCoordinator 然後加入群組,接受分配的分割槽。心跳也是在輪詢裡發出去的,所以要確保輪詢中處理的工作要儘快完成。

引數

  • fetch.min.bytes 消費者從伺服器獲取記錄的最小位元組數。broker在收到消費者資料請求時,如果可用的資料量小於 此處指定的大小,那麼就會等待有足夠資料後,才會返回給消費者。把該屬性值設定大一點可以降低broker的工作負載。
  • fetch.max.wait.ms 用於指定上一配置中資料量不夠需要等待的最大時長,預設500ms 如果沒有足夠的資料流入則會導致500ms的延遲。
  • max.partition.fetch.bytes 伺服器從每個分割槽裡返回給消費者的最大位元組數 預設1M ,就是在未消費資料充足的情況下,一次 kafkaConsumer.pull() 得到的資料 = 1M * 該消費者擁有的分割槽數。
  • session.timeout.ms 消費者可以與伺服器斷開連線的時間,超時沒有提交心跳資訊則認為死亡,出發再均衡,將他的分割槽分配給其他消費者,預設 3s
  • heartbeat.interval.ms 指定了poll方法向協調者傳送心跳的頻率,必須比 session.timeout.ms 小,每次poll方法時會檢查是否需要傳送心跳資訊
  • auto.offset.reset 指定了消費者在讀取一個沒有偏移量的分割槽或偏移量無效時該如何處理,預設 latest 從最新開始讀取,還可以設定 earliest 從分割槽起始位置開始
  • enable.auto.commit 預設是true ,設為false則需要手動提交 offset
  • auto.commit.interval.ms 控制自動提交 offset 頻率
  • partition.assignment.strategy 指定 PartitionAssignor 分配分割槽的策略
    • org.apache.kafka.clients.consumer.RangeAssignor 預設,連續分配,假設兩個消費者 三個分割槽,則 第一個[0,1] 第二個消費者 2
    • **.RoundRobinAssignor 輪詢策略 依然假設三個topic 2個消費者,消費者1[0,2] 消費者2[1]
    • 也可以指定自己類來自己實現策略
  • client.id 任意字串,在日誌、度量標誌和配額裡標記客戶端
  • max.poll.records 控制單次請求 能夠返回的記錄數量,可以控制輪詢裡需要處理的資料量
  • receive.buffer.bytes 和 send.buffer.bytes 讀寫資料時 TCP 緩衝區大小。-1 則使用系統預設大小。
    request.timeout.ms 必須必 session.timeout.ms 、 fetch.max.wait.ms 數值大

提交偏移量

  • consumer.commitSync() 同步提交
  • consumer.commitAsync() 非同步提交

    • 一般在輪詢中 非同步提交 而 try-catch-finally{ 同步提交 }
    • 無參提交是將整個批次的資料都提交,可以傳入一個 Map

再均衡監聽器 ConsumerRebalanceListener

可以實現介面的兩個方法傳入 消費者配置中,實現監聽,當觸發在均衡時,會會分別在在均衡開始前和重新分配後呼叫對應方法。

特定偏移量開始處理

  • seekToBeginning(Collection) 從起始位置處理
  • seekToEnd(Collection) 從分割槽結束為止開始處理
  • seek 特定位置處理

    consumer.seek(partition,offsetOfThisPartition);

退出

  • 在另一個執行緒中呼叫 consumer.wakeup() 會在下一次 poll() 的時候丟擲WakeupException異常,不需要處理,只是退出的一種方式
  • wakeup() 是唯一一個可以在其他執行緒中安全呼叫的方法
  • 在退出(無論是正常退出還是catch到異常finally時)之前最好 呼叫 consumer.close() 來通知協調器 再均衡,而不是等超時。

獨立消費者-沒有群組的消費者

當只需要消費幾個或全部分割槽的時候,並不需要群組和再均衡,則就可以使用簡單的沒有群組的消費者來直接消費,需要指定要消費的partition。

points

  • 日誌目錄配置在 log4j.properties