1. 程式人生 > >一、kafka 介紹 && kafka-client

一、kafka 介紹 && kafka-client

# 一、kafka 介紹 ## 1.1、kafka 介紹 Kafka 是一個分散式訊息引擎與流處理平臺,經常用做企業的訊息匯流排、實時資料管道,有的還把它當做儲存系統來使用。 早期 Kafka 的定位是一個高吞吐的分散式訊息系統,目前則演變成了一個成熟的分散式訊息引擎,以及流處理平臺。 Kafka 主要起到削峰填谷(緩衝)、系統解構以及冗餘的作用,主要特點有: - **高吞吐、低延時**:這是 Kafka 顯著的特點,Kafka 能夠達到百萬級的訊息吞吐量,延遲可達毫秒級; - **持久化儲存**:Kafka 的訊息最終持久化儲存在磁碟之上,提供了順序讀寫以保證效能,並且通過 Kafka 的副本機制提高了資料可靠性。 - **分散式可擴充套件**:Kafka 的資料是分散式儲存在不同 broker 節點的,以 topic 組織資料並且按 partition 進行分散式儲存,整體的擴充套件性都非常好。 - **高容錯性**:叢集中任意一個 broker 節點宕機,Kafka 仍能對外提供服務。 使用訊息佇列的好處: 解耦、冗餘(每個分割槽都有副本)、提高擴充套件性、靈活性 & 峰值處理能力、可恢復性(有副本)、順序保證、緩衝、非同步通訊 ## 1.2、kafka術語 - 生產者(Producer): - 向 broker 釋出訊息的應用程式。 - 生產者也負責選擇釋出到Topic上的哪一個分割槽。最簡單的方式從分割槽列表中輪流選擇。也可以根據某種演算法依照權重選擇分割槽。 - 消費者(Consumer): - 從訊息佇列中獲取訊息的客戶端應用程式。 - 一個 topic 可以讓若干個消費者進行消費,若干個消費者組成一個 Consumer Group 即消費組,一條訊息只能被消費組中一個 Consumer 消費。 - 假如所有的消費者都在一個組中,那麼這就變成了 queue 模型。 假如所有的消費者都在不同的組中,那麼就完全變成了釋出-訂閱模型。 - 更通用的,我們可以建立一些消費者組作為邏輯上的訂閱者。每個組包含數目不等的消費者,一個組內多個消費者可以用來擴充套件效能和容錯。如下圖所示: - - 2個 kafka 叢集託管4個分割槽(P0-P3),2個消費者組,消費組 A 有2個消費者例項,消費組 B 有4個。 - kafka 中消費者組有兩個概念:**佇列**:消費者組(consumer group)允許消費者組成員瓜分處理。**釋出訂閱**:允許你廣播訊息給多個消費者組(不同名)。 - 傳統的訊息有兩種模式:佇列和釋出訂閱。 - 在佇列模式中,消費者池從伺服器讀取訊息(每個訊息只被其中一個讀取),優點是允許多個消費者瓜分處理資料,這樣可以擴充套件處理。; - 釋出訂閱模式:訊息廣播給所有的消費者。允許你廣播資料到多個消費者,由於每個訂閱者都訂閱了訊息,所以沒辦法縮放處理。 - broker: - Kafka 例項,多個 broker 組成一個 Kafka 叢集,Kafka 以叢集方式執行,叢集中每個伺服器稱為 broker。 - 通常一臺機器部署一個 Kafka 例項,一個例項掛了不影響其他例項 - 主題(Topic): - 一組訊息的歸納(代表不同的業務,如超市辦會員,付款)。 - 服務端訊息的邏輯儲存單元。一個 topic 通常包含若干個 Partition 分割槽。 - 分割槽(Partition): - 一個 Topic 中的訊息資料按照多個分割槽組織,分割槽是 Kafka 訊息佇列組織的最小單位,一個分割槽可以看作是一個佇列。 - 分散式儲存在各個 broker 中, 實現釋出與訂閱的負載均衡。 - 若干個分割槽可以被若干個 Consumer 同時消費,達到消費者高吞吐量。 - 單個 partition 有序,整體無序,整體有序就將資料都放到一個 partition 中,但是效率極低。 - 每個分割槽有一個 leader,零或多個 follower。Leader 處理此分割槽的所有的讀寫請求,而follower被動的複製資料。如果leader宕機,其它的一個follower會被推舉為新的leader。 - 一臺伺服器可能同時是一個分割槽的 leader,另一個分割槽的 follower。 這樣可以平衡負載,避免所有的請求都只讓一臺或者某幾臺伺服器處理。 - message: - 訊息,或稱日誌訊息,是 Kafka 服務端實際儲存的資料,**每一條訊息都由一個 key、一個 value 以及訊息時間戳 timestamp 組成**。 - offset: - 偏移量,分割槽中的訊息位置,由 Kafka 自身維護,Consumer 消費時也要儲存一份 offset 以維護消費過的訊息位置。 ## 1.3、四個核心 API - Producer API 釋出訊息到1個或多個 topic(主題)中。 - Consumer API 來訂閱一個或多個topic,並處理產生的訊息。 - Streams API 充當一個流處理器,從1個或多個 topic 消費輸入流,並生產一個輸出流到1個或多個輸出 topic,有效地將輸入流轉換到輸出流。 - Connector API 可構建或執行可重用的生產者或消費者,將 topic 連線到現有的應用程式或資料系統。例如,連線到關係資料庫的聯結器可以捕獲表的每個變更。
# 二、kafka 客戶端 ````html ```` ## 2.1、 KafkaProduce ````java import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * @author xiandongxie 2020-06-04 */ public class KafkaProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "nn1.hadoop:9092,nn2.hadoop:9092,s1.hadoop:9092"); // 判別請求是否為完整的條件(判斷是不是成功傳送了)。指定了“all”將會阻塞訊息,這種設定效能最低,但是是最可靠的 props.put("acks", "all"); // 如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重複訊息的可能性 props.put("retries", 0); // 生產者快取每個分割槽未傳送的訊息。快取的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。並需要更多的記憶體(每個“活躍”的分割槽都有1個緩衝區) props.put("batch.size", 16384); // 預設緩衝可立即傳送,即便緩衝空間還沒有滿,但是,如果想減少請求的數量,可以設定 linger.ms 大於0。 // 這將指示生產者傳送請求之前等待一段時間,希望更多的訊息填補到未滿的批中。這類似於TCP的演算法,例如,可能100條訊息在一個請求傳送,因為我們設定了linger(逗留)時間為1毫秒,然後,如果我們沒有填滿緩衝區,這個設定將增加1毫秒的延遲請求以等待更多的訊息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處於高負載的情況下,如果設定比0大,以少量的延遲代價換取更少的,更有效的請求。 props.put("linger.ms", 1); // 控制生產者可用的快取總量,如果訊息傳送速度比其傳輸到伺服器的快,將會耗盡這個快取空間。當快取空間耗盡,其他傳送呼叫將被阻塞,阻塞時間的閾值通過 max.block.ms 設定,之後它將丟擲一個TimeoutException。 props.put("buffer.memory", 33554432); // key.serializer 和 value.serializer,將使用者提供的 key 和 value 物件 ProducerRecord 轉換成位元組,可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte型別。 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");