1. 程式人生 > >CQRS之旅——旅程6(我們系統的版本管理)

CQRS之旅——旅程6(我們系統的版本管理)

旅程6:我們系統的版本管理

準備下一站:升級和遷移

“變化是生活的調味品。”威廉·考珀

此階段的最高目標是瞭解如何升級包含實現CQRS模式和事件源的限界上下文的系統。團隊在這一階段實現的使用者場景包括對程式碼的更改和對資料的更改:更改了一些現有的資料模式並添加了新的資料模式。除了升級系統和遷移資料外,團隊還計劃在沒有停機時間的情況下進行升級和遷移,以便在Microsoft Azure中執行實時系統。

本章的工作術語定義:

本章使用了一些術語,我們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的“深入CQRS和ES”。

  • Command(命令):命令是要求系統執行更改系統狀態的操作。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自使用者發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操作時)。單個接收方處理一個命令。命令匯流排(command bus)傳輸命令,然後命令處理程式將這些命令傳送到聚合。傳送命令是一個沒有返回值的非同步操作。

事件(Event):一個事件,比如OrderConfirmed,描述了系統中發生的一些事情,通常是一個命令的結果。領域模型中的聚合引發事件。事件也可以來自其他限界上下文。多個訂閱者可以處理特定的事件。聚合將事件釋出到事件匯流排。處理程式在事件總線上註冊特定型別的事件,然後將事件傳遞給訂閱伺服器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。

冪等性(Idempotency):冪等性是一個操作的特性,這意味著該操作可以多次應用而不改變結果。例如,“將x的值設定為10”的操作是冪等的,而“將x的值加1”的操作不是冪等的。在訊息傳遞環境中,如果訊息可以多次傳遞而不改變結果,則訊息是冪等的:這可能是因為訊息本身的性質,也可能是因為系統處理訊息的方式。

使用者故事:

在這個過程的這個階段,團隊實現了下面描述的使用者故事。

不停機升級

V2版本的目標是升級系統,包括任何必要的資料遷移,而不需要把系統停機。如果這在當前實現中不可行,那麼停機時間應該最小化,並且應該修改系統,以便在將來支援零停機時間升級(從V3版本開始)。

Beth(業務經理)發言:

確保我們能夠在不停機的情況下進行升級,這對我們在市場中的信譽至關重要。

顯示剩餘座位數量

目前,當註冊者建立一個訂單時,沒有顯示每種座位型別的剩餘座位數量。當註冊者選擇購買座位時,UI應該顯示此資訊。

處理不需要付費的座位

目前,當註冊者選擇不需要付費的座位時,UI流仍然會將註冊者帶到支付頁面,即使不需要支付任何費用。系統應該檢測什麼時候沒有支付,並調整流程,讓註冊者直接進入訂單的確認頁面。

架構

該應用程式旨在部署到Microsoft Azure。在旅程的那個階段,應用程式由兩個角色組成,一個包含ASP.Net MVC Web應用程式的web角色和一個包含訊息處理程式和領域物件的工作角色。應用程式在寫端和讀端都使用Azure SQL DataBase例項進行資料儲存。應用程式使用Azure服務匯流排來提供其訊息傳遞基礎設施。下圖展示了這個高階體系結構。

在研究和測試解決方案時,可以在本地執行它,可以使用Azure compute emulator,也可以直接執行MVC web應用程式,並執行承載訊息處理程式和領域域物件的控制檯應用程式。在本地執行應用程式時,可以使用本地SQL Server Express資料庫,並使用一個在SQL Server Express資料庫實現的簡單的訊息傳遞基礎設施。

有關執行應用程式的選項的更多資訊,請參見附錄1“釋出說明”。

模式和概念

在旅程的這個階段,團隊處理的大多數關鍵挑戰都與如何最好地執行從V1到V2的遷移有關。本節將介紹其中的一些挑戰。

處理“事件定義發生更改"的情況

當團隊檢查V2的釋出需求,很明顯,我們需要改變在訂單和註冊限界上下文中使用的一些事件來適應一些新特性:RegistrationProcessManager將會改變,當訂單有一個不需要付費的座位時系統將提供一個更好的使用者體驗。

訂單和註冊限界上下文使用事件源,因此在遷移到V2之後,事件儲存將包含舊事件,但將開始儲存新事件。當系統事件被重放時,系統必須能正確處理所有的舊事件和新事件。

團隊考慮了兩種方法來處理系統中的這類更改。

在基礎設施中進行事件對映或過濾

在基礎設施中對映和過濾事件訊息是一種選擇。此方法是對舊的事件訊息和訊息格式進行處理,在它們到達領域之前在基礎設施的某個位置處理它們。您可以過濾掉不再相關的舊訊息,並使用對映將舊格式的訊息轉換為新格式。這種方法最初比較複雜,因為它需要對基礎設施進行更改,但是它可以保持領域域的純粹,領域只需要理解當前的新事件集合就可以了。

在聚合中處理多個版本的訊息

在聚合中處理多個版本的訊息是另一種選擇。在這種方法中,所有訊息型別(包括舊訊息和新訊息)都傳遞到領域,每個聚合必須能夠處理舊訊息和新訊息。從短期來看,這可能是一個合適的策略,但它最終會導致域模型受到遺留事件處理程式的汙染。

團隊為V2版本選擇了這個選項,因為它包含了最少數量的程式碼更改。

Jana(軟體架構師)發言:

當前在聚合中處理舊事件和新事件並不妨礙您以後使用第一種選擇:在基礎設施中使用對映/過濾機制。

履行訊息冪等性

V2版本中要解決的一個關鍵問題是使系統更加健壯。在V1版本中,在某些場景中,可能會多次處理某些訊息,導致系統中的資料不正確或不一致。

Jana(軟體架構師)發言:

訊息冪等性在任何使用訊息傳遞的系統中都很重要,這不僅僅是在實現CQRS模式或使用事件源的系統中。

在某些場景中,設計冪等訊息是可能的,例如:使用“將座位配額設定為500”的訊息,而不是“在座位配額中增加100”的訊息。您可以安全地多次處理第一個訊息,但不能處理第二個訊息。

然而,並不總是能夠使用冪等訊息,因此團隊決定使用Azure服務匯流排的重複刪除特性,以確保它只傳遞一次訊息。團隊對基礎設施進行了一些更改,以確保Azure服務匯流排能夠檢測重複訊息,並配置Azure服務匯流排來執行重複訊息檢測。

要了解Contoso是如何實現這一點的,請參閱下面的“不讓命令訊息重複”一節。此外,我們需要考慮系統中的訊息處理程式如何從佇列和Topic檢索訊息。當前的方法使用Azure服務匯流排peek/lock機制。這是一個分成三個階段的過程:

  1. 處理程式從佇列或Topic檢索訊息,並在其中留下訊息的鎖定副本。其他客戶端無法看到或訪問鎖定的訊息。
  2. 處理程式處理訊息。
  3. 處理程式從佇列中刪除鎖定的訊息。如果鎖定的訊息在固定時間後沒有解鎖或刪除,則解鎖該訊息並使其可用,以便再次檢索。

如果步驟由於某種原因失敗,這意味著系統可以不止一次地處理訊息。

Jana(軟體架構師)發言:

該團隊計劃在旅程的下一階段解決這個問題(步驟失敗的問題)。更多資訊,請參見第7章“新增彈性和優化效能”。

阻止多次處理事件

在V1中,在某些場景裡,如果在處理事件時發生錯誤,系統可能多次處理事件。為了避免這種情況,團隊修改了體系結構,以便每個事件處理程式都有自己對Azure Topic的訂閱。下圖顯示了兩個不同的模型。

在V1中,可能發生以下行為:

  1. EventProcessor例項從服務匯流排中的所有訂閱者那裡接收到OrderPlaced事件。
  2. EventProcessor例項有兩個已註冊的處理程式,RegistrationProcessManagerRouter和OrderViewModelGenerator處理程式類,所以會在兩個裡都觸發呼叫Handle方法。
  3. 在OrderViewModelGenerator類中的Handle方法執行成功。
  4. 在RegistrationProcessManagerRouter類中的Handle方法丟擲異常。
  5. EventProcessor例項捕獲到異常然後拋棄掉事件訊息。訊息將自動放回訂閱中。
  6. EventProcessor例項第二次從所有訂閱者那裡接收到OrderPlaced事件。
  7. 事件又觸發兩個處理方法,導致RegistrationProcessManagerRouter類和OrderViewModelGenerator第二次處理事件訊息。
  8. 每當RegistrationProcessManagerRouter類丟擲異常時,OrderViewModelGenerator類都會觸發處理該事件。

在V2模型中,如果處理程式類丟擲異常,EventProcessor例項將事件訊息放回與該處理程式類關聯的訂閱。重試邏輯現在只會導致EventProcessor例項重試引發異常的處理程式,因此沒有其他處理程式會重新處理訊息。

整合事件的持久化

在V1版本中提出的一個問題是,系統如何持久化從會議管理限界上下文傳送到訂單和註冊限界上下文的整合事件。這些事件包括關於會議建立和釋出的資訊,以及座位型別和配額更改的詳細資訊。

在V1版本中,訂單和註冊上下文中的ConferenceViewModelGenerator類通過更新檢視模型並向SeatsAvailability聚合傳送命令來處理這些事件,以告訴它更改座位配額值。

這種方法意味著訂單和註冊限界上下文不儲存任何歷史記錄,這可能會導致問題。例如,其他檢視從這裡中查詢座椅型別描述時,這裡只包含座椅型別描述的最新值。因此,在其他地方重播一組事件可能會重新生成另一個包含不正確座椅型別描述的讀取模型投影。

團隊考慮了以下五個方法來糾正這種情況:

  • 將所有事件儲存在原始限界上下文中(會議管理限界上下文中),並使用共享的事件儲存,訂單和註冊限界上下文中可以訪問該儲存來重播這些事件。接收限界上下文可以重放事件流,直到它需要檢視的之前的座椅型別描述時為止。
  • 當所有事件到達接收限界上下文(訂單和註冊限界上下文)時儲存它們。
  • 讓檢視模型生成器中的命令處理程式儲存事件,只選擇它需要的那些。
  • 讓檢視模型生成器中的命令處理程式儲存不同的事件,實際上就是為此檢視模型使用事件源。
  • 將來自所有限界上下文的所有命令和事件訊息儲存在訊息日誌中。

第一種選擇並不總是可行的。在這種特殊情況下,它可以工作,因為同一個團隊同時實現了限界上下文和基礎設施,使得使用共享事件儲存變得很容易。

Gary(CQRS專家)發言:

儘管從純粹主義者的角度來看,第一個選項破壞了限界上下文之間的嚴格隔離,但在某些場景中,它可能是一個可接受的實用解決方案。

第三種選擇可能存在的風險是,所需的事件集合可能在未來發生變化。如果我們現在不儲存事件,它們將永遠丟失。

儘管第五個選項儲存了所有命令和事件,其中一些可能永遠都不需要再次引用,但它確實提供了一個完整的日誌,記錄了系統中發生的所有事情。這對於故障診斷很有用,還可以幫助您滿足尚未確定的需求。該團隊選擇了這個選項而不是選項二,因為它提供了一個更通用的機制,可能具有未來的好處。

持久化事件的目的是,當訂單和註冊上下文需要有關當前座位配額的資訊時,可以回放這些事件,以便計算剩餘座位的數量。要一致地計算這些數字,必須始終以相同的順序回放事件。這種順序有幾種選擇:

  • 會議管理限界上下文傳送事件的順序。
  • 訂單和註冊上下文接收事件的順序。
  • 訂單和註冊上下文處理事件的順序。

大多數情況下,這些順序是相同的。沒有什麼正確的順序。你只需要選擇一個和它保持一致就行了。因此,選擇由簡單性決定。在本例中,最簡單的方法是按照訂單和註冊限界上下文中處理程式接收事件的順序持久化事件(第二個選項)。

Markus(軟體開發人員)發言:

這種選擇通常不會出現在事件源中。每個聚合會都以固定的順序建立事件,這就是系統用於持久儲存事件的順序。在此場景中,整合事件不是由單個聚合建立的。

為這些事件儲存時間戳也有類似的問題。如果將來需要檢視特定時間剩餘的座位數量,那麼時間戳可能會很有用。這裡的選擇是,當事件在會議管理限界上下文中建立時,還是在訂單和註冊限界上下文中接收時,應該建立時間戳?當會議管理限界上下文建立事件時,訂單和註冊限界上下文可能由於某種原因離線。因此,團隊決定在會議管理有界上下文釋出事件時建立時間戳。

訊息排序

團隊建立並執行來驗證V1版本的驗收測試,凸顯出了訊息排序的一個潛在問題:執行會議管理限界上下文的驗收測試向訂單和註冊限界上下文傳送了一系列命令,這些命令有時會出現順序錯誤。

Markus(軟體開發人員)發言:

當人類使用者真實測試系統的這一部分時,不太會注意到這種效果,因為發出命令的時間間隔要長得多,這使得訊息不太可能無序地到達。

團隊考慮了兩種方法來確保訊息以正確的順序到達。

  • 第一個方法是使用訊息會話,這是Azure服務匯流排的一個特性。如果您使用訊息會話,這將確保會話內的訊息以與它們傳送時相同的順序傳遞。
  • 第二種方法是修改應用程式中的處理程式,通過使用傳送訊息時新增到訊息中的序列號或時間戳來檢測無序訊息。如果接收處理程式檢測到一條無序訊息,它將拒絕該訊息,並在處理了在被拒絕訊息之前傳送的訊息之後,將其放回稍後處理的佇列或Topic。

在這種情況下,首選的解決方案是使用Azure服務匯流排訊息會話,因為這隻需要對現有程式碼進行更少的更改。這兩種方法都會給訊息傳遞帶來一些額外的延遲,但是團隊並不認為這會對系統的效能產生顯著的影響。

實現細節

本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份程式碼拷貝很有用,這樣您就可以繼續學習了。您可以從Download center下載一個副本,或者在GitHub上檢視儲存庫:https://github.com/mspnp/cqrs-journey-code。您可以從GitHub上的Tags頁面下載V2版本的程式碼。

備註:不要期望程式碼示例與參考實現中的程式碼完全匹配。本章描述了CQRS過程中的一個步驟,隨著我們瞭解更多並重構程式碼,實現可能會發生變化。

**新增對“不需要支付的訂單”的支援

做出這一改變有三個具體的目標,它們都是相關的。我們希望:

  • 修改RegistrationProcessManager類和相關聚合,以處理不需要支付的訂單。
  • 修改UI中的導航,當訂單不需要支付時跳過付款步驟。
  • 確保系統在升級到V2之後能夠正確地工作,包括使用新事件和舊事件。

RegistrationProcessManager類的更改

在此之前,RegistrationProcessManager類在收到來自UI的註冊者已完成支付的通知後傳送了一個ConfirmOrderPayment命令。現在,如果有一個不需要支付訂單,UI將直接向訂單聚合傳送一個ConfirmOrder命令。如果訂單需要支付,RegistrationProcessManager類在從UI接收到成功支付的通知後,再向訂單聚合傳送一個ConfirmOrder命令。

Jana(軟體架構師)發言:

注意,命令的名稱已從ConfirmOrderPayment更改為ConfirmOrder。這反映了訂單不需要知道任何關於付款的資訊。它只需要知道訂單已經確認。類似地,現在有一個新的OrderConfirmed事件用於替代舊的OrderPaymentConfirmed事件。

當訂單聚合接收到ConfirmOrder命令時,它將引發一個OrderConfirmed事件。除被持久化外,該事件還由以下物件處理:

  • OrderViewModelGenerator類,它在其中更新讀取模型中的訂單狀態。
  • SeatAssignments聚合,在其中初始化一個新的SeatAssignments例項。
  • RegistrationProcessManager類,它在其中觸發一個提交座位預訂的命令。

UI的更改

UI中的主要更改是在RegistrationController MVC控制器類中的SpecifyRegistrantAndPaymentDetails action裡的。之前,此action方法返回InitiateRegistrationWithThirdPartyProcessorPayment(action result)。現在,如果Order物件的新IsFreeOfCharge屬性為true,它將返回一個CompleteRegistrationWithoutPayment(action result)。否則,它返回一個CompleteRegistrationWithThirdPartyProcessorPayment(action result)。

[HttpPost]
public ActionResult SpecifyRegistrantAndPaymentDetails(AssignRegistrantDetails command, string paymentType, int orderVersion)
{
    ...

    var pricedOrder = this.orderDao.FindPricedOrder(orderId);
    if (pricedOrder.IsFreeOfCharge)
    {
        return CompleteRegistrationWithoutPayment(command, orderId);
    }

    switch (paymentType)
    {
        case ThirdPartyProcessorPayment:

            return CompleteRegistrationWithThirdPartyProcessorPayment(command, pricedOrder, orderVersion);

        case InvoicePayment:
            break;

        default:
            break;
    }

    ...
}

CompleteRegistrationWithThirdPartyProcessorPayment將使用者重定向到ThirdPartyProcessorPayment action,CompleteRegistrationWithoutPayment方法將使用者直接重定向到ThankYou action。

資料遷移

會議管理限界上下文在其Azure SQL資料庫例項中的PricedOrders表中儲存來自訂單和註冊限界上下文的訂單資訊。以前,會議管理限界上下文接收OrderPaymentConfirmed事件,現在它接收OrderConfirmed事件,該事件包含一個附加的IsFreeOfCharge屬性。這將成為資料庫中的一個新列。

Markus(軟體開發人員)發言:

在遷移過程中,我們不需要修改該表中的現有資料,因為布林值的預設值為false。所有現有條目都是在系統支援不需要付費的訂單之前建立的。

在遷移過程中,任何正在執行的ConfirmOrderPayment命令都可能丟失,因為它們不再由訂單聚合處理。您應該驗證當前的命令匯流排沒有這些命令。

Poe(IT運維人員)發言:

我們需要仔細計劃如何部署V2版本,以便確保所有現有的、正在執行的ConfirmOrderPayment命令都由執行V1版本的工作角色例項處理。

系統將RegistrationProcessManager類例項的狀態儲存到SQL資料庫表中。這個表的架構沒有變化。遷移後您將看到的惟一更改是StateValue列中的一個新新增值。這反映了RegistrationProcessManager類中的ProcessState列舉中額外的PaymentConfirmationReceived值,如下面的程式碼示例所示:

public enum ProcessState
{
    NotStarted = 0,
    AwaitingReservationConfirmation = 1,
    ReservationConfirmationReceived = 2,
    PaymentConfirmationReceived = 3,
}

在V1版本中,事件源系統為訂單聚合儲存的事件包括OrderPaymentConfirmed事件。因此,事件儲存區包含此事件型別的例項。在V2版本中,OrderPaymentConfirmed事件被替換為OrderConfirmed事件。

團隊決定在V2版本中,當反序列化事件時,不在基礎設施級別對映和過濾事件。這意味著,當系統從事件儲存中重播這些事件時,處理程式必須同時理解舊事件和新事件。下面的程式碼示例在SeatAssignmentsHandler類中顯示了這一點:

static SeatAssignmentsHandler()
{
    Mapper.CreateMap<OrderPaymentConfirmed, OrderConfirmed>();
}

public SeatAssignmentsHandler(IEventSourcedRepository<Order> ordersRepo, IEventSourcedRepository<SeatAssignments> assignmentsRepo)
{
    this.ordersRepo = ordersRepo;
    this.assignmentsRepo = assignmentsRepo;
}

public void Handle(OrderPaymentConfirmed @event)
{
    this.Handle(Mapper.Map<OrderConfirmed>(@event));
}

public void Handle(OrderConfirmed @event)
{
    var order = this.ordersRepo.Get(@event.SourceId);
    var assignments = order.CreateSeatAssignments();
    assignmentsRepo.Save(assignments);
}

您還可以在OrderViewModelGenerator類中看到同樣的技術。

Order類中的方法略有不同,因為這是持久化到事件儲存中的事件之一。下面的程式碼示例顯示了Order類中受保護建構函式的一部分:

protected Order(Guid id)
    : base(id)
{
    ...
    base.Handles<OrderPaymentConfirmed>(e => this.OnOrderConfirmed(Mapper.Map<OrderConfirmed>(e)));
    base.Handles<OrderConfirmed>(this.OnOrderConfirmed);
    ...
}

Jana(軟體架構師)發言:

以這種方式處理舊事件對於這個場景非常簡單,因為惟一需要更改的是事件的名稱。如果事件的屬性也發生了變化,情況會更加複雜。將來,Contoso將考慮在基礎設施中進行對映,以避免遺留事件汙染領域模型。

在UI中顯示剩餘座位

做出這一改變有三個具體的目標,它們都是相關的。我們想要:

  • 修改系統,在會議系統的讀模型中包含每個座位型別的剩餘座位數量資訊。
  • 修改UI以顯示每種座位型別的剩餘座位數量。
  • 確保升級到V2後系統功能正常。

向讀模型新增關於剩餘座位數量的資訊

系統要能顯示剩餘座位數量的資訊來自兩個地方:

  • 當業務客戶建立新的座位型別或修改座位配額時,會議管理限界上下文將引發SeatCreated和SeatUpdated事件。
  • 在訂單和註冊限界上下文中,當註冊者建立一個訂單的時候,可用座位(SeatsAvailability)聚合將引發SeatsReserved、SeatsReservationCancelled和AvailableSeatsChanged事件。

備註:ConferenceViewModelGenerator類不使用SeatCreated和SeatUpdated事件。

訂單和註冊限界上下文中的ConferenceViewModelGenerator類現在處理這些事件,並使用它們來計算和儲存讀模型中的座位型別數量。下面的程式碼示例顯示了ConferenceViewModelGenerator類中的相關處理程式:

public void Handle(AvailableSeatsChanged @event)
{
    this.UpdateAvailableQuantity(@event, @event.Seats);
}

public void Handle(SeatsReserved @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

public void Handle(SeatsReservationCancelled @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats)
{
    using (var repository = this.contextFactory.Invoke())
    {
        var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId);
        if (dto != null)
        {
            if (@event.Version > dto.SeatsAvailabilityVersion)
            {
                foreach (var seat in seats)
                {
                    var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType);
                    if (seatDto != null)
                    {
                        seatDto.AvailableQuantity += seat.Quantity;
                    }
                    else
                    {
                        Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType);
                    }
                }

                dto.SeatsAvailabilityVersion = @event.Version;

                repository.Save(dto);
            }
            else
            {
                Trace.TraceWarning ...
            }
        }
        else
        {
            Trace.TraceError ...
        }
    }
}

UpdateAvailableQuantity方法將事件上的版本與讀模型的當前版本進行比較,以檢測可能的重複訊息。

Markus(軟體開發人員)發言:

此檢查僅檢測重複的訊息,而不是超出序列的訊息。

修改UI以顯示剩餘的座位數量

現在,當UI向會議的讀模型查詢座位型別列表時,列表包括當前可用的座位數量。下面的程式碼示例顯示了RegistrationController MVC控制器如何使用SeatType類的AvailableQuantity:

private OrderViewModel CreateViewModel()
{
    var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id);
    var viewModel =
        new OrderViewModel
        {
            ConferenceId = this.ConferenceAlias.Id,
            ConferenceCode = this.ConferenceAlias.Code,
            ConferenceName = this.ConferenceAlias.Name,
            Items =
                seatTypes.Select(
                    s =>
                        new OrderItemViewModel
                        {
                            SeatType = s,
                            OrderItem = new DraftOrderItem(s.Id, 0),
                            AvailableQuantityForOrder = s.AvailableQuantity,
                            MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20)
                        }).ToList(),
        };

    return viewModel;
}

資料遷移

儲存會議讀模型資料的資料庫有一個新列來儲存用於檢查重複事件的版本號,而儲存座位型別讀模型資料有一個新列來儲存可用的座椅數量。

作為資料遷移的一部分,有必要為每個可用座位(SeatsAvailability)聚合重放事件儲存中的所有事件,以便正確計算可用數量。

不讓命令訊息重複

系統目前使用Azure服務匯流排傳輸訊息。當系統從ConferenceProcessor類的啟動程式碼初始化Azure服務匯流排時,它配置Topic來檢測重複的訊息,如下面的ServiceBusConfig類的程式碼示例所示:

private void CreateTopicIfNotExists() 
{     
    var topicDescription =         
        new TopicDescription(this.topic)         
        {             
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow,         
        };     
    try     
    {         
        this.namespaceManager.CreateTopic(topicDescription);     
    }     
    catch (MessagingEntityAlreadyExistsException) { } 
} 
備註:您可以在Settings.xml檔案中配置DuplicateDetectionHistoryTimeWindow
可以向Topic元素新增這個屬性。預設值是1小時。

但是,為了使重複檢測工作正常,您必須確保每個訊息都有一個惟一的ID。下面的程式碼示例顯示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand
{
    public MarkSeatsAsReserved()
    {
        this.Id = Guid.NewGuid();
        this.Seats = new List<SeatQuantity>();
    }

    public Guid Id { get; set; }

    public Guid OrderId { get; set; }

    public List<SeatQuantity> Seats { get; set; }

    public DateTime Expiration { get; set; }
}

CommandBus類中的BuildMessage方法使用命令Id建立一個惟一的訊息Id, Azure服務匯流排可以使用這個訊息Id來檢測重複:

private BrokeredMessage BuildMessage(Envelope command) 
{ 
    var stream = new MemoryStream(); 
    ...

    var message = new BrokeredMessage(stream, true);
    if (!default(Guid).Equals(command.Body.Id))
    {
        message.MessageId = command.Body.Id.ToString();
    }

...

    return message;
} 

保證訊息順序

團隊決定使用Azure服務匯流排訊息會話來保證系統中的訊息順序。

系統從ConferenceProcessor類中的OnStart方法配置Azure服務匯流排Topic和訂閱。Settings.xml配置檔案中的配置指定了具體的訂閱使用會話。ServiceBusConfig類中的以下程式碼示例顯示了系統如何建立和配置訂閱。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription)
{
    var subscriptionDescription =
        new SubscriptionDescription(topic.Path, subscription.Name)
        {
            RequiresSession = subscription.RequiresSession
        };

    try
    {
        namespaceManager.CreateSubscription(subscriptionDescription);
    }
    catch (MessagingEntityAlreadyExistsException) { }
}

以下來自SessionSubscriptionReceiver類的程式碼示例演示瞭如何使用會話接收訊息:

private void ReceiveMessages(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session;
        try
        {
            session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
        }
        catch (Exception e)
        {
            ...
        }

        if (session == null)
        {
            Thread.Sleep(100);
            continue;
        }


        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = null;
            try
            {
                try
                {
                    message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
                }
                catch (Exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // If we have no more messages for this session, exit and try another.
                    break;
                }

                this.MessageReceived(this, new BrokeredMessageEventArgs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
        }

        this.receiveRetryPolicy.ExecuteAction(() => session.Close());
    }
}

private MessageSession DoAcceptMessageSession()
{
    try
    {
        return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
    }
    catch (TimeoutException)
    {
        return null;
    }
}

Markus(軟體開發人員)發言:

您可能會發現,將使用訊息會話的ReceiveMessages方法的這個版本與SubscriptionReceiver類中的原始版本進行比較是很有用的。

您必須確保當你傳送訊息包含一個會話ID,這樣才能使用訊息會話接收一條訊息。系統使用事件的SourceID作為會話ID,如下面的程式碼示例所示的EventBus類中的BuildMessage方法:

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();

通過這種方式,您可以確保以正確的順序接收來自單個源的所有訊息。

Poe(IT運維人員)發言:

在V2版本中,團隊更改了系統建立Azure服務匯流排Topic和訂閱的方式。之前,SubscriptionReceiver類建立了它們(如果它們還不存在)。現在,系統在應用程式啟動時使用配置資料建立它們。這發生在啟動過程的早期,以避免在系統初始化訂閱之前將訊息傳送到Topic時丟失訊息的風險。

然而,只有當訊息按正確的順序傳遞到總線上時,會話才能保證按順序傳遞訊息。如果系統非同步傳送訊息,則必須特別注意確保訊息以正確的順序放在總線上。在我們的系統中,來自每個單獨聚合例項的事件按順序到達是很重要的,但是我們不關心來自不同聚合例項的事件的順序。因此,儘管系統非同步傳送事件,EventStoreBusPublisher例項仍然會在傳送下一個事件之前等待前一個事件已傳送的確認。以下來自TopicSender類的示例說明了這一點:

public void Send(Func<BrokeredMessage> messageFactory)
{
    var resetEvent = new ManualResetEvent(false);
    Exception exception = null;
    this.retryPolicy.ExecuteAction(
        ac =>
        {
            this.DoBeginSendMessage(messageFactory(), ac);
        },
        ar =>
        {
            this.DoEndSendMessage(ar);
        },
        () => resetEvent.Set(),
        ex =>
        {
            Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetEvent.Set();
        });

    resetEvent.WaitOne();
    if (exception != null)
    {
        throw exception;
    }
}

Jana(軟體架構師)發言:

此程式碼示例展示了系統如何使用Transient Fault Handling Application Block來讓非同步呼叫可靠。

有關訊息排序和Azure服務匯流排的更多資訊,請參見Microsoft Azure Queues and Microsoft Azure Service Bus Queues - Compared and Contrasted

有關非同步傳送訊息和排序的資訊,請參閱部落格文章Microsoft Azure Service Bus Splitter and Aggregator

從會議管理限界上下文中持久化事件

團隊決定建立一個包含所有傳送的命令和事件的訊息日誌。這將使訂單和註冊限界上下文能夠從會議管理限界上下文查詢此日誌,以獲取其構建讀模型所需的事件。這不是事件源,因為我們沒有使用這些事件來重建聚合的狀態,儘管我們使用類似的技術來捕獲和持久化這些整合事件。

Gary(CQRS專家)發言:

此訊息日誌確保不會丟失任何訊息,以便將來能夠滿足其他需求。

向訊息新增額外元資料

系統現在將所有訊息儲存到訊息日誌中。為了方便查詢特定命令或事件,系統現在向每個訊息添加了更多的元資料。以前,惟一的元資料是事件型別,現在,事件元資料包括事件型別、名稱空間、程式集和路徑。系統將元資料新增到EventBus類中的事件和CommandBus類中的命令中。

捕獲訊息並將訊息持久化到訊息日誌中

系統使用Azure服務匯流排中對會議/命令和會議/事件topic的額外訂閱來接收系統中每條訊息的副本。然後,它將訊息附加到Azure表儲存中。下面的程式碼示例顯示了AzureMessageLogWriter類的例項,它用於將訊息儲存到表中:

public class MessageLogEntity : TableServiceEntity 
{ 
    public string Kind { get; set; }     
    public string CorrelationId { get; set; }     
    public string MessageId { get; set; }     
    public string SourceId { get; set; }     
    public string AssemblyName { get; set; }     
    public string Namespace { get; set; }     
    public string FullName { get; set; }     
    public string TypeName { get; set; }     
    public string SourceType { get; set; }     
    public string CreationDate { get; set; }     
    public string Payload { get; set; } 
} 

Kind屬性指定訊息是命令還是事件。MessageId和CorrelationId屬性由訊息傳遞基礎設施設定的,其餘屬性是從訊息元資料中設定的。

下面的程式碼示例顯示了這些訊息的分割槽和RowKey的定義:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"),
RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId

注意,RowKey儲存了訊息最初發送的順序,並新增到訊息ID上,以確保惟一性,以防兩條訊息同時入隊。

Jana(軟體架構師)發言:

這與事件儲存不同,在事件儲存區中,分割槽鍵標識聚合例項,而RowKey標識聚合的版本號。

資料遷移

當Contoso將系統從V1遷移到V2時,它將使用訊息日誌在訂單和註冊限界上下文中重建會議和價格訂單的讀模型。

Gary(CQRS專家)發言:

Contoso可以在需要重建與聚合無關的事件構建的讀模型時來使用訊息日誌,例如來自會議管理限界上下文的整合事件。

會議讀模型包含會議的資訊,幷包含來自會議管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的資訊。

價格訂單讀模型持有來自於SeatCreated和SeatUpdated事件的資訊,這些事件來自於會議管理限界上下文。

然而,在V1中,這些事件訊息沒有被持久化,因此讀模型不能在V2中重新填充。為了解決這個問題,團隊實現了一個數據遷移實用程式,它使用一種最佳方法來生成包含要儲存在訊息日誌中的丟失資料的事件。例如,在遷移到V2之後,訊息日誌不包含ConferenceCreated事件,因此遷移實用程式在會議管理限界上下文使用的資料庫中找到這些資訊,並建立丟失的事件。您可以在MigrationToV2專案的Migrator類中的GeneratePastEventLogMessagesForConferenceManagement方法中看到這是如何完成的。

Markus(軟體開發人員)發言:

您可以在這個類中看到,Contoso還將所有現有的事件源事件複製到訊息日誌中。

如下面所示,Migrator類中的RegenerateViewModels方法重新構建讀取的模型。它通過呼叫Query方法從訊息日誌中檢索所有事件,然後使用ConferenceViewModelGenerator和PricedOrderViewModelUpdater類來處理訊息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString)
{
    var commandBus = new NullCommandBus();

    Database.SetInitializer<ConferenceRegistrationDbContext>(null);

    var handlers = new List<IEventHandler>();
    handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus));
    handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString)));

    using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
    {
        context.UpdateTables();
    }

    try
    {
        var dispatcher = new MessageDispatcher(handlers);
        var events = logReader.Query(new QueryCriteria { });

        dispatcher.DispatchMessages(events);
    }
    catch
    {
        using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
        {
            context.RollbackTablesMigration();
        }

        throw;
    }
}

Jana(軟體架構師)發言:

查詢可能不會很快,因為它將從多個分割槽檢索實體。

注意這個方法如何使用NullCommandBus例項來接收來自ConferenceViewModelGenerator例項的任何命令,因為我們只是在這裡重新構建讀模型。

以前,PricedOrderViewModelGenerator使用ConferenceDao類來獲取關於座位的資訊。現在,它是自治的,並直接處理SeatCreated和SeatUpdated事件來維護這些資訊。作為遷移的一部分,必須將此資訊新增到讀模型中。在前面的程式碼示例中,PricedOrderViewModelUpdater類只處理SeatCreated和SeatUpdated事件,並將缺失的資訊新增到價格訂單讀模型中。

從V1遷移到V2

從V1遷移到V2需要更新已部署的應用程式程式碼並遷移資料。在生產環境中執行遷移之前,應該始終在測試環境中演練遷移。以下是所需步驟:

  1. 將V2版本部署到Azure的staging環境中。V2版本有一個MaintenanceMode屬性,最初設定為true。在此模式下,應用程式向用戶顯示一條訊息,說明站點當前正在進行維護,而工作角色將不處理訊息。
  2. 準備好之後,將V2版本(仍然處於維護模式,MaintenanceMode為true)切換到Azure生產環境中。
  3. 讓V1版本(現在在staging環境中執行)執行幾分鐘,以確保所有正在執行的訊息都完成了它們的處理。
  4. 執行遷移程式來遷移資料(參見下面)。
  5. 成功完成資料遷移後,將每種工作角色的MaintenanceMode屬性更改為false。
  6. V2版本現在執行在Azure中。

Jana(軟體架構師)發言:

團隊考慮使用單獨的應用程式在升級過程中向用戶顯示一條訊息,告訴他們站點正在進行維護。然而,在V2版本中使用MaintenanceMode屬性提供了一個更簡單的過程,併為應用程式添加了一個潛在有用的新特性。

Poe(IT運維人員)發言:

由於對事件儲存的更改,不可能執行從V1到V2的無停機升級。然而,團隊所做的更改將確保從V2遷移到V3將不需要停機時間。

Markus(軟體開發人員)發言:

團隊對遷移實用程式應用了各種優化,例如批處理操作,以最小化停機時間。

下面幾節總結了從V1到V2的資料遷移。這些步驟中的一些在前面已經討論過,涉及到應用程式的特定更改或增強。

團隊為V2引入的一個更改是,將所有命令和事件訊息的副本儲存在訊息日誌中,以便作為未來的證據,通過捕獲將來可能使用的所有內容來保證應用程式的安全性。遷移過程考慮到了這個新特性。

因為遷移過程複製了大量的資料,所以您應該在Azure工作角色中執行遷移過程,以最小化成本。遷移實用程式是一個控制檯應用程式,因此您可以使用Azure和遠端桌面服務。有關如何在Azure角色例項中執行應用程式的資訊,請參見Using Remote Desktop with Microsoft Azure Roles。

Poe(IT運維人員)發言:

在一些組織中,安全策略不允許您在Azure生產環境使用遠端桌面服務。但是,您只需要一個在遷移期間承載遠端桌面會話的工作角色,您可以在遷移完成後刪除它。您還可以將遷移程式碼作為工作角色而不是控制檯應用程式執行,並確保它記錄遷移的狀態,以便您驗證。

為會議管理限界上下文生成過去的日誌訊息

遷移過程的一部分是在可能的情況下重新建立V1版本處理後丟棄的訊息,然後將它們新增到訊息日誌中。在V1版本中,所有從會議管理限界上下文傳送到訂單和註冊限界上下文的整合事件都以這種方式丟失了。系統不能重新建立所有丟失的事件,但可以建立表示遷移時系統狀態的事件。

有關更多資訊,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

遷移事件源裡的事件

在V2版本中,事件儲存為每個事件儲存額外的元資料,以便於查詢事件。遷移過程將所有事件從現有事件儲存複製到具有新模式的新事件儲存。

Jana(軟體架構師)發言:

原始事件不會以任何方式更新,而是被視為不可變的。

同時,系統將所有這些事件的副本新增到V2版本中引入的訊息日誌中。

有關更多資訊,請參見MigrationToV2專案中Migrator類中的MigrateEventSourcedAndGeneratePastEventLogs。

重建讀模型**

V2版本包括對訂單和註冊限界上下文中讀模型定義的幾個更改。MigrationToV2專案在訂單和註冊限界上下文中重新構建會議的讀模型和價格訂單的讀模型。

有關更多資訊,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

對測試的影響

在這個過程的這個階段,測試團隊繼續擴充套件驗收測試集合。他們還建立了一組測試來驗證資料遷移過程。

再說SpecFlow

之前,這組SpecFlow測試以兩種方式實現:通過自動化web瀏覽器模擬使用者互動,或者直接在MVC控制器上操作。這兩種方法都有各自的優缺點,我們在第4章“擴充套件和增強訂單和註冊限界上下文”中討論過。

在與另一位專家討論了這些測試之後,團隊還實現了第三種方法。從領域驅動設計(DDD)方法的角度來看,UI不是領域模型的一部分,核心團隊的重點應該是在領域專家的幫助下理解領域,並在領域中實現業務邏輯。UI只是機械部分,用於使使用者能夠與領域進行互動。因此,驗收測試應該包括驗證領域模型是否以領域專家期望的方式工作。因此,團隊使用SpecFlow建立了一組驗收測試,這些測試旨在在不影響系統UI部分的情況下測試領域。

下面的程式碼示例顯示了SelfRegistrationEndToEndWithDomain.feature檔案,該檔案在Conference.AcceptanceTests專案中的Features\Domain\Registration資料夾裡,注意When和Then子句怎麼使用命令和事件的。

Gary(CQRS專家)發言:

通常,如果您的領域模型只使用聚合,您會期望When子句傳送命令,Then子句檢視事件或異常。然而,在本例中,領域模型包含一個通過傳送命令來響應事件的流程管理器。測試將檢查是否傳送了所有預期的命令,並引發了所有預期的事件。

Feature: Self Registrant end to end scenario for making a Registration for a Conference site with Domain Commands and Events
    In order to register for a conference
    As an Attendee
    I want to be able to register for the conference, pay for the Registration Order and associate myself with the paid Order automatically


Scenario: Make a reservation with the selected Order Items
Given the list of the available Order Items for the CQRS summit 2012 conference
    | seat type                 | rate | quota |
    | General admission         | $199 | 100   |
    | CQRS Workshop             | $500 | 100   |
    | Additional cocktail party | $50  | 100   |
And the selected Order Items
    | seat type                 | quantity |
    | General admission         | 1        |
    | Additional cocktail party | 1        |
When the Registrant proceeds to make the Reservation
    # command:RegisterToConference
Then the command to register the selected Order Items is received 
    # event: OrderPlaced
And the event for Order placed is emitted
    # command: MakeSeatReservation
And the command for reserving the selected Seats is received
    # event: SeatsReserved
And the event for reserving the selected Seats is emitted
    # command: MarkSeatsAsReserved
And the command for marking the selected Seats as reserved is received
    # event: OrderReservationCompleted 
And the event for completing the Order reservation is emitted
    # event: OrderTotalsCalculated
And the event for calculating the total of $249 is emitted

下面的程式碼示例顯示了feature檔案的一些步驟實現。這些步驟使用命令匯流排傳送命令。

[When(@"the Registrant proceed to make the Reservation")]
public void WhenTheRegistrantProceedToMakeTheReservation()
{
    registerToConference = ScenarioContext.Current.Get<RegisterToConference>();
    var conferenceAlias = ScenarioContext.Current.Get<ConferenceAlias>();

    registerToConference.ConferenceId = conferenceAlias.Id;
    orderId = registerToConference.OrderId;
    this.commandBus.Send(registerToConference);

    // Wait for event processing
    Thread.Sleep(Constants.WaitTimeout);
}

[Then(@"the command to register the selected Order Items is received")]
public void ThenTheCommandToRegisterTheSelectedOrderItemsIsReceived()
{
    var orderRepo = EventSourceHelper.GetRepository<Registration.Order>();
    Registration.Order order = orderRepo.Find(orderId);

    Assert.NotNull(order);
    Assert.Equal(orderId, order.Id);
}

[Then(@"the event for Order placed is emitted")]
public void ThenTheEventForOrderPlacedIsEmitted()
{
    var orderPlaced = MessageLogHelper.GetEvents<OrderPlaced>(orderId).SingleOrDefault();

    Assert.NotNull(orderPlaced);
    Assert.True(orderPlaced.Seats.All(
        os => registerToConference.Seats.Count(cs => cs.SeatType == os.SeatType && cs.Quantity == os.Quantity) == 1));
}

在遷移過程中發現的bug

當測試團隊在遷移之後在系統上執行測試時,我們發現訂單和註冊限界上下文中座位型別的數量與遷移之前的數量不同。調查揭示了以下原因。

如果會議從未釋出過,則會議管理限界上下文允許業務客戶刪除座位型別,但不會引發整合事件向訂單和註冊限界上下文報告這一情況。所以,當業務客戶建立新的座位型別時,訂單和註冊限界上下文從會議管理限界上下文接收事件,而不是當業務客戶刪除座位型別時。

遷移過程的一部分建立一組整合事件,以替換V1版本處理後丟棄的事件。它通過讀取會議管理限界上下文使用的資料庫來建立這些事件。此過程沒有為已刪除的座位型別建立整合事件。

總之,在V1版本中,已刪除的座位型別錯誤地出現在訂單和註冊限界上下文的讀模型中。在遷移到V2版本之後,這些已刪除的座位型別沒有出現在訂單和註冊限界上下文的讀模型中。

Poe(IT運維人員)發言:

測試遷移過程不僅驗證遷移是否按預期執行,而且可能揭示應用程式本身的bug。

總結

在我們旅程的這個階段,我們對系統進行了版本控制,並完成了V2偽生產版本。這個新版本包含了一些額外的功能和特性,比如支援不需要付費的訂單和在UI中顯示更多資訊。

我們還對基礎設施做了一些改變。例如,我們使更多的訊息具有冪等性,現在持久化整合事件。下一章將描述我們旅程的最後階段,我們將繼續增強基礎設施,並在準備釋出V3版本時加強系統