Druid高效架構
我們知道Druid能夠同時提供對大資料集的實時攝入和高效複雜查詢的效能,主要原因就是它獨到的架構設計和基於Datasource與Segment的資料儲存結構。接下來我們會分別從資料儲存和系統節點架構兩方面來深入瞭解一下Druid的架構。
資料儲存
Druid將資料組織成Read-Optimized的結構,而這也是Druid能夠支援互動式查詢的關鍵。Druid中的資料儲存在被稱為datasource中,類似RDMS中的table。每個datasource按照時間劃分,如果你有需求也可以進一步按其它屬性劃分。每個時間範圍稱為一個chunk(比如你按天分割槽,則一個chunk為一天)。在chunk中資料由被分為一個或多個segment(segment是資料實際儲存結構,Datasource、Chunk只是一個邏輯概念),每個segment都是一個單獨的檔案,通常包含幾百萬行資料,這些segment是按照時間組織成的chunk,所以在按照時間查詢資料時,效率非常高。

Datasource
資料分割槽
任何分散式儲存/計算系統,都需要對資料進行合理的分割槽,從而實現儲存和計算的均衡,以及資料並行化。而Druid本身處理的是事件資料,每條資料都會帶有一個時間戳,所以很自然的就可以使用時間進行分割槽。比如上圖,我們指定了分割槽粒度為為天,那麼每天的資料都會被單獨儲存和查詢(一個分割槽下有多個Segment的原因往下看)。
使用時間分割槽我們很容易會想到一個問題,就是很可能每個時間段的資料量是不均衡的(想一想我們的業務場景),而Duid為了解決這種問題,提供了“二級分割槽”,每一個二級分割槽稱為一個Shard(這才是物理分割槽)。通過設定每個Shard的所能儲存的目標值和Shard策略,來完成shard的分割槽。Druid目前支援兩種Shard策略:Hash(基於維值的Hash)和Range(基於某個維度的取值範圍)。上圖中,2000-01-01和2000-01-03的每個分割槽都是一個Shard,由於2000-01-02的資料量比較多,所以有兩個Shard。
Segment
Shard經過持久化之後就稱為了Segment,Segment是資料儲存、複製、均衡(Historical的負載均衡)和計算的基本單元了。Segment具有不可變性,一個Segment一旦建立完成後(MiddleManager節點發布後)就無法被修改,只能通過生成一個新的Segment來代替舊版本的Segment。
Segment內部儲存結構
接下來我們可以看下Segment檔案的內部儲存結構。因為Druid採用列式儲存,所以每列資料都是在獨立的結構中儲存(並不是獨立的檔案,是獨立的資料結構,因為所有列都會儲存在一個檔案中)。Segment中的資料型別主要分為三種:時間戳、維度列和指標列。

Segment資料列
對於時間戳列和指標列,實際儲存是一個數組,Druid採用LZ4壓縮每列的整數或浮點數。當收到查詢請求後,會拉出所需的行資料(對於不需要的列不會拉出來),並且對其進行解壓縮。解壓縮完之後,在應用具體的聚合函式。
對於維度列不會像指標列和時間戳這麼簡單,因為它需要支援filter和group by,所以Druid使用了字典編碼(Dictionary Encoding)和點陣圖索引(Bitmap Index)來儲存每個維度列。每個維度列需要三個資料結構:
- 需要一個字典資料結構,將維值(維度列值都會被認為是字串型別)對映成一個整數ID。
- 使用上面的字典編碼,將該列所有維值放在一個列表中。
- 對於列中不同的值,使用bitmap資料結構標識哪些行包含這些值。
Druid針對維度列之所以使用這三個資料結構,是因為:
- 使用字典將字串對映成整數ID,可以緊湊的表示結構2和結構3中的值。
- 使用Bitmap點陣圖索引可以執行快速過濾操作(找到符合條件的行號,以減少讀取的資料量),因為Bitmap可以快速執行AND和OR操作。
- 對於group by和TopN操作需要使用結構2中的列值列表。
我們以上面"Page"維度列為例,可以具體看下Druid是如何使用這三種資料結構儲存維度列:
1. 使用字典將列值對映為整數 { "Justin Bieher":0, "ke$ha":1 } 2. 使用1中的編碼,將列值放到一個列表中 [0,0,1,1] 3. 使用bitmap來標識不同列值 value = 0: [1,1,0,0] //1代表該行含有該值,0標識不含有 value = 1: [0,0,1,1]
下圖是以advertiser列為例,描述了advertiser列的實際儲存結構:

advertiser列值儲存
前兩種儲存結構在最壞情況下會根據資料量增長而成線性增長(列資料中的每行都不相同),而第三種由於使用Bitmap儲存(本身是一個稀疏矩陣),所以對它進行壓縮,可以得到非常客觀的壓縮比。Druid而且運用了Roaring Bitmap( ofollow,noindex">http://roaringbitmap.org/ )能夠對壓縮後的點陣圖直接進行布林運算,可以大大提高查詢效率和儲存效率(不需要解壓縮)。
Segment命名
高效的資料查詢,不僅僅體現在檔案內容的儲存結構上,還有一點很重要,就是檔案的命名上。試想一下,如果一個Datasource下有幾百萬個Segment檔案,我們又如何快速找出我們所需要的檔案呢?答案就是通過檔名稱快速索引查詢。
Segment的命名包含四部分:資料來源(Datasource)、時間間隔(包含開始時間和結束時間兩部分)、版本號和分割槽(Segment有分片的情況下才會有)。
test-datasource_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T16:00:00.000Z_1 資料來源名稱_開始時間_結束時間_版本號_分割槽
分片號是從0開始,如果分割槽號為0,則可以省略:test-datasource_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T16:00:00.000Z
還需要注意如果一個時間間隔segment由多個分片組成,則在查詢該segment的時候,需要等到所有分片都被載入完成後,才能夠查詢(除非使用線性分片規範(linear shard spec),允許在未載入完成時查詢)。
欄位 | 是否必須 | 描述 |
---|---|---|
datasource | 是 | segment所在的Datasource |
開始時間 | 是 | 該Segment所儲存最早的資料,時間格式是ISO 8601。開始時間和結束時間是通過segmentGranularity設定的時間間隔 |
結束時間 | 是 | 該segment所儲存最晚的資料,時間格式是ISO 8601 |
版本號 | 是 | 因為Druid支援批量覆蓋操作,當批量攝入與之前相同資料來源、相同時間間隔資料時,資料就會被覆蓋,這時候版本號就會被更新。Druid系統的其它部分感知到這個訊號後,就會把就舊資料刪除,使用新版本的資料(這個切換很快)。版本號也是是用的ISO 8601時間戳,但是這個時間戳代表首次啟動的時間 |
分割槽號 | 否 | segment如果採用分割槽,才會有該標識 |
Segment物理儲存例項
下面我們以一個例項來看下Segment到底以什麼形式儲存的,我們以本地匯入方式將下面資料匯入到Druid中。
{"time": "2018-11-01T00:47:29.913Z","city": "beijing","sex": "man","gmv": 20000} {"time": "2018-11-01T00:47:33.004Z","city": "beijing","sex": "woman","gmv": 50000} {"time": "2018-11-01T00:50:33.004Z","city": "shanghai","sex": "man","gmv": 10000}
我們以單機形式執行Druid,這樣Druid生成的Segment檔案都在${DRUID_HOME}/var/druid/segments 目錄下。

Segment目錄
segment通過datasource_beginTime_endTime_version_shard用於唯一標識,在實際儲存中是以目錄的形式表現的。

Segment目錄
可以看到Segment中包含了Segment描述檔案(descriptor.json)和壓縮後的索引資料檔案(index.zip),我們主要看的也是index.zip這個檔案,對其進行解壓縮。

Segemnt資料檔案
首先看下factory.json這個檔案,這個檔案並不是segment具體儲存段資料的檔案。因為Druid通過使用MMap(一種記憶體對映檔案的方式)的方式訪問Segment檔案,通過檢視這個檔案內容來看,貌似是用於MMap讀取檔案所使用的(不太瞭解MMap)?
#factory.json檔案內容 {"type":"mMapSegmentFactory"}
Druid實際儲存Segment資料檔案是:version.bin、meta.smoosh和xxxxx.smoosh這三個檔案,下面分別看下這三個檔案的內容。
version.bin是一個儲存了4個位元組的二進位制檔案,它是Segment內部版本號(隨著Druid發展,Segment的格式也在發展),目前是V9,以Sublime開啟該檔案可以看到:
meta.smoosh裡面儲存了關於其它smoosh檔案(xxxxx.smoosh)的元資料,裡面記錄了每一列對應檔案和在檔案的偏移量。除了列資訊外,smoosh檔案還包含了index.drd和metadata.drd,這部分是關於Segment的一些額外元資料資訊。
#版本號,該檔案所能儲存的最大值(2G),smooth檔案數 v1,2147483647,1 # 列名,檔名,起始偏移量,結束偏移量 __time,0,0,154 city,0,306,577 gmv,0,154,306 index.drd,0,841,956 metadata.drd,0,956,1175 sex,0,577,841
再看00000.smoosh檔案前,我們先想一下為什麼這個檔案被命名為這種樣式?因為Druid為了最小化減少開啟檔案的控制代碼數,它會將一個Segment的所有列資料都儲存在一個smoosh檔案中,也就是xxxxx.smoosh這個檔案。但是由於Druid使用MMap來讀取Segment檔案,而MMap需要保證每個檔案大小不能超過2G(Java中的MMapByteBuffer限制),所以當一個smoosh檔案大於2G時,Druid會將新資料寫入到下一個smoosh檔案中。這也就是為什麼這些檔案命名是這樣的,這裡也對應上了meta檔案中為什麼還要標識列所在的檔名。
通過meta.smoosh的偏移量也能看出,00000.smoosh檔案中資料是按列進行儲存的,從上到下分別儲存的是時間列、指標列、維度列。對於每列主要包會含兩部分資訊:ColumnDescriptor和binary資料。columnDescriptor是一個使用Jackson序列化的物件,它包含了該列的一些元資料資訊,比如資料型別、是否是多值等。而binary則是根據不同資料型別進行壓縮儲存的二進位制資料。
^@^@^@d{"valueType":"LONG","hasMultipleValues":false,"parts":[{"type":"long","byteOrder":"LITTLE_ENDIAN"}]}^B^@^@^@^C^@^@ ^@^A^A^@^@^@^@"^@^@^@^A^@^@^@^Z^@^@^@^@¢yL½Ìf^A^@^@<8c>X^H^@<80>¬^WÀÌf^A^@^@^@^@^@d{"valueType":"LONG","hasMultipleValues":false,"parts":[{"type":"long","byteOrder":"LITTLE_ENDIAN"}]}^B^@^@^@^C^@^@ ^@^A^A^@^@^@^@ ^@^@^@^A^@^@^@^X^@^@^@^@1 N^@^A^@"PÃ^H^@<80>^P'^@^@^@^@^@^@^@^@^@<9a>{"valueType":"STRING","hasMultipleValues":false,"parts":[{"type":"stringDictionary","bitmapSerdeFactory":{"type":"concise"},"byteOrder":"LITTLE_ENDIAN"}]}^B^@^@^@^@^A^A^@^@^@#^@^@^@^B^@^@^@^K^@^@^@^W^@^@^@^@beijing^@^@^@^@shanghai^B^A^@^@^@^C^@^A^@^@^A^A^@^@^@^@^P^@^@^@^A^@^@^@^H^@^@^@^@0^@^@^A^A^@^@^@^@^\^@^@^@^B^@^@^@^H^@^@^@^P^@^@^@^@<80>^@^@^C^@^@^@^@<80>^@^@^D^@^@^@<9a>{"valueType":"STRING","hasMultipleValues":false,"parts":[{"type":"stringDictionary","bitmapSerdeFactory":{"type":"concise"},"byteOrder":"LITTLE_ENDIAN"}]}^B^@^@^@^@^A^A^@^@^@^\^@^@^@^B^@^@^@^G^@^@^@^P^@^@^@^@man^@^@^@^@woman^B^A^@^@^@^C^@^A^@^@^A^A^@^@^@^@^P^@^@^@^A^@^@^@^H^@^@^@^@0^@^A^@^A^@^@^@^@^\^@^@^@^B^@^@^@^H^@^@^@^P^@^@^@^@<80>^@^@^E^@^@^@^@<80>^@^@^B^A^@^@^@^@&^@^@^@^C^@^@^@^G^@^@^@^O^@^@^@^V^@^@^@^@gmv^@^@^@^@city^@^@^@^@sex^A^A^@^@^@^[^@^@^@^B^@^@^@^H^@^@^@^O^@^@^@^@city^@^@^@^@sex^@^@^AfÌ<91>Ð^@^@^@^AfѸ,^@^@^@^@^R{"type":"concise"}{"container":{},"aggregators":[{"type":"longSum","name":"gmv","fieldName":"gmv","expression":null}],"timestampSpec":{"column":"time","format":"auto","missingValue":null},"queryGranularity":{"type":"none"},"rollup":true}
smooth檔案中的binary資料經過LZ4或Bitmap壓縮,所以無法看到資料原始內容。
在smooth檔案最後還包含了兩部分資料,分別是index.drd和metadata.drd。其中index.drd中包含了Segment中包含哪些度量、維度、時間範圍、以及使用哪種bitmap。metadata.drd中儲存了指標聚合函式、查詢粒度、時間戳配置等(上面內容的最後部分)。
下圖是物理儲存結構圖,儲存未壓縮和編碼的資料就是最右邊的內容。

Segment物理儲存
Segment建立
Segment都是在MiddleManager節點中建立的,並且處在MiddleManager中的Segment在狀態上都是可變的並且未提交的(提交到DeepStorage之後,資料就不可改變)。
Segment從在MiddleManager中建立到傳播到Historical中,會經歷以下幾個步驟:
- MiddleManager中建立Segment檔案,並將其釋出到Deep Storage。
- Segment相關的元資料資訊被儲存到MetaStore中。
- Coordinator程序根據MetaStore中得知Segment相關的元資料資訊後,根據規則的設定分配給複合條件的Historical節點。
- Historical節點得到Coordinator指令後,自動從DeepStorage中拉取Segment資料檔案,並通過Zookeeper向叢集宣告負責提供該Segment資料相關的查詢服務。
- MiddleManager在得知Historical負責該Segment後,會丟棄該Segment檔案,並向叢集宣告不在負責該Segment相關的查詢。
如何配置分割槽
可以通過granularitySpec中的segmentGranularity設定segment的時間間隔( http://druid.io/docs/latest/ingestion/ingestion-spec.html#granularityspec )。為了保證Druid的查詢效率,每個Segment檔案的大小建議在300MB~700MB之間。如果超過這個範圍,可以修改時間間隔或者使用分割槽來進行優化(配置partitioningSpec中的targetPartitionSize,官方建議設定500萬行以上; http://druid.io/docs/latest/ingestion/hadoop.html#partitioning-specification )。
系統架構詳解
我們知道Druid節點型別有五種:Overload、MiddleManager、Coordinator、Historical和Broker。

Druid架構
Overload和MiddleManager主要負責資料攝入(對於沒有釋出的Segment,MiddleManager也提供查詢服務);Coordinator和Historical主要負責歷史資料的查詢;Broker節點主要負責接收Client查詢請求,拆分子查詢給MiddleManager和Historical節點,然後合併查詢結果返回給Client。其中Overload是MiddleManager的master節點,Coordinator是Historical的master節點。
索引服務
Druid提供一組支援索引服務(Indexing Service)的元件,也就是Overload和MiddleManager節點。索引服務是一種高可用的分散式服務,用於執行跟索引相關的任務,索引服務是資料攝入建立和銷燬Segment的主要方式(還有一種是採用實時節點的方式,但是現在已經廢棄了)。索引服務支援以pull或push的方式攝入外部資料。
索引服務採用的是主從架構,Overload為主節點,MiddleManager是從節點。索引服務架構圖如下圖所示:

索引服務
索引服務由三部分元件組成:用於執行任務的Peon(勞工)元件、用於管理Peon的MiddleManager元件和分配任務給MiddleManager的Overload元件。MiddleManager和Overload元件可以部署在相同節點也可以跨節點部署,但是Peon和MiddleManager是部署在同一個節點上的。
索引服務架構和Yarn的架構很像:
- Overlaod節點相當於Yarn的ResourceManager,負責叢集資源管理和任務分配。
- MiddleManager節點相當於Yarn的NodeManager,負責接受任務和管理本節點的資源。
- Peon節點相當於Yarn的Container,執行節點上具體的任務。
Overload節點
Overload作為索引服務的主節點,對外負責接受索引任務,對內負責將任務分解並下發給MiddleManager。Overload有兩種執行模式:
- 本地模式(Local Mode):預設模式。本地模式下的Overload不僅負責任務協調工作,還會負責啟動一些peon來完成具體的任務。
- 遠端模式(Remote Mode):該模式下,Overload和MiddleManager執行在不同的節點上,它僅負責任務的協調工作,不負責完成具體的任務。
Overload提供了一個UI客戶端,可以用於檢視任務、執行任務和終止任務等。
http://<OVERLORD_IP>:<port>/console.html
Overload提供了RESETful的訪問形式,所以客戶端可以通過HTTP POST形式向請求節點提交任務。
http://<OVERLORD_IP>:<port>/druid/indexer/v1/task //提交任務 http://<OVERLORD_IP>:<port>/druid/indexer/v1/task/{task_id}/shutdown //殺死任務
MiddleManager節點
MiddleManager是執行任務的工作節點,MiddleManager會將任務單獨發給每個單獨JVM執行的Peon(因為要把資源和日誌進行隔離),每個Peon一次只能執行一個任務。
Peon節點
Peon在單個JVM中執行單個任務,MiddleManager負責為任務建立Peon。
Coordinator節點
Coordinator是Historical的mater節點,它主要負責管理和分發Segment。具體工作就是:告知Historical載入或刪除Segment、管理Segment副本以及負載Segment在Historical上的均衡。
Coordinator是定期執行的,並且執行間隔可以通過配置引數配置。每次Coordinator執行都會通過Zookeeper獲取當前叢集狀態,通過評估叢集狀態來採取適當的操作(比如均衡負載Segment)。Coordinator會連線資料庫(MetaStore),資料庫中儲存了Segment資訊和規則(Rule)。Segment表中列出了需要載入到叢集中的所有Segment,Coordinator每次執行都會從Segment表來拉取Segment列表並與當前叢集的Segment對比,如果發現數據庫中不存在的Segment,但是在叢集中還有,就會把它從叢集刪掉;規則表定義瞭如何處理Segment,規則的作用就是我們可以通過配置一組規則,來操作叢集載入Segment或刪除Segment。關於如何配置規則,可以檢視: http://druid.io/docs/latest/operations/rule-configuration.html 。
Historical節點載入Segment前,會進行容量排序,哪個Historical節點的Segment最少,則它就具有最高的載入權。Coordinator不會直接Historical節點通訊,而是將Segment資訊放到一個佇列中,Historical節點去佇列取Segment描述資訊,並且載入該Segment到本節點。
Coordinator提供了一UI介面,用於顯示叢集資訊和規則配置:
http://<COORDINATOR_IP>:<COORDINATOR_PORT>
Historical節點
Historical節點負責管理歷史Segment,Historical節點通過Zookeeper監聽指定的路徑來發現是否有新的Segment需要載入(Coordinator通過分配演算法指定具體的Historical)。
上面通過Coordinator知道,當有新的Segment需要載入的時候,Coordinator會將其放到一個佇列中。當Historical節點收到有新的Segment時候,就會檢測本地cache和磁碟,檢視是否有該Segment資訊。如果沒有Historical節點會從Zookeeper中拉取該Segment相關的資訊,然後進行下載。

Historical載入Segment
Broker
Broker節點是負責轉發Client查詢請求的,Broker通過zookeeper能夠知道哪個Segment在哪些節點上,Broker會將查詢轉發給相應節點。所有節點返回資料後,Broker會將所有節點的資料進行合併,然後返回給Client。
Broker會有一個LRU(快取記憶體失效策略),來快取每Segment的結果。這個快取可以是本地快取,也可以藉助外部快取系統(比如memcached),第三方快取可以在所有broker中共享Segment結果。當Borker接收到查詢請求後,會首先檢視本地是否有對應的查詢資料,對於不存在的Segment資料,會將請求轉發給Historical節點。

broker查詢
Broker不會快取實時資料,因為實時資料處於不可靠狀態。