1. 程式人生 > >差勁的程式設計師操心程式碼,優秀的程式設計師操心資料結構和它們之間的關係! ------ Linus Torvalds, Linux創始人.

差勁的程式設計師操心程式碼,優秀的程式設計師操心資料結構和它們之間的關係! ------ Linus Torvalds, Linux創始人.

Kafka log的儲存解析:

Partition中的每條Message由offset來表示它在這個partition中的偏移量,這個offset不是該Message在partition資料檔案中的實際儲存位置,而是邏輯上一個值,它唯一確定了partition中的一條Message。因此,可以認為offset是partition中Message的id。partition中的每條Message包含了以下三個屬性:

offset

MessageSize

data

其中offset為long型,MessageSize為int32,表示data有多大,data為message的具體內容。

我們來思考一下,如果一個partition只有一個數據檔案會怎麼樣?

1) 新資料是新增在檔案末尾,不論檔案資料檔案有多大,這個操作永遠都是高效的。

2) 查詢某個offset的Message是順序查詢的。因此,如果資料檔案很大的話,查詢的效率就低。

那Kafka是如何解決查詢效率的的問題呢?有兩大法寶:1) 分段 2) 索引。

  • 資料檔案的分段

Kafka解決查詢效率的手段之一是將資料檔案分段,比如有100條Message,它們的offset是從0到99。假設將資料檔案分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的資料檔案裡面,資料檔案以該段中最小的offset命名。這樣在查詢指定offset的Message的時候,用二分查詢就可以定位到該Message

在哪個段中

  • 為資料檔案建索引

資料檔案分段使得可以在一個較小的資料檔案中查詢對應offset的Message了,但是這依然需要順序掃描才能找到對應offset的Message。為了進一步提高查詢的效率,Kafka為每個分段後的資料檔案建立了索引檔案,檔名與資料檔案的名字是一樣的,只是副檔名為.index。

索引檔案中包含若干個索引條目,每個條目表示資料檔案中一條Message的索引。

索引包含兩個部分,分別為相對offset和position

(1)相對offset:因為資料檔案分段以後,每個資料檔案的起始offset不為0,相對offset表示這條Message相對於其所屬資料檔案中最小的offset

的大小。舉例,分段後的一個數據檔案的offset是從20開始,那麼offset為25的Message在index檔案中的相對offset就是25-20 = 5。儲存相對offset可以減小索引檔案佔用的空間。

(2)position,表示該條Message在資料檔案中的絕對位置。只要開啟檔案並移動檔案指標到這個position就可以讀取對應的Message了。

 index檔案中並沒有為資料檔案中的每條Message建立索引,而是採用了稀疏儲存的方式,每隔一定位元組的資料建立一條索引。這樣避免了索引檔案佔用過多的空間,從而可以將索引檔案保留在記憶體中。但缺點是沒有建立索引的Message也不能一次定位到其在資料檔案的位置,從而需要做一次順序掃描,但是這次順序掃描的範圍就很小了。

我們以幾張圖來總結一下Message是如何在Kafka中儲存的,以及如何查詢指定offset的Message的。

Message是按照topic來組織,每個topic可以分成多個的partition,比如:有5個partition的名為為page_visits的topic的目錄結構為:

 

partition是分段的,每個段叫Segment,包括了一個數據檔案和一個索引檔案,下圖是某個partition目錄下的檔案:

 

可以看到,這個partition有4個Segment。

接下來我們就演示一下Kafka是如何查詢資料的:比如,要查詢絕對offset為60395的message.

首先是用二分查詢確定它是在哪個LogSegment中,自然是在第一個Segment中。

offset: 60393 position: 4206936 CreateTime: 1537675707587 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 377199344 keysize: 13 key: �X   numq . payload: �X
   工單:49636 q .
offset: 60394 position: 4207006 CreateTime: 1537675707589 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3955093515 keysize: 13 key: �X   numq . payload: �X
   工單:49639 q .
offset: 60395 position: 4207076 CreateTime: 1537675707590 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3198390071 keysize: 13 key: �X   numq . payload: �X
   工單:49642 q .
offset: 60396 position: 4207146 CreateTime: 1537675707591 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2937420651 keysize: 13 key: �X   numq . payload: �X
   工單:49645 q .
offset: 60397 position: 4207216 CreateTime: 1537675707592 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2539862811 keysize: 13 key: �X   numq . payload: �X
   工單:49648 q .

開啟這個Segment的index檔案,也是用二分查詢找到offset小於或者等於指定offset的索引條目中最大的那個offset

很明顯,絕對offset為60395在index中的相對index也為60395-0=60395(當然,我們希望含有這個index條目).

offset: 60005 position: 4179776
offset: 60064 position: 4183906
offset: 60123 position: 4188036
offset: 60182 position: 4192166
offset: 60242 position: 4196366
offset: 60302 position: 4200566
offset: 60361 position: 4204696
offset: 60420 position: 4208826
offset: 60480 position: 4213026
offset: 60539 position: 4217156

因為不含有60395,我們只能找到60361,所以相對offset為60361的那個索引是我們要找的,相對offset為60361的Message在資料檔案中的絕對位置為4204696。

開啟資料檔案,從位置為4204696的那個地方開始順序掃描直到找到offset為60395的那條Message。

offset: 60358 position: 4204486 CreateTime: 1537675707545 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3421926773 keysize: 13 key: �X   numq . payload: �X
   工單:49531 q .
offset: 60359 position: 4204556 CreateTime: 1537675707546 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1021150352 keysize: 13 key: �X   numq . payload: �X
   工單:49534 q .
offset: 60360 position: 4204626 CreateTime: 1537675707547 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3639334412 keysize: 13 key: �X   numq . payload: �X
   工單:49537 q .
offset: 60361 position: 4204696 CreateTime: 1537675707548 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2882465730 keysize: 13 key: �X   numq . payload: �X
   工單:49540 q .
offset: 60362 position: 4204766 CreateTime: 1537675707550 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1882440651 keysize: 13 key: �X   numq . payload: �X
   工單:49543 q .
offset: 60363 position: 4204836 CreateTime: 1537675707551 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 457532663 keysize: 13 key: �X   numq . payload: �X
   工單:49546 q .
offset: 60364 position: 4204906 CreateTime: 1537675707553 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2955983481 keysize: 13 key: �X   numq . payload: �X
   工單:49549 q .
offset: 60365 position: 4204976 CreateTime: 1537675707553 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 213670093 keysize: 13 key: �X   numq . payload: �X
   工單:49552 q .
offset: 60366 position: 4205046 CreateTime: 1537675707554 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2170026568 keysize: 13 key: �X   numq . payload: �X
   工單:49555 q .
offset: 60367 position: 4205116 CreateTime: 1537675707555 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3662747317 keysize: 13 key: �X   numq . payload: �X
   工單:49558 q .
offset: 60368 position: 4205186 CreateTime: 1537675707557 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 776207168 keysize: 13 key: �X   numq . payload: �X
   工單:49561 q .
offset: 60369 position: 4205256 CreateTime: 1537675707559 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2063475433 keysize: 13 key: �X   numq . payload: �X
   工單:49564 q .
offset: 60370 position: 4205326 CreateTime: 1537675707560 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 4256750840 keysize: 13 key: �X   numq . payload: �X
   工單:49567 q .
offset: 60371 position: 4205396 CreateTime: 1537675707561 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 659749633 keysize: 13 key: �X   numq . payload: �X
   工單:49570 q .
offset: 60372 position: 4205466 CreateTime: 1537675707563 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 4239382280 keysize: 13 key: �X   numq . payload: �X
   工單:49573 q .
offset: 60373 position: 4205536 CreateTime: 1537675707564 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3897527582 keysize: 13 key: �X   numq . payload: �X
   工單:49576 q .
offset: 60374 position: 4205606 CreateTime: 1537675707565 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3381211779 keysize: 13 key: �X   numq . payload: �X
   工單:49579 q .
offset: 60375 position: 4205676 CreateTime: 1537675707566 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1891946450 keysize: 13 key: �X   numq . payload: �X
   工單:49582 q .
offset: 60376 position: 4205746 CreateTime: 1537675707567 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1634876302 keysize: 13 key: �X   numq . payload: �X
   工單:49585 q .
offset: 60377 position: 4205816 CreateTime: 1537675707568 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1623775408 keysize: 13 key: �X   numq . payload: �X
   工單:49588 q .
offset: 60378 position: 4205886 CreateTime: 1537675707569 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 85082408 keysize: 13 key: �X   numq . payload: �X
   工單:49591 q .
offset: 60379 position: 4205956 CreateTime: 1537675707570 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 4063867085 keysize: 13 key: �X   numq . payload: �X
   工單:49594 q .
offset: 60380 position: 4206026 CreateTime: 1537675707570 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3046754845 keysize: 13 key: �X   numq . payload: �X
   工單:49597 q .
offset: 60381 position: 4206096 CreateTime: 1537675707572 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1858642034 keysize: 13 key: �X   numq . payload: �X
   工單:49600 q .
offset: 60382 position: 4206166 CreateTime: 1537675707573 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2331964142 keysize: 13 key: �X   numq . payload: �X
   工單:49603 q .
offset: 60383 position: 4206236 CreateTime: 1537675707574 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2111111947 keysize: 13 key: �X   numq . payload: �X
   工單:49606 q .
offset: 60384 position: 4206306 CreateTime: 1537675707575 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1544797334 keysize: 13 key: �X   numq . payload: �X
   工單:49609 q .
offset: 60385 position: 4206376 CreateTime: 1537675707576 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 544856803 keysize: 13 key: �X   numq . payload: �X
   工單:49612 q .
offset: 60386 position: 4206446 CreateTime: 1537675707577 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 835661503 keysize: 13 key: �X   numq . payload: �X
   工單:49615 q .
offset: 60387 position: 4206516 CreateTime: 1537675707578 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 4136932507 keysize: 13 key: �X   numq . payload: �X
   工單:49618 q .
offset: 60388 position: 4206586 CreateTime: 1537675707580 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 42367342 keysize: 13 key: �X   numq . payload: �X
   工單:49621 q .
offset: 60389 position: 4206656 CreateTime: 1537675707581 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 1777393234 keysize: 13 key: �X   numq . payload: �X
   工單:49624 q .
offset: 60390 position: 4206726 CreateTime: 1537675707583 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2987258459 keysize: 13 key: �X   numq . payload: �X
   工單:49627 q .
offset: 60391 position: 4206796 CreateTime: 1537675707584 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2800751045 keysize: 13 key: �X   numq . payload: �X
   工單:49630 q .
offset: 60392 position: 4206866 CreateTime: 1537675707586 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2098019788 keysize: 13 key: �X   numq . payload: �X
   工單:49633 q .
offset: 60393 position: 4206936 CreateTime: 1537675707587 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 377199344 keysize: 13 key: �X   numq . payload: �X
   工單:49636 q .
offset: 60394 position: 4207006 CreateTime: 1537675707589 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3955093515 keysize: 13 key: �X   numq . payload: �X
   工單:49639 q .
offset: 60395 position: 4207076 CreateTime: 1537675707590 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 3198390071 keysize: 13 key: �X   numq . payload: �X
   工單:49642 q .
offset: 60396 position: 4207146 CreateTime: 1537675707591 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2937420651 keysize: 13 key: �X   numq . payload: �X
   工單:49645 q .
offset: 60397 position: 4207216 CreateTime: 1537675707592 isvalid: true payloadsize: 23 magic: 1 compresscodec: NoCompressionCodec crc: 2539862811 keysize: 13 key: �X   numq . payload: �X
   工單:49648 q .

這套機制是建立在offset是有序的。索引檔案被對映到記憶體中,所以查詢的速度還是很快的。

一句話,Kafka的Message儲存採用了分割槽(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性。