1. 程式人生 > >案例分析:基於訊息的分散式架構

案例分析:基於訊息的分散式架構

美國電腦科學家,LaTex的作者Leslie Lamport說:“分散式系統就是這樣一個系統,系統中一個你甚至都不知道的計算機出了故障,卻可能導致你自己的計算機不可用。”一語道破了開發分散式系統的玄機,那就是它的複雜與不可控。所以Martin Fowler強調:分散式呼叫的第一原則就是不要分散式。這句話看似頗具哲理,然而就企業應用系統而言,只要整個系統在不停地演化,並有多個子系統共同存在時,這條原則就會被迫打破。蓋因為在當今的企業應用系統中,很難尋找到完全不需要分散式呼叫的場景。Martin Fowler提出的這條原則,一方面是希望設計者能夠審慎地對待分散式呼叫,另一方面卻也是分散式系統自身存在的缺陷所致。無論是CORBA,還是EJB 2;無論是RPC平臺,還是Web Service,都因為駐留在不同程序空間的分散式元件,而引入額外的複雜度,並可能對系統的效率、可靠性、可預測性等諸多方面帶來負面的影響。

然而,不可否認的是在企業應用系統領域,我們總是會面對不同系統之間的通訊、整合與整合,尤其當面臨異構系統時,這種分散式的呼叫與通訊變得越重要,它在架構設計中就更加凸顯其價值。並且,從業務分析與架構質量的角度來講,我們也希望在系統架構中儘可能地形成對服務的重用,通過獨立執行在程序中服務的形式,徹底解除客戶端與服務端的耦合。這常常是架構演化的必然道路。在我的同事陳金洲發表在InfoQ上的文章《架構腐化之謎》中,就認為可以通過“將獨立的模組放入獨立的程序”來解決架構因為程式碼規模變大而腐化的問題。

隨著網路基礎設施的逐步成熟,從RPC進化到Web Service,並在業界開始普遍推行SOA,再到後來的RESTful平臺以及雲端計算中的PaaS與SaaS概念的推廣,分散式架構在企業應用中開始呈現出不同的風貌,然而殊途同歸,這些分散式架構的目標仍然是希望回到建造巴別塔的時代,系統之間的交流不再為不同語言與平臺的隔閡而產生障礙。正如Martin Fowler在《企業整合模式》一書的序中寫道:“整合之所以重要是因為相互獨立的應用是沒有生命力的。我們需要一種技術能將在設計時並未考慮互操作的應用整合起來,打破它們之間的隔閡,獲得比單個應用更多的效益”。這或許是分散式架構存在的主要意義。

1、整合模式中的訊息模式

歸根結底,企業應用系統就是對資料的處理,而對於一個擁有多個子系統的企業應用系統而言,它的基礎支撐無疑就是對訊息的處理。與物件不同,訊息本質上是一種資料結構(當然,物件也可以看做是一種特殊的訊息),它包含消費者與服務雙方都能識別的資料,這些資料需要在不同的程序(機器)之間進行傳遞,並可能會被多個完全不同的客戶端消費。在眾多分散式技術中,訊息傳遞相較檔案傳遞與遠端過程呼叫(RPC)而言,似乎更勝一籌,因為它具有更好的平臺無關性,並能夠很好地支援併發與非同步呼叫。對於Web Service與RESTful而言,則可以看做是訊息傳遞技術的一種衍生或封裝。在《面向模式的軟體架構(卷四)》一書中,將關於訊息傳遞的模式劃歸為分散式基礎設施的範疇,這是因為諸多訊息中介軟體產品的出現,使得原來需要開發人員自己實現的功能,已經可以直接重用。這極大地降低了包括設計成本、實現成本在內的開發成本。因此,對於架構師的要求也就從原來的設計實現,轉變為對業務場景和功能需求的判斷,從而能夠正確地進行架構決策、技術選型與模式運用。

常用的訊息模式

在我參與過的所有企業應用系統中,無一例外地都採用(或在某些子系統與模組中部分採用)了基於訊息的分散式架構。但是不同之處在於,讓我們做出架構決策的證據卻迥然而異,這也直接影響我們所要應用的訊息模式。

訊息通道(Message Channel)模式

我們常常運用的訊息模式是Message Channel(訊息通道)模式,如圖1所示。

圖1 Message Channel模式(圖片來自eaipatterns 

訊息通道作為在客戶端(消費者,Consumer)與服務(生產者,Producer)之間引入的間接層,可以有效地解除二者之間的耦合。只要實現規定雙方需要通訊的訊息格式,以及處理訊息的機制與時機,就可以做到消費者對生產者的“無知”。事實上,該模式可以支援多個生產者與消費者。例如,我們可以讓多個生產者向訊息通道傳送訊息,因為消費者對生產者的無知性,它不必考慮究竟是哪個生產者發來的訊息。

雖然訊息通道解除了生產者與消費者之間的耦合,使得我們可以任意地對生產者與消費者進行擴充套件,但它又同時引入了各自對訊息通道的依賴,因為它們必須知道通道資源的位置。要解除這種對通道的依賴,可以考慮引入Lookup服務來查詢該通道資源。例如,在JMS中就可以通過JNDI來獲取訊息通道Queue。若要做到充分的靈活性,可以將與通道相關的資訊儲存到配置檔案中,Lookup服務首先通過讀取配置檔案來獲得通道。

訊息通道通常以佇列的形式存在,這種先進先出的資料結構無疑最為適合這種處理訊息的場景。微軟的MSMQ、IBM MQ、JBoss MQ以及開源的RabbitMQApache ActiveMQ都通過佇列實現了Message Channel模式。因此,在選擇運用Message Channel模式時,更多地是要從質量屬性的層面對各種實現了該模式的產品進行全方位的分析與權衡。例如,訊息通道對併發的支援以及在效能上的表現;訊息通道是否充分地考慮了錯誤處理;對訊息安全的支援;以及關於訊息持久化、災備(fail over)與叢集等方面的支援。因為通道傳遞的訊息往往是一些重要的業務資料,一旦通道成為故障點或安全性的突破點,對系統就會造成災難性的影響。在本文的第二部分,我將給出一個實際案例來闡釋在進行架構決策時應該考慮的架構因素,並由此做出正確地決策。

釋出者-訂閱者(Publisher-Subscriber)模式

一旦訊息通道需要支援多個消費者時,就可能面臨兩種模型的選擇:拉模型與推模型。拉模型是由訊息的消費者發起的,主動權把握在消費者手中,它會根據自己的情況對生產者發起呼叫。如圖2所示:

圖2 拉模型

拉模型的另一種體現則由生產者在狀態發生變更時,通知消費者其狀態發生了改變。但得到通知的消費者卻會以回撥方式,通過呼叫傳遞過來的消費者物件獲取更多細節訊息。

在基於訊息的分散式系統中,拉模型的消費者通常以Batch Job的形式,根據事先設定的時間間隔,定期偵聽通道的情況。一旦發現有訊息傳遞進來,就會轉而將訊息傳遞給真正的處理器(也可以看做是消費者)處理訊息,執行相關的業務。在本文第二部分介紹的醫療衛生系統,正是通過引入Quartz.NET實現了Batch Job,完成對訊息通道中訊息的處理。

推模型的主動權常常掌握在生產者手中,消費者被動地等待生產者發出的通知,這就要求生產者必須瞭解消費者的相關資訊。如圖3所示:

圖3 推模型

對於推模型而言,消費者無需瞭解生產者。在生產者通知消費者時,傳遞的往往是訊息(或事件),而非生產者自身。同時,生產者還可以根據不同的情況,註冊不同的消費者,又或者在封裝的通知邏輯中,根據不同的狀態變化,通知不同的消費者。

兩種模型各有優勢。拉模型的好處在於可以進一步解除消費者對通道的依賴,通過後臺任務去定期訪問訊息通道。壞處是需要引入一個單獨的服務程序,以Schedule形式執行。而對於推模型而言,訊息通道事實上會作為消費者觀察的主體,一旦發現訊息進入,就會通知消費者執行對訊息的處理。無論推模型,拉模型,對於訊息物件而言,都可能採用類似Observer模式的機制,實現消費者對生產者的訂閱,因此這種機制通常又被稱為Publisher-Subscriber模式,如圖4所示:

圖4 Publisher-Subscriber模式(圖片來自eaipatterns )

通常情況下,釋出者和訂閱者都會被註冊到用於傳播變更的基礎設施(即訊息通道)上。釋出者會主動地瞭解訊息通道,使其能夠將訊息傳送到通道中;訊息通道一旦接收到訊息,會主動地呼叫註冊在通道中的訂閱者,進而完成對訊息內容的消費。

對於訂閱者而言,有兩種處理訊息的方式。一種是廣播機制,這時訊息通道中的訊息在出列的同時,還需要複製訊息物件,將訊息傳遞給多個訂閱者。例如,有多個子系統都需要獲取從CRM系統傳來的客戶資訊,並根據傳遞過來的客戶資訊,進行相應的處理。此時的訊息通道又被稱為Propagation通道。另一種方式則屬於搶佔機制,它遵循同步方式,在同一時間只能有一個訂閱者能夠處理該訊息。實現Publisher-Subscriber模式的訊息通道會選擇當前空閒的唯一訂閱者,並將訊息出列,並傳遞給訂閱者的訊息處理方法。

目前,有許多訊息中介軟體都能夠很好地支援Publisher-Subscriber模式,例如JMS介面規約中對於Topic物件提供的MessagePublisher與MessageSubscriber介面。RabbitMQ也提供了自己對該模式的實現。微軟的MSMQ雖然引入了事件機制,可以在佇列收到訊息時觸發事件,通知訂閱者。但它並非嚴格意義上的Publisher-Subscriber模式實現。由微軟MVP Udi Dahan作為主要貢獻者的NServiceBus,則對MSMQ以及WCF做了進一層包裝,並能夠很好地實現這一模式。

訊息路由(Message Router)模式

無論是Message Channel模式,還是Publisher-Subscriber模式,佇列在其中都扮演了舉足輕重的角色。然而,在企業應用系統中,當系統變得越來越複雜時,對效能的要求也會越來越高,此時對於系統而言,可能就需要支援同時部署多個佇列,並可能要求分散式部署不同的佇列。這些佇列可以根據定義接收不同的訊息,例如訂單處理的訊息,日誌資訊,查詢任務訊息等。這時,對於訊息的生產者和消費者而言,並不適宜承擔決定訊息傳遞路徑的職責。事實上,根據S單一職責原則,這種職責分配也是不合理的,它既不利於業務邏輯的重用,也會造成生產者、消費者與訊息佇列之間的耦合,從而影響系統的擴充套件。

既然這三種物件(元件)都不宜承擔這樣的職責,就有必要引入一個新的物件專門負責傳遞路徑選擇的功能,這就是所謂的Message Router模式,如圖5所示:

圖5 Message Router模式(圖片來自eaipatterns )

通過訊息路由,我們可以配置路由規則指定訊息傳遞的路徑,以及指定具體的消費者消費對應的生產者。例如指定路由的關鍵字,並由它來繫結具體的佇列與指定的生產者(或消費者)。路由的支援提供了訊息傳遞與處理的靈活性,也有利於提高整個系統的訊息處理能力。同時,路由物件有效地封裝了尋找與匹配訊息路徑的邏輯,就好似一個調停者(Meditator),負責協調訊息、佇列與路徑定址之間關係。

除了以上的模式之外,Messaging模式提供了一個通訊基礎架構,使得我們可以將獨立開發的服務整合到一個完整的系統中。 Message Translator模式則完成對訊息的解析,使得不同的訊息通道能夠接收和識別不同格式的訊息。而且通過引入這樣的物件,也能夠很好地避免出現盤根錯節,彼此依賴的多個服務。Message Bus模式可以為企業提供一個面向服務的體系架構。它可以完成對訊息的傳遞,對服務的適配與協調管理,並要求這些服務以統一的方式完成協作。

2、訊息模式的應用場景

基於訊息的分散式架構總是圍繞著訊息來做文章。例如可以將訊息封裝為物件,或者指定訊息的規範例如SOAP,或者對實體物件的序列化與反序列化。這些方式的目的只有一個,就是將訊息設計為生產者和消費者都能夠明白的格式,並能通過訊息通道進行傳遞。

場景一:基於訊息的統一服務架構

在製造工業的CIMS系統中,我們嘗試將各種業務以服務的形式公開給客戶端的呼叫者,例如定義這樣的介面:

public interface IService {
    IMessage Execute(IMessage aMessage);
    void SendRequest(IMessage aMessage);
}

之所以能夠設計這樣的服務,原因在於我們對業務資訊進行了高度的抽象,以訊息的形式在服務之間傳遞。此時的訊息其實是生產者與消費者之間的契約或介面,只要遵循這樣的契約,按照規定的格式對訊息進行轉換與抽取,就能很好地支援系統的分散式處理。

在這個CIMS系統中,我們將訊息劃分為ID,Name和Body,通過定義如下的介面方法,可以獲得訊息主體的相關屬性:

public interface IMessage:ICloneable
{
     string MessageID { get; set; }
     string MessageName() { get; set; }
     IMessageItemSequence CreateMessageBody();
     IMessageItemSequence GetMessageBody();
}

訊息主體類Message實現了IMessage介面。在該類中,訊息體Body為IMessageItemSequence型別。這個型別用於獲取和設定訊息的內容:Value和Item:

public interface IItemValueSetting {
     string getSubValue(string name);
     void setSubValue(string name, string value);  
}
public interface IMessageItemSequence:IItemValueSetting, ICloneable
{      
     IMessageItem GetMessageItem(string aName);
     IMessageItem CreateMessageItem(string aName);       
}

Value為字串型別,它利用了HashTable儲存Key和Value的鍵值對。Item則為IMessageItem型別,在IMessageItemSequence的實現類中,同樣利用了HashTable儲存Key和Item的鍵值對。

IMessageItem支援訊息體的巢狀。它包含了兩部分:SubValue和SubItem。實現的方式和IMessageItemSequence相似。通過定義這樣的巢狀結構,使得訊息的擴充套件成為可能。一般的訊息結構如下所示:

       IMessage——Name
                     ——ID
                     ——Body(IMessageItemSequence)
                            ——Value
                            ——Item(IMessageItem)
                                   ——SubValue
                                   ——SubItem(IMessageItem)
                                          ——……

各個訊息物件之間的關係如圖6所示:

圖6 訊息物件之間的關係

在實現服務程序通訊之前,我們必須定義好各個服務或各個業務的訊息格式。通過訊息體的方法在服務的一端設定訊息的值,然後傳送,並在服務的另一端獲得這些值。例如傳送訊息端定義如下的訊息體:

IMessageFactory factory = new MessageFactory();
IMessage message = factory.CreateMessage();
message.SetMessageName("service1");

IMessageItemSequence body = message.CreateMessageBody();
body.SetSubValue("subname1","subvalue1");
body.SetSubValue("subname2","subvalue2");

IMessageItem item1 = body.CreateMessageItem(”item1”);
item1.SetSubValue("subsubname11","subsubvalue11");
item1.SetSubValue("subsubname12","subsubvalue12");

//Send Request Message
MyServiceClient service = new MyServiceClient("Client");
service.SendRequest(message);

我們在客戶端引入了一個ServiceLocator物件,它通過MessageQueueListener對訊息佇列進行偵聽,一旦接收到訊息,就獲取該訊息中的name去定位它所對應的服務,然後呼叫服務的Execute(aMessage)方法,執行相關的業務。

ServiceLocator承擔的定位職責其實是對儲存在ServiceContainer容器中的服務進行查詢。ServiceContainer容器可以讀取配置檔案,在啟動服務的時候初始化所有的分散式服務(注意,這些服務都是無狀態的),並對這些服務進行管理。它封裝了服務的基本資訊,諸如服務所在的位置,服務的部署方式等,從而避免服務的呼叫者直接依賴於服務的細節,既減輕了呼叫者的負擔,還能夠較好地實現服務的擴充套件與遷移。

在這個系統中,我們主要引入了Messaging模式,通過定義的IMessage介面,使得我們更好地對服務進行抽象,並以一種扁平的格式儲存資料資訊,從而解除服務之間的耦合。只要各個服務就共用的訊息格式達成一致,請求者就可以不依賴於接收者的具體介面。通過引入的Message物件,我們就可以建立一種在行業中通用的訊息模型與分散式服務模型。事實上,基於這樣的一個框架與平臺,在對製造行業的業務進行開發時,開發人員最主要的活動是與領域專家就各種業務的訊息格式進行討論,這樣一種面向領域的訊息語言,很好地掃清了技術人員與業務人員的溝通障礙;同時在各個子系統之間,我們也只需要維護服務間相互傳遞的訊息介面表。每個服務的實現都是完全隔離的,有效地做到了對業務知識與基礎設施的合理封裝與隔離。

對於訊息的格式和內容,我們考慮引入了Message Translator模式,負責對前面定義的訊息結構進行翻譯和解析。為了進一步減輕開發人員的負擔,我們還可以基於該平臺搭建一個訊息-物件-關係的對映框架,引入實體引擎(Entity Engine)將訊息轉換為領域實體,使得服務的開發者能夠以完全面向物件的思想開發各個服務元件,並通過呼叫持久層實現訊息資料的持久化。同時,利用訊息匯流排(此時的訊息匯流排可以看做是各個服務元件的聯結器)連線不同的服務,並允許非同步地傳遞訊息,對訊息進行編碼。這樣一個基於訊息的分散式架構如圖7所示:

圖7 基於Message Bus的CIMS分散式架構

場景二:訊息中介軟體的架構決策

在一個醫療衛生系統中,我們面臨了客戶對系統性能/可用性的非功能需求。在我們最初啟動該專案時,客戶就表達了對效能與可用性的特別關注。客戶希望終端使用者在進行復雜的替換刪除操作時,能夠具有很好的使用者體驗,簡言之,就是希望能夠快速地得到操作的響應。問題在於這樣的替換刪除操作需要處理比較複雜的業務邏輯,同時牽涉到的關聯資料量非常大,整個操作若需完成,最壞情況下可能需要幾分鐘的時間。我們可以通過引入快取、索引、分頁等多種方式對資料庫操作進行效能調優,但整個操作的耗時始終無法達到客戶的要求。由於該系統是在一個遺留系統的基礎上開發,如果要引入Map-Reduce來處理這些操作,以滿足質量需求,則對架構的影響太大,且不能很好地重用之前系統的某些元件。顯然,付出的成本與收益並不成正比。

通過對需求進行分析,我們注意到最終客戶並不需要實時獲得結果,只要能夠保證最終結果的一致性和完整性即可。關鍵在於就使用者體驗而言,他們不希望經歷漫長的等待,然後再通知他們操作究竟是成功還是失敗。這是一個典型需要通過後臺任務進行非同步處理的場景。

在企業應用系統中,我們常常會遭遇這樣的場景。我們曾經在一個金融系統中嘗試通過自己編寫任務的方式來控制後臺執行緒的併發訪問,並完成對任務的排程。事實證明,這樣的設計並非行之有效。對於這種典型的非同步處理來說,基於訊息傳遞的架構模式才是解決這一問題的最佳辦法。

因為訊息中介軟體的逐步成熟,對於這一問題的架構設計,已經由原來對設計實現的關注轉為如何進行產品選型和技術決策。例如,在.NET平臺下,架構師需要重點考慮的是應該選擇哪種訊息中介軟體來處理此等問題?這就需要我們必須結合具體的業務場景,來識別這種非同步處理方式的風險,然後再根據這些風險去比較各種技術,以求尋找到最適合的方案。

通過分析業務場景以及客戶性質,我們發現該業務場景具有如下特徵:

  • 在一些特定情形下,可能會集中發生批量的替換刪除操作,使得操作的併發量達到高峰;例如FDA要求召回一些違規藥品時,就需要刪除藥品庫中該藥品的資訊;
  • 操作結果不要求實時性,但需要保證操作的可靠性,不能因為異常失敗而導致某些操作無法進行;
  • 自動操作的過程是不可逆轉的,因此需要記錄操作歷史;
  • 基於效能考慮,大多數操作需要呼叫資料庫的儲存過程;
  • 操作的資料需要具備一定的安全性,避免被非法使用者對資料造成破壞;
  • 與操作相關的功能以元件形式封裝,保證元件的可重用性、可擴充套件性與可測試性;
  • 資料量可能隨著終端使用者的增多而逐漸增大;

針對如上的業務需求,我們決定從以下幾個方面對各種技術方案進行橫向的比較與考量。

  • 併發:選擇的訊息佇列一定要很好地支援使用者訪問的併發性;
  • 安全:訊息佇列是否提供了足夠的安全機制;
  • 效能伸縮:不能讓訊息佇列成為整個系統的單一效能瓶頸;
  • 部署:儘可能讓訊息佇列的部署更為容易;
  • 災備:不能因為意外的錯誤、故障或其他因素導致處理資料的丟失;
  • API易用性:處理訊息的API必須足夠簡單、並能夠很好地支援測試與擴充套件;

我們先後考察了MSMQ、Resque、ActiveMQ和RabbitMQ,通過查詢相關資料,以及編寫Spike程式碼驗證相關質量,我們最終選擇了RabbitMQ。

我們選擇放棄MSMQ,是因為它嚴重依賴Windows作業系統;它雖然提供了易用的GUI方便管理人員對其進行安裝和部署,但若要編寫自動化部署指令碼,卻非常困難。同時,MSMQ的佇列容量不能查過4M位元組,這也是我們無法接收的。Resque的問題是目前僅支援Ruby的客戶端呼叫,不能很好地與.NET平臺整合。此外,Resque對訊息持久化的處理方式是寫入到Redis中,因而需要在已有RDBMS的前提下,引入新的Storage。我們比較傾心於ActiveMQ與RabbitMQ,但通過編寫測試程式碼,採用迴圈傳送大資料訊息以驗證訊息中介軟體的效能與穩定性時,我們發現ActiveMQ的表現並不太讓人滿意。至少,在我們的詢證調研過程中,ActiveMQ會因為頻繁傳送大資料訊息而偶爾出現崩潰的情況。相對而言,RabbitMQ在各個方面都比較適合我們的架構要求。

例如在災備與穩定性方面,RabbitMQ提供了可持久化的佇列,能夠在佇列服務崩潰的時候,將未處理的訊息持久化到磁碟上。為了避免因為傳送訊息到寫入訊息之間的延遲導致資訊丟失,RabbitMQ引入了Publisher Confirm機制以確保訊息被真正地寫入到磁碟中。它對Cluster的支援提供了Active/Passive與Active/Active兩種模式。例如,在Active/Passive模式下,一旦一個節點失敗,Passive節點就會馬上被啟用,並迅速替代失敗的Active節點,承擔起訊息傳遞的職責。如圖8所示:

圖8 Active/Passive Cluster(圖片來自RabbitMQ官方網站)

在併發處理方面,RabbitMQ本身是基於erlang編寫的訊息中介軟體,作為一門面向併發處理的程式語言,erlang對併發處理的天生優勢使得我們對RabbitMQ的併發特性抱有信心。RabbitMQ可以非常容易地部署到Windows、Linux等作業系統下,同時,它也可以很好地部署到伺服器叢集中。它的佇列容量是沒有限制的(取決於安裝RabbitMQ的磁碟容量),傳送與接收資訊的效能表現也非常好。RabbitMQ提供了Java、.NET、Erlang以及C語言的客戶端API,呼叫非常簡單,並且不會給整個系統引入太多第三方庫的依賴。 例如.NET客戶端只需要依賴一個程式集。

即使我們選擇了RabbitMQ,但仍有必要對系統與具體的訊息中介軟體進行解耦,這就要求我們對訊息的生產者與消費者進行抽象,例如定義如下的介面:

    public interface IQueueSubscriber
    {
        void ListenTo<T>(string queueName, Action<T> action);
        void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
        void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
    }

    public interface IQueueProvider
    {
        T Pop<T>(string queueName);
        T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
        T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
        void Push(FunctionalArea functionalArea, string routingKey, object payload);
    }

在這兩個介面的實現類中,我們封裝了RabbitMQ的呼叫類,例如:

    public class RabbitMQSubscriber : IQueueSubscriber
    {
        public void ListenTo<T>(string queueName, Action<T> action)
        {
            using (IConnection connection = _factory.OpenConnection())
            using (IModel channel = connection.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);
                string consumerTag = channel.BasicConsume(queueName, AcknowledgeImmediately, consumer);

                var response = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                var serializer = new JavaScriptSerializer();
                string json = Encoding.UTF8.GetString(response.Body);
                var message = serializer.Deserialize<T>(json);

                action(message);
            }
        }       
    }
    public class RabbitMQProvider : IQueueProvider
    {

        public T Pop<T>(string queueName)
        {
            var returnVal = default(T);
            const bool acknowledgeImmediately = true;

            using (var connection = _factory.OpenConnection())
            using (var channel = connection.CreateModel())
            {
                var response = channel.BasicGet(queueName, acknowledgeImmediately);

                if (response != null)
                {
                    var serializer = new JavaScriptSerializer();
                    var json = Encoding.UTF8.GetString(response.Body);
                    returnVal = serializer.Deserialize<T>(json);
                }
            }

            return returnVal;
        }
    }

我們用Quartz.Net來實現Batch Job。通過定義一個實現了IStatefulJob介面的Job類,在Execute()方法中完成對佇列的偵聽。Job中RabbitMQSubscriber類的ListenTo()方法會呼叫Queue的Dequeue()方法,當接收的訊息到達佇列時,Job會偵聽到訊息達到的事件,然後以同步的方式使得訊息彈出佇列,並將訊息作為引數傳遞給Action委託。因此,在Batch Job的Execute()方法中,可以定義訊息處理的方法,並呼叫RabbitMQSubscriber類的ListenTo()方法,如下所示(注意,這裡傳遞的訊息事實上是Job的Id):

        public void Execute(JobExecutionContext context)
        {
            string queueName = queueConfigurer.GetQueueProviders().Queue.Name;
            try
            {
                queueSubscriber.ListenTo<MyJob>(
queueName,
                    job => request.MakeRequest(job.Id.ToString()));
            }
            catch(Exception err)
            {
                Log.WarnFormat("Unexpected exception while processing queue '{0}', Details: {1}", queueName, err);
            }
        }

        

佇列的相關資訊例如佇列名都儲存在配置檔案中。Execute()方法呼叫了request物件的MakeRequest()方法,並將獲得的訊息(即JobId)傳遞給該方法。它會根據JobId到資料庫中查詢該Job對應的資訊,並執行真正的業務處理。

在對基於訊息處理的架構進行決策時,除了前面提到的考慮因素外,還需要就許多設計細節進行多方位的判斷與權衡。例如針對Job的執行以及佇列的管理,就需要考慮如下因素:

  • 對Queue中Job狀態的監控與查詢;
  • 對Job優先順序的管理;
  • 能否取消或終止執行時間過長的Job;
  • 是否能夠設定Job的執行時間;
  • 是否能夠設定Poll的間隔時間;
  • 能否跨機器分散式的放入Job;
  • 對失敗Job的處理;
  • 能否支援多個佇列,命名佇列;
  • 能否允許執行Job的工作程序對應特定的佇列;
  • 對Dead Message的支援。

3、選擇的時機

究竟在什麼時候,我們應該選擇基於訊息處理的分散式架構?根據我參與的多個企業應用系統的經驗,竊以為需要滿足如下幾個條件:

  • 對操作的實時性要求不高,而需要執行的任務極為耗時;
  • 存在企業內部的異構系統間的整合;
  • 伺服器資源需要合理分配與利用;

對於第一種情況,我們常常會選擇訊息佇列來處理執行時間較長的任務。此時引入的訊息佇列就成了訊息處理的緩衝區。訊息佇列引入的非同步通訊機制,使得傳送方和接收方都不用等待對方返回成功訊息,就可以繼續執行下面的程式碼,從而提高了資料處理的能力。尤其是當訪問量和資料流量較大的情況下,就可以結合訊息佇列與後臺任務,通過避開高峰期對大資料進行處理,就可以有效降低資料庫處理資料的負荷。前面提到的醫療衛生系統正是這樣一種適用場景。

對於不同系統乃至於異構系統的整合,恰恰是訊息模式善於處理的場景。只要規定了訊息的格式與傳遞方式,就可以有效地實現不同系統之間的通訊。在為某汽車製造商開發一個大型系統時,分銷商作為.NET客戶端,需要將資料傳遞到管理中心。這些資料將被Oracle的EBS(E-Business Suite)使用。分銷商管理系統(Dealer Management System,DMS)採用了C/S結構,資料庫為SQL Server,汽車製造商管理中心的EBS資料庫為Oracle 10g。我們需要解決兩種不同資料庫間資料的傳遞。解決方案就是利用MSMQ,將資料轉換為與資料庫無關的訊息資料,並在兩端部署MSMQ伺服器,建立訊息佇列以便於儲存訊息資料。實現架構如圖9所示。

圖10 利用MSMQ實現的分散式處理架構

首先,分銷商的資料通過MSMQ傳遞到MSMQ Server,再將資料插入到SQL Server資料庫的同時,利用FTP將資料傳送到專門的檔案伺服器上。EBS App Server會將檔案伺服器中的檔案,基於介面規範寫入到Oracle資料庫,從而實現.NET系統與Oracle系統之間的整合。

分散式系統通常能夠緩解單個伺服器的壓力,通過將不同的業務操作與資料處理以不同的服務形式部署並執行在不同的伺服器上,就可以有效地分配與利用伺服器資源。在這種情況下,部署在不同伺服器上的服務,既可能作為服務端,用以處理客戶端呼叫的請求,也可能作為客戶端,在處理完自己的業務後,將其餘業務請求委派給其他服務。在早期的CORBA系統中,通過建立統一的Naming Service,用以管理和分派服務,並通過Event Service實現事件的分發與處理。但CORBA系統採用的是RPC的方式,需要將服務設計和部署為遠端物件,並建立代理。如果通過訊息通道的方式,則既可以解除這種對遠端物件的依賴,又可以很好地支援非同步呼叫模型。在前面提到的CIMS系統,就是通過訊息匯流排提供訊息傳遞的基礎設施,並建立統一的訊息處理服務模型,解除服務見的依賴,使得各個服務能夠獨立地部署到不同伺服器上。

4、面臨的困難

由於訊息模式自身的特殊性,我們在運用訊息模式建立基於訊息的分散式架構時,常常會面臨許多困難。

首先是系統整合的問題。由於系統之間的通訊靠訊息進行傳遞,就必須保證訊息的一致性,同時,還需要維護系統之間(主要是服務之間)介面的穩定性。一旦介面發生變化,就可能影響到該介面的所有呼叫者。即使服務通過介面進行了抽象,由於訊息持有雙方服務規定的業務資料,在一定程度上違背了封裝的要義。換言之,生產與消費訊息的雙方都緊耦合於訊息。訊息的變化會直接影響到各個服務介面的實現類。然而,為了儘可能保證介面的抽象性,我們所要處理的訊息都不是強型別的,這就使得我們在編譯期間很難發現因為訊息內容發生變更產生的錯誤。在我之前提到的汽車零售商管理系統就存在這樣的問題。當時我負責的CRM模組需要同時與多個子系統進行通訊,而每個子系統又是由不同的團隊進行開發。團隊之間因為溝通原因,常常未能及時地同步介面表。雖然各個子系統的單元測試和功能測試都已通過,但直到對CRM進行整合測試,才發現存在大量訊息不匹配的整合問題,這些問題的起因都是因為訊息的變更。

解決的方案是引入充分的整合測試,甚至是迴歸測試,並需要及時執行這些測試,以快速地獲得反饋。我們可以將整合測試作為提交程式碼的驗證們,要求每次提交程式碼都必須執行整合測試與指定的迴歸測試 。這正是持續整合的體現。通過在本地構建與遠端構建執行整合測試與迴歸測試,有效地保證本地版本與整合後的版本不會因為訊息的改變使得功能遭受破壞。一旦遭受破壞,也能夠及時獲得反饋,發現問題,即刻解決這些問題,而不是等到專案後期集中進行整合測試。

另一個問題是後臺任務的非實時性帶來的測試困難。由於後臺任務是定期對訊息佇列中的訊息進行處理,因而觸發的時機是不可預測的 。對於這種情況,我們通常會同時運用兩種方案,雙管其下地解決問題。首先,我們會為系統引入一個同步實現功能的版本,並通過在配置檔案中引入toggle的開關機制,隨時可以在同步功能與非同步功能之間進行切換。如果我們能夠保證訊息佇列處理與後臺任務執行的正確性,就可以設定為同步功能,這樣就能快速而準確地對該任務所代表的功能進行測試,並及時收穫反饋。同時,我們可以在持續整合伺服器上建立一個專門的管道(pipeline),用以執行基於訊息處理的非同步版本。這個管道對應的任務可以通過手動執行,也可以對管道設定定時器,在指定時間執行(例如在凌晨兩點執行一次,這樣在第二天開始工作之前可以獲得反饋)。我們需要為該管道準備特定的執行環境,並將後臺任務的偵聽與執行時間修改為可以接受的值。這樣既能夠及時瞭解功能是否正確,又能保證基於訊息的系統是工作正常的。

當然,分散式系統還存在解析訊息、網路傳遞的效能損耗。對於這些問題,需要架構師審慎地分析業務場景,正確地選擇架構方案與架構模式。相比較本地系統而言,分散式系統的維護難度可能成倍遞增。這既需要我們在進行架構決策與設計時,充分考慮系統架構的穩定性,同時還需要引入系統日誌處理。更好的做法是為日誌處理增加錯誤通知的功能,只要發生訊息處理的錯誤資訊,就通過郵件、簡訊等方式通知系統管理員,及時地處理錯誤。因為只有在發生錯誤的當時查詢錯誤日誌,才能夠更好對問題進行定位。同時,還可以為系統引入Error Message Queue以及Dead Message Queue,以便於處理錯誤和異常情況。

對於分散式系統而言,還需要考慮服務執行結果的一致性,尤其是當某個業務需要多個服務參與到一個會話中時,一旦某個服務發生故障,就可能導致應用出現狀態不一致的情況,因為只有所有參與者都成功執行了任務,才能視為完全成功。這就牽涉到分散式事務的問題,此時任務的執行就變成了事務型的:即任務必須是原子的,結果狀態必須保持一致。在任務處理過程中,狀態修改是彼此隔離的,成功的狀態修改在整個事務執行過程中是持久的。這就是事務的ACID(Atomic,Consistent,Isolated與Durable)屬性。

一種方案是引入分散式事務協調器,即DTC(Distributed Transaction Coordinator),將事務分為兩段式甚至三段式提交,要求整個事務的所有參與者以投票形式決定事務是完全成功還是失敗。另一種方案是降低對結果一致性的要求。根據eBay的最佳實踐,考慮到分散式事務的成本,獲得分散式資源即時的一致性是不必要的,也是不現實的。在Randy Shoup的文章《可伸縮性最佳實踐:來自eBay的經驗》中提到了Eric Brewer的CAP公理:分散式系統的三項重要指標——一致性(Consistency)、可用性(Availability)和 分割槽耐受性(Partition-tolerance)——在任意時刻,只有兩項能同時成立。我們應該根據不同的應用場景,權衡這三個要素。在不必要保證即時的一致性前提下,我們可以考慮合理地劃分服務,儘量將可能作用在同一個事務範圍的業務操作部署在同一個程序中,以避免分散式部署。如果確實需要多個分散式服務之間保持執行結果的一致,可以考慮引入資料核對,非同步恢復事件或集中決算等手段。