1. 程式人生 > >《KAFKA官方文件》設計與實現

《KAFKA官方文件》設計與實現

原文連結

5.設計與實現(IMPLEMENTATION)

5.1 API 設計

生產者 APIS

生產者API包含2個producers-kafka.producer.SyncProducer
kafka.producer.async.AsyncProducer。示例程式碼如下:

class Producer {

/* Sends the data, partitioned by key to the topic using either the */
/* synchronous or the asynchronous producer */
/*使用同步或者非同步的生產者,傳送單條訊息至由key所對應的topic的分割槽*/
public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

/* Sends a list of data, partitioned by key to the topic using either */
/* the synchronous or the asynchronous producer */
/*使用同步或者非同步的生產者,傳送一系列資料至由key所對應的topic的分割槽*/
public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

/* Closes the producer and cleans up */
/*關閉生產者並做相應清理*/
public void close();

}

其目的就是通過一個單一的API向客戶端暴露所有的生產者功能。kafka生產者

  • 可以處理多個生產者的排隊以及緩衝請求以及非同步地分發批量的資料:
    kafka.producer.Producer對於多個生產者的請求資料(producer.type=async),在序列化和分發它們至相應的kafka節點分割槽之前,其有能力對它們進行批量處理。而批量處理的大小可由少量的配置引數完成。當資料進入至佇列,它們將被緩衝在佇列裡面,直到queue.time超時或者達到了配置(batch.size)的批量處理的最大值.後臺的非同步執行緒(kafka.producer.async.ProducerSendThread
    )負責將佇列裡的資料批量取出並讓kafka.producer.EventHandler進行序列化工作,且將資料傳送至kafka相應的節點分割槽。通過設定event.handler配置引數,即可實現一個自定義的事件處理器(event handler)。不論對於植入自定義日誌/跟蹤程式碼,還是自定義監控邏輯,能在生產者佇列管道的不同階段注入回撥函式是極其有幫助的。一種可能的方案是通過實現kafka.producer.async.CallbackHandler介面並且對該類設定callback.handler配置引數。
  • 通過使用者自定義的Encoder實現對資料的序列化操作:
interface Encoder<T> {

public Message toMessage(T data);

}

預設的Encoder``是kafka.serializer.DefaultEncoder“`

  • 通過使用者設定(可選)的Partitioner提供基於軟體層面的負載均衡(slb):
    kafka.producer.Partitioner會影響到資料傳輸時的路由策略。
interface Partitioner<T> {

int partition(T key, int numPartitions);

}

分割槽API使用key以及可用節點分割槽來返回一個分割槽id。這個id通常用作有序broker_ids的索引,同時節點分割槽(partitions)將會用這個id挑選出一個分割槽去處理生產者的請求。預設的分割槽策略是是對key進行hash,並對分割槽數目取餘,即hash(key)%numPartitions。如果key為null,那麼將會挑選出一個隨機的節點。如果想要實現自定義的分割槽策略,也可以通過設定partitioner.class配置引數實現。

消費者 APIS

kafka提供兩種級別的消費者APIS。對於普通、簡單的消費者API,其僅包含對單個節點的連線,且可關閉傳送給server網路請求。這個API是完全無狀態的,每個網路請求將攜帶偏移量,使用者可以根據自己的選擇是否保留這些元資料。

高階的消費者API不僅隱藏了kafka叢集的細節,而且可以消費叢集中的任意一臺機器而不用關心其背後的網路拓撲。同時,它也保留了訊息是否被消費的狀態。另外,高級別的消費者API還支援對依據過濾表示式來對訂閱的topic進行過濾(譬如白名單或者黑名單等類似的正則表示式)

普通的 API

class SimpleConsumer {

/*向節點發出拉取訊息的請求,並返回訊息的資料集*/
public ByteBufferMessageSet fetch(FetchRequest request);

/*傳送批量拉取訊息的請求,返回響應資料集*/
public MultiFetchResponse multifetch(List<FetchRequest> fetches);

/**
*在給定時間前返回有效的偏移量(分割槽容量最大值)資料集,且為降序排列
*
* @param time: 毫秒,
* 如果設定了 OffsetRequest$.MODULE$.LATEST_TIME(),則可以從最新的偏移量獲取訊息
* 如果設定了 OffsetRequest$.MODULE$.EARLIEST_TIME(), 則可以從最早的偏移獲取訊息.
*/
public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

普通消費者API通常用於實現高階API,以及用於一些離線消費者,這些消費者對於保持狀態有特殊的要求

高階 API

/*建立一個對kafka叢集的連線*/
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

/**
* 此方法用於獲取KafkaStreams的集合,這個集合是
* MessageAndMetadata物件的迭代器,通過這個物件你可以獲取到與元資料(目前僅指topic)相關聯的訊息
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);

/**
* 你可以獲取KafkaStreams的集合, 對符合TopicFilter過濾後topic訊息進行迭代(TopicFilter是用標準Java正則表示式封裝的topic白名單或黑名單)
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);

/* 提交截止至目前所有的已消費的訊息的偏移量 */
public commitOffsets()

/* 關閉連線 */
public shutdown()
}

這個API圍繞迭代器並由KafkaStream類實現。每個kafkastream表示從一個或多個伺服器的一個或多個分割槽的資訊流。每個流用於單執行緒處理,所以客戶端可以在建立呼叫中提供所需的流數。因此,流可能代表多個伺服器分割槽的合併(對應於處理執行緒的數量),但每個分割槽只會流向一個流。

createMessageStreams方法呼叫已在某個topic註冊的consumer,這將導致消費者/kafka節點分配的再平衡。API鼓勵在一個呼叫中建立多個主題流,以最小化這種重新平衡。createMessageStreamsByFilter方法呼叫(額外的)註冊的watcher去發現新的符合被過濾的topic。注意通過createMessageStreamsByFilter方法返回的每個流可能會迭代多個topic的訊息(譬如,過濾器中允許多個topic)

5.2 網路層

Kafka網路層是一個相當簡單的NIO伺服器,這個將不會進行詳細的闡述。sendfile的實現是由MessageSet介面和writeTo方法完成。這使得備份檔案的資訊集合,使用更有效的transferTo實現而不是中間緩衝寫。執行緒模型是一個單執行緒和用來處理每個固定連線數的N個處理器執行緒組成。這種設計已經在其他地方進行了充分的測試,並且被公認為是簡單和快速的實現。該協議保持相當簡潔的形式,以便將來更多其他型別語言的客戶端實現。

5.3 訊息

訊息由固定大小的head、可變長度的不透明金鑰鍵位元組陣列和可變長度的不透明值位元組陣列組成.訊息頭包含如下的一些欄位:
– CRC32 用以檢測訊息的擷取和損壞
– 格式版本
– 鑑別器的一個屬性
– 時間戳

使鍵和值保持不透明是一個正確的確定:現在序列化包有很大的進展,任何特定的選擇都不適合所有的使用.更不用說,一個特定的應用程式使用卡夫卡可能會指定一個特定的序列化型別作為其使用的一部分。MessageSet介面僅僅只是一個迭代器,用於迭代方法產生的訊息,這個方法對NiO通道進行批量讀取和寫入。

5.4 訊息格式

/**
* 1. 訊息的4位元組CRC32
* 2. 一個位元組的 identifier ,用以格式的變化,變化的值為0 或者1
* 3. 一個位元組的 identifier屬性,允許訊息的註釋與版本無關
* 位 0 ~ 2 : 壓縮編解碼
* 0 : 無壓縮
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : 時間戳型別
* 0 : 建立時間
* 1 : 日誌追加時間
* bit 4 ~ 7 : 保留位
* 4. (可選的) 8位元組時間戳只有當“magic”識別符號大於0
* 5. 4位元組金鑰長度,包含長度k
* 6. K 位元組的 key
* 7. 4位元組有效負載長度,含長度v
* 8. V位元組的有效負載
*/

5.5 日誌

topic名字為”my_topic”的日誌有兩個分割槽,並且包含兩個目錄(也就是my_topic_0my_topic_1),目錄裡面包含的是該topic的訊息資料檔案。日誌檔案的格式是“日誌條目”序列;每個日誌條目是一個4位元組整數n儲存的訊息長度,其次是N訊息位元組.每個訊息唯一標識是一個64位整數的偏移量,該偏移量由傳送到該主題的該分割槽的所有訊息流中的訊息的開始的位元組位置給出。每個訊息的磁碟格式如下。每個日誌檔案以其包含的第一條訊息的偏移量命名。所以第一個建立的檔案將是00000000000.kafka,每個附加檔案將有一個大致S位元組整數的名字,S是指之前在配置中設定的日誌檔案最大值

確切的二進位制訊息格式是版本化的並且是以標準的介面進行維護,因此,當進行容錯時,訊息集可以在生產者,kafka節點,以及客戶端之間進行相互的任意轉換,而不用複製和調節。改格式如下:

磁碟上訊息的格式

offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes

使用訊息偏移作為訊息ID是極其少見的。我們最初的想法是使用一個由生產者自帶的GUID自增長實現,並且儲存從GUID到每一個節點上偏移量的對映。但由於每個消費者必須持有每個server的ID,因此GUID的全域性唯一性將無法提供。更重要的是持有隨機id至分割槽偏移量
的複雜度需要很重的索引結構,而且必須要與磁碟同步,這在本質上就需要一種完全的持久化的隨機存取資料結構。因此,為了簡化查詢結構,我們決定使用一個簡單的針對每個分割槽的原子計數器,它可以用分割槽ID和節點ID唯一標識訊息;這使得查詢結構更簡單,儘管多每個消費者的多個請求仍然是可能的。然而,一旦我們解決了計數器的問題,跳轉至直接使用偏移量似乎變得順理成章了,畢竟對於任意一個分割槽而言,計數器是單調唯一增長的。由於偏移量的實現對於消費者是不可見的,因此最終可以採取一種更為高效的方式具體實現(此處句子讀得不太明白)。

Kafka Log Implementation

寫(Write)

日誌允許序列追加,通常是追加到最後一個檔案,當達到配置的最大值時,該檔案會在另一個新的檔案上繼續追加。日誌檔案有兩個配置引數:m是作業系統將檔案重新整理到磁碟之前,可寫入的訊息的數目,s是指強制多少秒執行一次重新整理操作。這給出了一個永續性保障,在系統崩潰時,至多丟失M條訊息或S秒的資料。

讀(Read)

訊息的讀取是依據訊息在分割槽中的64位邏輯偏移量以及S位元組的最大讀取值完成。這個將返回在S位元組緩衝區中的訊息迭代器。S值的配置應比任何訊息的長度要大,但當訊息的長度異常地大時,訊息的讀取將被重試多次,每一次重試都會擴大緩衝區一倍,直到訊息被成功讀取。訊息的最大值緩衝區的最大值均可設定,如此伺服器便可以拒絕處理比它們要大的訊息,並提供客戶端上要讀取完整訊息所需的最大值的繫結。已分割槽訊息為結尾去讀取緩衝區是有可能的,大小的分割將會被很容易地檢測到。

從偏移量讀取訊息的實際過程需要首先定位儲存資料的日誌段檔案,從全域性偏移量計算檔案特定的偏移量,然後從該檔案偏移量讀取。這是一個簡單的對每個檔案保持在記憶體範圍內變化的二分查詢

該日誌提供了獲取最近寫入訊息的能力,允許客戶端“實時”訂閱。這在消費者未能消費指定天數訊息的場景下尤為有用。在這種情況下,消費者試圖去消費一個由OutOfRangeException引起的不存在的偏移量,要麼採取重置自身的方式,要麼消費失敗並作為消費失敗的案例(有些許問題)

如下是傳送給消費者的結果的格式

MessageSetSend (fetch result)

total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)

total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n

刪除(Delete)

資料刪除是一次刪除一個日誌段。日誌管理器允許可插入的刪除策略來選擇哪些檔案適合刪除。當前的策略是刪除任何超過 N天前修改時間的日誌,雖然保留最後N GB的政策也可以是有用的。為了避免讀取時鎖定,而仍然允許修改段列表的刪除,我們使用一個 copy-on-write樣式分段列表實現,它提供一致的檢視,允許以二分查詢的方式讀取日誌段的不變的靜態快照檢視,且刪除操作也同步進行。

保證(Guarantees)

kafka日誌提供了一個可配置的引數M,該引數可以控制在訊息被強制寫入到磁碟前訊息的最大數量。在kafka服務啟動時,一個日誌恢復執行緒會伴隨著啟動,它迭代所有最新分段日誌的訊息,並且校驗所有訊息項的有效性。如果訊息的大小和偏移量的總和不超過該檔案的長度並且訊息有效負載的CRC32匹配訊息儲存有CRC資訊,那麼該訊息項就是有效的。在檢測到崩潰事件時,日誌將被截斷為最後有效的偏移量。

請注意,兩種型別的崩潰必須處理:由於崩潰導致的寫塊丟失損壞,以及無意義區塊被追加到檔案的損壞。這樣做的原因是,在一般的作業系統不保證在檔案的節點之間的和實際的資料塊的寫入順序,因此除了失去已寫入的資料,假如節點正在更新值,但在資料寫入之前發生崩潰了,那麼該檔案獲取到的都是無意義的資料。CRC就是檢測這樣的邊緣案例,並防止它損壞日誌(當然,未寫入的訊息會丟失)