1. 程式人生 > >Kafka實現篇之訊息和日誌

Kafka實現篇之訊息和日誌

http://blog.csdn.net/honglei915/article/details/37760631

訊息格式


日誌

一個叫做“my_topic”且有兩個分割槽的的topic,它的日誌有兩個資料夾組成,my_topic_0和my_topic_1,每個資料夾裡放著具體的資料檔案,每個資料檔案都是一系列的日誌實體,每個日誌實體有一個4個位元組的整數N標註訊息的長度,後邊跟著N個位元組的訊息。每個訊息都可以由一個64位的整數offset標註,offset標註了這條訊息在傳送到這個分割槽的訊息流中的起始位置。每個日誌檔案的名稱都是這個檔案第一條日誌的offset.所以第一個日誌檔案的名字就是00000000000.kafka.所以每相鄰的兩個檔名字的差就是一個數字S,S差不多就是配置檔案中指定的日誌檔案的最大容量。

訊息的格式都由一個統一的介面維護,所以訊息可以在producer,broker和consumer之間無縫的傳遞。儲存在硬碟上的訊息格式如下所示:

  1. 訊息長度:     4 bytes (value: 1+4+n)   
  2. 版本號:       1 byte  
  3. CRC校驗碼:    4 bytes  
  4. 具體的訊息:   n bytes  


寫操作

訊息被不斷的追加到最後一個日誌的末尾,當日志的大小達到一個指定的值時就會產生一個新的檔案。對於寫操作有兩個引數,一個規定了訊息的數量達到這個值時必須將資料重新整理到硬碟上,另外一個規定了重新整理到硬碟的時間間隔,這對資料的永續性是個保證,在系統崩潰的時候只會丟失一定數量的訊息或者一個時間段的訊息。

讀操作

讀操作需要兩個引數:一個64位的offset和一個S位元組的最大讀取量。S通常比單個訊息的大小要大,但在一些個別訊息比較大的情況下,S會小於單個訊息的大小。這種情況下讀操作會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的訊息。可以配置單個訊息的最大值,這樣伺服器就會拒絕大小超過這個值的訊息。也可以給客戶端指定一個嘗試讀取的最大上限,避免為了讀到一個完整的訊息而無限次的重試。

在實際執行讀取操縱時,首先需要定位資料所在的日誌檔案,然後根據offset計算出在這個日誌中的offset(前面的的offset是整個分割槽的offset),然後在這個offset的位置進行讀取。定位操作是由二分查詢法完成的,Kafka在記憶體中為每個檔案維護了offset的範圍。

下面是傳送給consumer的結果的格式:

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n

刪除

日誌管理器允許定製刪除策略。目前的策略是刪除修改時間在N天之前的日誌(按時間刪除),也可以使用另外一個策略:保留最後的N GB資料的策略(按大小刪除)。為了避免在刪除時阻塞讀操作,採用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查詢功能實際是在一個靜態的快照副本上進行的,這類似於Java的CopyOnWriteArrayList。

可靠性保證

日誌檔案有一個可配置的引數M,快取超過這個數量的訊息將被強行重新整理到硬碟。一個日誌矯正執行緒將迴圈檢查最新的日誌檔案中的訊息確認每個訊息都是合法的。合法的標準為:所有檔案的大小的和最大的offset小於日誌檔案的大小,並且訊息的CRC32校驗碼與儲存在訊息實體中的校驗碼一致。如果在某個offset發現不合法的訊息,從這個offset到下一個合法的offset之間的內容將被移除。

有兩種情況必須考慮:1,當發生崩潰時有些資料塊未能寫入。2,寫入了一些空白資料塊。第二種情況的原因是,對於每個檔案,作業系統都有一個inode(inode是指在許多“類Unix檔案系統”中的一種資料結構。每個inode儲存了檔案系統中的一個檔案系統物件,包括檔案、目錄、大小、裝置檔案、socket、管道, 等等),但無法保證更新inode和寫入資料的順序,當inode儲存的大小資訊被更新了,但寫入資料時發生了崩潰,就產生了空白資料塊。CRC校驗碼可以檢查這些塊並移除,當然因為崩潰而未寫入的資料塊也就丟失了。

相關推薦

Kafka實現訊息日誌

http://blog.csdn.net/honglei915/article/details/37760631訊息格式日誌一個叫做“my_topic”且有兩個分割槽的的topic,它的日誌有兩個資料夾組成,my_topic_0和my_topic_1,每個資料夾裡放著具體的資

漫遊Kafka實現訊息日誌

訊息格式 日誌 一個叫做“my_topic”且有兩個分割槽的的topic,它的日誌有兩個資料夾組成,my_topic_0和my_topic_1,每個資料夾裡放著具體的資料檔案,每個資料檔案都是一系列的日誌實體,每個日誌實體有一個4個位元組的整數N標註訊息的長度,後邊

kafka系列-入門訊息offset儲存

前言 Kafka具有儲存功能,預設儲存資料時間為7天或者大小1G,也就是說kafka broker上的資料超7天或者1G,就會被清理掉。這些資料存放在broker伺服器上,以log檔案的形式存在。 準備工作 topic 我添加了一個topic名字為dem

SV元件實現六:比較器參考模型

本文轉自:http://www.eetop.cn/blog/html/28/1561828-2316831.html 在同之前的verifier梅、尤、董完成了slave、arbiter和registers的模組驗證之後,我們需要看看最後一位verifier婁是如何完成arbiter驗證的。Ve

Python網絡編程selectepoll

unix cat 必須 inpu 結束 新的 eno {} 提升 1. select 原理 在多路復?的模型中, ?較常?的有select模型和epoll模型。 這兩個都是系統接?, 由操作系統提供。 當然, Python的select模塊進?了更?級的封裝。 ?絡通信被U

iOS總結-多執行緒NSOperationNSOperationQueue

參考:https://www.jianshu.com/p/4b1d77054b35 NSOperation/NSOperationQueue是基於GCD更高一層的封裝,完全面向物件。 優點:1.可新增完成的程式碼塊,在操作完成後執行        

SV元件實現七(終):測試環境的報告規範

本文轉自:http://www.eetop.cn/blog/html/28/1561828-2316832.html 文章結構: 通過一種標準化的方式列印資訊 過濾(重要級別)資訊 列印通道 在之前的介紹中,讀者從四位verifier的驗證元件實現中懂得了通過類和封

OC學習--- property synthesize的使用

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

SV元件實現四:監測器的取樣

當verifier梅在實現slave channel驗證環境的時候,她也繪製了一幅slave channel驗證結構圖: verifier梅借鑑了verifier董在驗證模組registers時的方法,為DUT slave channel建立了2個stimulato

資料結構學習佇列

棧和佇列是什麼 棧和佇列是兩種特殊的線性表,它們是限定只能在表的一端或兩端進行插入、刪除元素的線性表,因此,統稱為限定性資料結構。 共同點:   都是隻能線上性表的端點插入和刪除。不同點:  棧的插入和刪除都線上性表的同一個端點,該點通稱棧頂,相應地,不能插入刪除的另一個端點通稱棧底,其特性是後進先出。

C#基礎C# .Net框架的概念執行原理

 引言:要使用一樣東西,最起碼得對他有個最基本的瞭解,所以,熟悉一下概念和流程還是非常有必要的,下面一起來看看吧!一、微軟平臺的發展史二、.Net框架包含的東西1、名詞解釋  BCL:基類庫(Base Class Library)系統和底層提供的最基本的類庫  CLR:公共語

Leetcode題解中級陣列字串(8)奇偶連結串列

題目:https://leetcode-cn.com/explore/interview/card/top-interview-questions-medium/31/linked-list/83/ 題目描述: 奇偶連結串列 給定一個單鏈表,把所有的奇數節點和偶數節點分別排在一起。請注意

Leetcode題解中級陣列字串(6)遞增的三元子序列

題目:https://leetcode-cn.com/explore/interview/card/top-interview-questions-medium/29/array-and-strings/80/ 題目描述: 給定一個未排序的陣列,判斷這個陣列中是否存在長度為 3 的遞增子序列

Leetcode題解中級陣列字串(5)最長迴文子串

題目:https://leetcode-cn.com/explore/interview/card/top-interview-questions-medium/29/array-and-strings/79/ 題目描述: 給定一個字串 s,找到 s 中最長的迴

Leetcode題解中級陣列字串(4)無重複字元的最長子串

題目:https://leetcode-cn.com/explore/interview/card/top-interview-questions-medium/29/array-and-strings/78/ 題目描述: 給定一個字串,請你找出其中不含有重複字元的 最長子串&nb

spirngboot 配置kafka實現group訂閱訊息

本人所使用的kafka版本是kafka_2.11-2.0.1版本,jdk1.8、zookeeper-3.3.6,kafka運行於JVM環境,依賴zookeeper註冊中心,所以需要準備這三個軟體,本文測試使用的是windows版本的kafka和zookeeper便於測試 環境搭建 步驟 1、

Android面試ArrayListLinkedList的區別

● 資料結構 ArrayList基於動態陣列;LinkedList基於連結串列 ● 隨機訪問 ArrayList優於LinkedList,因為LinkedList要移動指標來查詢,下面以get方法為例

漫遊Kafka入門簡單介紹

釋出訊息通常有兩種模式:佇列模式(queuing)和釋出-訂閱模式(publish-subscribe)。佇列模式中,consumers可以同時從服務端讀取訊息,每個訊息只被其中一個consumer讀到;釋出-訂閱模式中訊息被廣播到所有的consumer中。 Consumers可以加入一個consumer

漫遊Kafka實戰客戶端程式設計例項

Kafka Producer APIs 新版的Producer API提供了以下功能:可以將多個訊息快取到本地佇列裡,然後非同步的批量傳送到broker,可以通過引數producer.type=async做到。快取的大小可以通過一些引數指定:queue.time和

Python爬蟲番外CookieSession

出現 alt 登錄 這也 nan b站 程序員 分別是 註釋 原文地址https://i.cnblogs.com/EditPosts.aspx?opt=1 關於cookie和session估計很多程序員面試的時候都會被問到,這兩個概念在寫web以及爬蟲中都會涉及,並且兩者可