1. 程式人生 > >分享一個CQRS/ES架構中基於寫檔案的EventStore的設計思路

分享一個CQRS/ES架構中基於寫檔案的EventStore的設計思路

什麼是EventStore

關於什麼是EventStore,如果還不清楚的朋友可以去了解下CQRS/Event Sourcing這種架構,我部落格中也有大量介紹。EventStore是在Event Sourcing(下面簡稱ES)模式中,用於儲存事件用的。從DDD的角度來說,每個聚合根在自己的狀態發生變化時都會產生一個或多個領域事件,我們需要把這些事件持久化起來。然後當我們需要恢復聚合根的最新狀態到記憶體時,可以通過ES這種技術,從EventStore獲取該聚合根的所有事件,然後重演這些事件,就能將該聚合根恢復到最新狀態了。這種技術和MySQL的Redo日誌以及Redis的AOF日誌或者leveldb的WAL日誌的原理是類似的。但是區別是,redo/AOF/WAL日誌是Command Sourcing,而我們這裡說的是Event Sourcing。關於這兩個概念的區別,我不多展開了,有興趣的朋友可以去了解下。

為什麼要寫一個EventStore

目前ENode使用的EventStore,是基於關係型資料庫SqlServer的。雖然功能上完全滿足要求,但是效能上和資料容量上,離我的預期還有一些距離。比如:

  1. 關於效能,雖然可以通過SqlBulkCopy方法,實現較大的寫入吞吐,但是我對EventStore的要求是,需要支援兩個唯一索引:1)聚合根ID+事件版本號唯一;2)聚合根ID+命令ID唯一;當新增這兩個唯一索引後,會很大影響SqlBulkCopy寫入資料的效能;而且SqlBulkCopy只有SqlServer才有,其他資料庫如MySQL沒有,這樣也無形之中限制了ENode的使用場景;
  2. 關於使用場景,DB是基於SQL的,他不是簡單的幫我們儲存資料,每次寫入資料都要解析SQL,執行SQL,寫入RedoLOG,等;另外,DB還要支援修改資料、通過SQL查詢資料等場景。所以,這就要求DB內部在設計儲存結構時,要兼顧各種場景。而我們現在要實現的EventStore,針對的場景比較簡單:1)追求高吞吐的寫入,沒有修改和刪除;2)查詢非常少,不需要支援複雜的關係型查詢,只需要能支援查詢某個聚合根的所有事件即可;所以,針對這種特定的使用場景,如果有針對性的實現一個EventStore,我相信效能上可以有更大的提升空間;
  3. 關於資料量,一個EventStore可能需要儲存大量的事件,百億或千億級別。如果採用DB,那我們只能進行分庫分表,因為單表能儲存的記錄數是有限的,比如1000W,超過這個數量,對寫入效能也會有一定的影響。假設我們現在要儲存100億事件記錄,單表儲存1000W,那就需要1000個表,如果單個物理庫中分100個表,那就需要10個物理庫;如果將來資料量再增加,則需要進一步擴容,那就需要牽涉到資料庫的資料遷移(全量同步、增量同步)這種麻煩的事情。而如果是基於檔案版本的EventStore,由於沒有表的概念了,所以單機只要硬碟夠大,就能儲存非常多的資料。並且,最重要的,效能不會因為資料量的增加而下降。當然,EventStore也同樣需要支援擴容,但是由於EventStore中的資料只會Append寫入,不會修改,也不會刪除,所以擴容方案相對於DB來說,要容易做很多。
  4. 那為何不使用NoSQL?NoSQL一般都是為大資料、可伸縮、高效能而設計的。因為通常NoSQL不支援上面第一點中所說的二級索引,當然一些文件型資料庫如MongoDB是支援的,但是對我來說是一個黑盒,我無法駕馭,也沒有使用經驗,所以沒有考慮。
  5. 從長遠來看,如果能夠自己根據自己的場景實現一個有針對性的EventStore,那未來如果出現效能瓶頸的問題,自己就有足夠的能力去解決。另外,對自己的技術能力的提高也是一個很大的鍛鍊機會。而且這個做好了,說不定又是自己的一個很好的作品,呵呵。所以,為何不嘗試一下呢?

EventStore的設計目標

  • 要求高效能順序寫入事件;
  • 要求嚴格判斷聚合根的事件是否按版本號順序遞增寫入;
  • 支援命令ID的唯一性判斷;
  • 支援大量事件的儲存;
  • 支援按照聚合根ID查詢該聚合根的所有事件;
  • 支援動態擴容;
  • 高可用(HA),需要支援叢集和主備,二期再做;

EventStore核心問題描述、問題分析、設計思路

核心問題描述

一個EventStore需要解決的核心問題有兩點:1)持久化事件;2)持久化事件之前判斷事件版本號是否合法、事件對應的命令是否重複。一個事件包含的資訊如下:

  • 聚合根ID
  • 事件版本號
  • 命令ID
  • 事件內容
  • 事件發生時間

為什麼是這些資訊?

本文所提到的事件是CQRS架構中,由C端的某個命令操作某個聚合根後,導致該聚合根的狀態發生變化,然後每次變化都會產生一個對應的事件。所以,針對聚合根的每個事件,我們關注的資訊就是:哪個命令操作哪個聚合根,產生了什麼版本號的一個事件,事件的內容和產生的時間分別是什麼。

事件的版本號是什麼意思?

由於一個聚合根在生命週期內經常會被修改,也就是說經常會有命令去修改聚合根的狀態,而每次狀態的變化都會產生一個對應的事件,也就是說一個聚合根在生命週期內會產生多個事件。聚合根是領域驅動設計(DDD)中的一個概念,聚合根是一個具有全域性唯一ID的實體,具有獨立的生命週期,是資料強一致性的最小邊界。為了保證聚合根內的資料的強一致性,針對單個聚合根的任何修改都必須是線性的,因為只有線性的操作,才能保證當前的操作所基於的聚合根的狀態是最新的,這樣才能保證聚合根內資料的完整性,總是滿足業務規則的不變性。關於線性操作這點,就像對DB的一張表中的某一條記錄的修改也必須是線性的一樣,資料庫中的同一條記錄不可能同時被兩個執行緒同時修改。所以,分析到這裡,我們知道同一個聚合根的多個事件的產生必定是有先後順序的。那如何保證這個先後順序呢?答案是,在聚合根上設計一個版本號,通過版本號的順序遞增來保證對同一個聚合根的修改也總是線性依次的。這個思路其實就是一種樂觀併發控制的思路。聚合根的第一個事件的版本號為1,第二個事件的版本號為2,第N個事件的版本號為N。當第N個事件產生時,它所基於的聚合根的狀態必須是N-1。當某個版本號為N的事件嘗試持久化到EventStore時,如果EventStore中已經存在了一個版本號為N的事件,則認為出現併發衝突,需要告訴上層應用當前事件持久化遇到併發衝突了,然後上層應用需要獲取該聚合根的最新狀態,然後再重試當前命令,然後再產生新的版本號的事件,再持久化到EventStore。

希望能自動檢測命令是否重複處理

CQRS架構,任何聚合根的修改都是通過命令來完成的。命令就是一個DTO,當我們要修改一個聚合根的狀態時,就傳送一個命令到分散式MQ即可,然後MQ的消費者處理該命令。但是大家都知道任何分散式MQ一般都只能做到至少投遞一次(At Least Once)的訊息投遞語義。也就是說,一個命令可能會被消費者重複處理。在有些情況下,某個聚合根如果重複處理某個命令,會導致聚合根的最終狀態不正確,比如重複扣款會導致賬號餘額不正確。所以,我們希望在框架層面能支援命令的重複處理的檢測。那最理想的檢測位置在哪裡呢?如果是傳統的DB,我們會在資料庫層面通過建立唯一索引保證命令絕對不會重複執行。那對應到我們的EventStore,自然也應該在EventStore內部檢測。

核心問題分析

通過上面的問題描述,我們知道,其實一個EventStore需要解決的問題就兩點:1)以檔案的形式持久化事件;2)持久化之前判斷事件的版本號是否衝突、事件的命令是否重複。

關於第一點,自然是通過順序寫檔案來實現,機械硬碟在順序寫檔案的情況下,效能也是非常高的。寫檔案的思路非常簡單,我們可以固定單個檔案的大小,比如512MB。然後先寫第一個檔案,寫滿後新建一個檔案,再寫第二個,後面以此類推。

關於第二點,本質上是兩個索引的需求:a. 聚合根ID+事件版本號唯一(當然,這裡不僅要保證唯一,還要判斷是否是連續遞增);b. 聚合根ID + 命令ID唯一,即針對同一個聚合根的命令不能重複處理;那如何實現這兩個索引的需求呢?第一個索引的實現成本相對較低,我們只需要在記憶體維護每個聚合根的當前版本號,然後當一個事件過來時,判斷事件的版本號是否是當前版本號的下一個版本號即可,如果不是,則認為版本號非法;第二個索引的事件成本比較高,我們必須維護每個聚合根的所有產生的事件對應的命令的ID,然後在某個事件過來時,判斷該事件對應的命令ID是否和已經產生的任何一個事件的命令ID重複,如果有,則認為出現重複。所以,歸根結底,當需要持久化某個聚合根的事件時,我們需要載入該聚合根的所有已產生的事件的版本號以及事件對應的命令ID到記憶體,然後在記憶體進行判斷,從而檢查當前事件是否滿足這兩個索引需求。

好了,上面是基本的也是最直接的解決問題的思路了。但是我們不難發現,要實現上面這兩個問題並不容易。因為:首先我們的機器的記憶體大小是有限的,也就是說,無法把所有的聚合根的事件的索引資訊都放在記憶體。那麼當某個聚合根的事件要持久化時,發現記憶體中並無這個聚合根的事件索引時,必然要從磁碟中載入該聚合根的事件索引。但問題是,我們的事件由於為了追求高效能的寫入到檔案,總是隻是簡單的Append追加到最後一個檔案的末尾。這樣必然導致某個聚合根的事件可能分散在多個檔案中,這樣就給我們查詢這個聚合根的所有相關事件帶來了極大的困難。那該如何權衡的去設計這兩個需求呢?

我覺得設計是一種權衡,我們總是應該根據我們的實際業務場景去有側重點的進行設計,優先解決主要問題,然後次要問題儘量去解決。就像leveldb在設計時,也是側重於寫入時非常簡單快速,而讀取時,可能會比較迂迴曲折。EventStore,是非常典型的高頻寫入但很少讀取的系統。但寫入時需要保證上述的兩個索引需求,所以,應該說這個寫入的要求比leveldb的寫入要求還要高一些。那我們該如何去權衡呢?

EventStore核心設計思路

  1. 在記憶體中維護每個聚合根的版本索引eventVersion,eventVersion中維護了當前聚合根的所有的版本、每個版本對應的cmdId,以及每個版本的事件在event檔案中的物理位置;當一個事件過來時,通過這個eventVersion來判斷version,cmdId是否合法(version必須是currentVersion+1,cmdId必須唯一);
  2. 當寫入一個事件時,只寫入一個檔案,event.file檔案;假設一個檔案的大小為512MB,一個事件的大小為1KB,則一個檔案大概儲存52W個事件;
  3. 一個event.file檔案寫滿後:
    • 完成當前event.file檔案,然後新建一個新的event.file檔案,接下來的事件寫入新的event.file檔案;
    • 啟動一個後臺執行緒,在記憶體中對當前完成的event.file檔案中的event按照聚合根ID和事件版本號進行排序;
    • 排序完成後,我們就知道了該檔案中的事件涉及到哪些聚合根,他們的順序,以及最大最小聚合根ID分別是什麼;
    • 新建一個和event.file檔案一樣大小的臨時檔案;
    • 在臨時檔案的header中記錄當前event.file已排序過;
    • 在臨時檔案的資料區域將排好序的事件順序寫入檔案;
    • 臨時檔案寫入完成後,將臨時檔案替換當前已完成的event.file檔案;
    • 為event.file檔案新建一個對應的事件索引檔案eventIndex.file;
    • 將event.file檔案中的最大和最小聚合根ID寫入到eventIndex.file索引檔案的header;每個event.file的最大最小的聚合根ID的關係,會在EventStore啟動時自動載入並快取到記憶體中,這樣可以方便我們快速知道某個聚合根在某個event.file中是否存在事件,因為直接在記憶體中判斷即可。這個快取我暫時命名為aggregateIdRangeCache吧,以便下面更方便的進一步說明如何使用它。
    • 將event.file檔案中的每個聚合根的每個事件的索引資訊寫入eventIndex.file檔案,事件索引資訊包括:聚合根ID+事件版本號+事件的命令ID+事件在event.file檔案中的物理位置這4個資訊;有了這些索引資訊,我們就可以只需要訪問事件索引檔案就能獲取某個聚合根的所有版本資訊(就是上面說的eventVersion)了;
    • 但僅僅在事件索引檔案中記錄最大最小聚合根ID以及每個事件的索引資訊還不是不夠的。原因是,當我們要查詢某個聚合根的所有版本資訊時,雖然可以先根據記憶體中快取的每個event.file檔案的最大最小聚合根ID快速定位該聚合根在哪些event.file中存在事件(也就是明確了在哪些對應的事件索引檔案中存在版本資訊),但是當我們要從這些事件索引檔案中找出該聚合根的事件索引到底具體在檔案的哪個位置時,只能從檔案的起始位置順序掃描檔案才能知道,這樣的順序掃描無疑是不高效的。假設一個event.file檔案的大小固定為512MB,一個事件的大小為1KB,則一個event.file檔案大概儲存52W個事件,每個事件索引的大小大概為:24 + 4 + 24 + 8 = 60個位元組。所以,這52W個事件的索引資訊大概佔用30MB,也就是最終一個事件索引檔案的大小大概為30MB多一點。當我們要獲取某個聚合根的所有版本資訊時,如果每次訪問某個事件索引檔案時,總是要順序掃描30MB的檔案資料,那無疑效率不高。所以,我還需要進一步想辦法優化,因為事件索引檔案裡的事件索引資訊都是按照聚合根ID和事件版本號排序的,假設現在有52W個事件索引,則我們可以將這52W個事件索引記錄均等切分為100個點,然後把每個點對應的事件索引的聚合根ID都記錄到事件索引檔案的header中,一個聚合根ID的長度為24個位元組,則100個也就2.4KB左右。這樣一來,當我們想要知道某個聚合根的事件索引大概在事件索引檔案的哪個位置時,我們可以先通過訪問header裡的資訊,快速知道應該從哪個位置去掃描。這樣一來,本來對於一個事件索引檔案我們要掃描30MB的資料,現在變為只需要掃描百分之一的資料,即300KB,這樣掃描的速度就快很多了。這一段寫的有點囉嗦,但一切都是為了儘量詳細的描述我的設計思路,不知道各位看官是否看懂了。
    • 除了記錄記錄最大最小聚合根ID以及記錄100個等分的切割點外,還有一點可以優化來提高獲取聚合根的版本資訊的效能,就是:如果記憶體足夠,當某個eventIndex.file被讀取一次後,EventStore可以自動將這個eventIndex.file檔案快取到非託管記憶體中;這樣下次就可以直接在非託管記憶體訪問這個eventIndex.file了,減少了磁碟IO的讀取;
  4. 因為記憶體大小有限,所以eventVersion不可能全部快取在記憶體;所以,當某個聚合根的eventVersion不在記憶體中時,需要從磁碟載入。載入的思路是:掃描aggregateIdRangeCache,快速找出該聚合根的事件在哪些event.file檔案中存在;然後通過上面提到的查詢演算法快速查詢這些event.file檔案對應的eventIndex.file檔案,這樣就能快速獲取該聚合根的eventVersion資訊了;
  5. 另外,EventStore啟動時,最好需要預載入一些熱門聚合根的eventVersion資訊到快取。那該預載入哪些聚合根呢?我們可以在記憶體中維護一個固定大小(N)的環形陣列,環形陣列中維護了最近修改的聚合根的ID;當某個聚合根有事件產生,則將該聚合根ID的hashcode取摸N得到環形陣列的下標,然後將該聚合根ID放入該下標;定時將該環形陣列中的聚合根ID dump到檔案preloadAggregateId.file進行儲存;這樣當EventStore啟動時,就可以從preloadAggregateId.file載入指定聚合根的eventVersion;

思路總結:

上面的設計的主要思路是:

  • 寫入一個事件前先記憶體中判斷是否允許寫入,如果允許,則順序寫入event.file檔案;
  • 對一個已經寫入完成的event.file檔案,則用一個後臺非同步執行緒對檔案中的事件按照聚合根ID和事件版本號進行排序,然後將排序後的臨時event.file檔案替換原event.file檔案,同時將排序後得到的事件索引資訊寫入eventIndex.file檔案;
  • 寫入一個事件時,如果當前聚合根的版本資訊不在記憶體,則需要從相關的eventIndex.file檔案載入到記憶體;
  • 由於載入版本資訊可能需要訪問多個eventIndex.file檔案,會有多次讀磁碟的IO,對效能影響較大,所以,我們總是應該儘量在記憶體快取聚合根的版本資訊;
  • 整個EventStore的效能瓶頸在於記憶體中能快取多少聚合根版本資訊,如果能夠快取百分百的聚合根版本資訊,且能做到沒有GC的問題(儘量避免),那我們就可以做到寫入事件非常快速;所以,如何設計一個支援大容量快取(比如快取幾十個GB的資料),且沒有GC問題的高效能快取服務,就變得很關鍵了;
  • 由於有了事件索引資訊以及這麼多的快取機制,所以,當要查詢某個聚合根的所有事件,也就非常簡單了;

如何解決多執行緒併發寫的時候的CPU佔用高的問題?

到這裡,我們分析瞭如何儲存資料,如何寫入資料,還有如何查詢聚合根的所有事件,應該說核心功能的實現思路已經想好了。如果現在是單執行緒訪問EventStore,我相信效能應該不會很低了。但是,實際的情況是N多客戶端會同時併發的訪問EventStore。這個時候就會導致EventStore伺服器會有很多執行緒要求同時寫入事件到資料檔案,但是大家知道寫檔案必須是單執行緒的,如果是多執行緒,那也要用鎖的機制,保證同一個時刻只能有一個執行緒在寫檔案。最簡單的辦法就是寫檔案時用一個lock搞定。但是經過測試發現簡單的使用lock,在多執行緒的情況下,會導致CPU很高。因為每個執行緒在處理當前事件時,由於要寫檔案或讀檔案,都是IO操作,所以鎖的佔用時間比較長,導致很多執行緒都在阻塞等待。

為了解決這個問題,我做了一些調研,最後決定使用雙緩衝佇列的技術來解決。大致思路是:

設計兩個佇列,將要寫入的事件先放入佇列1,然後當前要真正處理的事件放在佇列2。這樣就做到了把接收資料和處理資料這兩個過程在物理上分離,先快速接收資料並放在佇列1,然後處理時把佇列1裡的資料放入佇列2,然後佇列2裡的資料單執行緒線性處理。這裡的一個關鍵問題是,如何把佇列1裡的資料傳給佇列2呢?是一個個拷貝嗎?不是。這種做法太低效。更好的辦法是用交換兩個佇列的引用的方式。具體思路這裡我不展開了,大家可以網上找一下雙緩衝佇列的概念。這個設計我覺得最大的好處是,可以有效的降低多執行緒寫入資料時對鎖的佔用時間,本來一次鎖佔用後要直接處理當前事件的,而現在只需要把事件放入佇列即可。雙緩衝佇列可以在很多場景下被使用,我認為,只要是多個訊息生產者併發產生訊息,然後單個消費者單執行緒消費訊息的場景,都可以使用。而且這個設計還有一個好處,就是我們可以有機會單執行緒批量處理佇列2裡的資料,進一步提高處理資料的吞吐能力。

如何快取大量事件索引資訊?

最簡單的辦法是使用支援併發訪問的字典,如ConcurrentDictionary<T,K>,Java中就是ConcurrentHashmap。但是經過測試發現ConcurrentDictionary在key增加到3000多萬的時候就會非常慢,所以我自己實現了一個簡單的快取服務,初步測試下來,基本滿足要求。具體的設計思路本文先不介紹了,總之我們希望實現一個程序內的,支援快取大量key/value的一個字典,支援併發操作,不要因為記憶體佔用越多而導致快取能力的下降,儘量不要有GC的問題,能滿足這些需求就OK。

如何擴容?

我們再來看一下最後一個我認為比較重要的問題,就是如何擴容。

雖然我們單臺EventStore機器只要硬碟夠大,就可以儲存相當多的事件。但是硬碟再大也有上限,所以擴容的需求總是有的。所以如何擴容(將資料遷移到其他伺服器上)呢?通過上面的設計我們瞭解到,EventStore中最核心的檔案就是event.file,其餘檔案都可以通過event.file檔案來生成。所以,我們擴容時只需要遷移event.file檔案即可。

那如何擴容呢?假設現在有4臺EventStore機器,要擴容到8臺。

有兩個辦法:

  1. 土豪的做法:準備8臺全新的機器,然後把原來4臺機器的全部資料分散到新準備的8臺機器上,然後再把老機器上的資料全部刪除;
  2. 屌絲的做法:準備4臺全新的機器,然後把原來4臺機器的一半資料分散到新準備的4臺機器上,然後再把老機器上的那一半資料刪除;

對比之下,可以很容易發現土豪的做法比較簡單,因為只需要考慮如何遷移資料到新機器即可,不需要考慮遷移後把已經遷移過去的資料還要刪除。大體的思路是:

  1. 採用拉的方式,新的8臺目標機器都在向老的4臺源機器拖事件資料;目標機器記錄當前拖到哪裡了,以便如果遇到意外中斷停止後,下次重啟能繼續從該位置繼續拖;
  2. 每臺源機器都掃描所有的事件資料檔案,一個個事件進行掃描,掃描的起始位置由當前要拖資料的目標機器給出;
  3. 每臺目標機器該拖哪些事件資料?預先在源機器上配置好這次擴容的目標機器的所有唯一標識,如IP;然後當某一臺目標機器過來拖資料時,告知自己的機器的IP。然後源機器根據IP就能知道該目標機器在所有目標機器中排第幾,然後源機器就能知道應該把哪些事件資料同步給該目標機器了。舉個例子:假設當前目標機器的IP在所有IP中排名第3,則針對每個事件,獲取事件的聚合根ID,然後將聚合根ID hashcode取摸8,如果餘數為3,則認為該事件需要同步給該目標機器,否則就跳過該事件;通過這樣的思路,我們可以保證同一個聚合根的所有事件都最終同步到了同一臺新的目標機器。只要我們的聚合根ID夠均勻,那最終一定是均勻的把所有聚合根的事件均勻的同步到目標機器上。
  4. 當目標機器上同步完整了一個event.file後,就自動非同步生成其對應的eventIndex.file檔案;

擴容過程的資料同步遷移的思路差不多了。但是擴容過程不僅僅只有資料遷移,還有客戶端路由切換等。那如客戶端何動態切換路由資訊呢?或者說如何做到不停機動態擴容呢?呵呵。這個其實是一個外圍的技術。只要資料遷移的速度跟得上資料寫入的速度,然後再配合動態推送新的路由配置資訊到所有的客戶端。最終就能實現動態庫容了。這個問題我這裡先不深入了,搞過資料庫動態擴容的朋友應該都瞭解原理。無非就是一個全量資料遷移、增量資料遷移、資料校驗、短暫停止寫服務,切換路由配置資訊這幾個關鍵的步驟。我上面介紹的是最核心的資料遷移的思路。

結束語

本文介紹了我之前一直想做的一個基於檔案版本的EventStore的關鍵設計思路,希望通過這篇文章把自己的思路系統地整理出來。一方面通過寫文章可以進一步確信自己的思路是否OK,因為如果你文章寫不出來,其實思路一定是哪裡有問題,寫文章的過程就是大腦整理思緒的過程。所以,寫文章也是檢查自己設計的一種好方法。另一方面,也可以通過自己的原創分享,希望和大家交流,希望大家能給我一些意見或建議。這樣也許可以在我動手寫程式碼前能及時糾正一些設計上的錯誤。最後再補充一點,語言不重要,重要的是架構設計、資料結構,以及演算法。誰說C#語言做不出好東西呢?呵呵。