1. 程式人生 > >分散式訊息匯流排,基於.NET Socket Tcp的釋出-訂閱框架之離線支援,附程式碼下載

分散式訊息匯流排,基於.NET Socket Tcp的釋出-訂閱框架之離線支援,附程式碼下載

一、分散式訊息匯流排以及基於Socket的實現

     在前面的分享一個分散式訊息匯流排,基於.NET Socket Tcp的釋出-訂閱框架,附程式碼下載一文之中給大家分享和介紹了一個極其簡單也非常容易上的基於.NET Socket Tcp 技術實現的分佈訊息匯流排,也是一個簡單的釋出訂閱框架:

image

    並且以案例的形式為大家演示瞭如何使用這個分散式訊息匯流排架構釋出訂閱架構模式的應用程式,在得到各位同仁的反饋的同時,大家也非常想了解訂閱者離線的情況,即支援離線構釋出訂閱框架。

二、離線架構

     不同於訂閱者、釋出者都同時線上的情況,支援訂閱者離線,架構將有所變化,如下圖所示:

image

     也會比原先的結構將更加複雜,其中需要處理以下兩個關鍵點:

     1)訂閱者的持久化儲存。

     2)訂閱者離線之後其所訂閱訊息的持久儲存。

三、解決方案

     為解決訊息匯流排的離線支援機制,我們在Socket 框架之中增加了一個介面ISubscribeStorager

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5:  
   6: namespace
EAS.Messages
   7: {   
   8:     /// <summary>
   9:     /// 訊息訂閱儲存介面。
  10:     /// </summary>
  11:     public interface ISubscribeStorager
  12:     {
  13:         /// <summary>
  14:         /// 持久化訂閱。
  15:         /// </summary>
  16:         /// <param name="subscriber">訂閱者。</param>
  17:         /// <param name="topic">訊息主題。</param>
  18:         void Subscribe(string subscriber, string topic);
  19:  
  20:         /// <summary>
  21:         /// 持久化退訂。
  22:         /// </summary>
  23:         /// <param name="subscriber">訂閱者。</param>
  24:         /// <param name="topic">訊息主題。</param>
  25:         void Unsubscribe(string subscriber, string topic);
  26:  
  27:         /// <summary>
  28:         /// 裝載訂閱資訊。
  29:         /// </summary>
  30:         /// <returns>系統之中的訂閱清單。</returns>
  31:         List<SubscribeItem> LoadSubscribes();
  32:  
  33:         /// <summary>
  34:         /// 寫入訊息。
  35:         /// </summary>
  36:         /// <param name="subscriber">訂閱者。</param>
  37:         /// <param name="message">訊息物件。</param>
  38:         void Write(string subscriber, QueueMessage message);
  39:  
  40:         /// <summary>
  41:         /// 讀訊息。
  42:         /// </summary>
  43:         /// <param name="subscriber">訂閱者。</param>
  44:         /// <param name="message">訊息物件。</param>
  45:         /// <returns>成功讀取返回true,否則返回false。</returns>
  46:         bool Read(string subscriber, out QueueMessage message);
  47:     }
  48: }

     ISubscribeStorager共提供持久化訂閱持久化訊息儲存共五個函式,其中:

     LoadSubscribes:服務端初始化時讀取所有的離線訂閱關係,即那個訂閱都訂閱那那個主題。

     Subscribe:持久化訂閱者,當訂閱才上線訂閱訊息時,持久化訂閱關係,供離線檢測之用。

     Unsubscribe:持久化取消訂閱,當訂閱者退訂訊息時,從持久化訂閱關係之中刪除。

     Write:當訂閱者離線時,把訂閱訊息寫入持久化儲存。

     Read:當離線訂閱者上線時,從持久儲存之中讀取一條訊息向其傳送。

     ISubscribeStorager:可以選擇自己實現這個介面,以建立滿足自己規則的離線儲存機制,當然在AgileEAS.NET SOA 中介軟體之中提供了兩種離線儲存機制,儲存於資料庫和儲存於MSMQ,下面向大家介紹一下這兩種內建實現。

四、兩種內建離線儲存機制

     在AgileEAS.NET SOA 中介軟體平臺之中提供了兩個ISubscribeStorager的實現,基於資料庫的離線訂閱儲存實現EAS.Messages.DbSubscribeStorager和基於MSMQ的離線訂閱儲存實現EAS.Messages.MsmqSubscribeStorager

     EAS.Messages.DbSubscribeStorager儲存訂閱關係在messageSubscribe.Config檔案之中,訊息儲存在關係資料庫SOA_SUBSCRIBEEVENTS表之中,使用前必須要建立相應的表結構,以下是SQL Server的DDL指令碼:

   1: CREATE TABLE [SOA_SUBSCRIBEEVENT](
   2:     [GUID] [varchar](36) NOT NULL,
   3:     [SUBSCRIBER] [nvarchar](128) NOT NULL,
   4:     [TOPIC] [nvarchar](128) NOT NULL,
   5:     [BODY] [image] NULL,
   6:     [FCTIME] [datetime] NOT NULL,
   7:  CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED 
   8: (
   9:     [GUID] ASC
  10: )
  11: ) 

      目前理論上支援SQLServer 、Mysql、ORACLE、Sqlite四種資料庫結構,具體建表指令碼請自行參考相應資料書寫,也可以使用AgileEAS.NET SOA中介軟體所提供的資料庫初始化工具建立。

      EAS.Messages.MsmqSubscribeStorager儲存訂閱關係在messageSubscribe.Config檔案之中,訊息儲存Msmq訊息佇列之中,使用之前請確保機器上安裝了MSMQ訊息對列。

五、關於自定義實現ISubscribeStorager

     有興趣的朋友可以自定義實現介面ISubscribeStorager,這樣就可以按自己的規則進行儲存,比如把離線訊息儲存到mongodb、Redis、或者直接儲存在檔案之中,或者其他更多的實現規則,在此就不一一介紹,如有相關興趣,請聯絡作者,如確有必要需要給在家介紹一下如何實現,將會另開一文字介紹如何自定義實現ISubscribeStorager介面。

六、改進線上例子支援離線

     還是跟上次一樣,以案例為在家展示一下怎麼進行離線訊息,就不重新開始例子,對原有例子做一些改進,改進後例子如下:

image

     其中在原有專案的基礎上增加了:Demo.Subscriber1和Demo.Subscriber2專案,其專案配置程式碼、配置檔案基本上同Demo.Subscriber一樣,其中唯一的差別在於,Demo.Subscriber1和Demo.Subscriber2向伺服器提交訂閱的時候都增加一個另friendName引數,其使用IMessageBus介面的以下訂閱函式:

   1: /// <summary>
   2: /// 訂閱訊息。
   3: /// </summary>
   4: /// <param name="subscriber">訂閱者。</param>
   5: /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
   6: /// <param name="topic">主題。</param>
   7: /// <param name="notifyHandler">訂閱通知。</param>
   8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);

                Demo.Publisher專案為釋出者程式碼。

                Demo.Subscriber專案為訂閱者程式碼。

                Demo.Server專案為服務端程式碼。

     Demo.Subscriber1專案之中,其Program.cs程式碼如下:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Windows.Forms;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Subscriber1
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Subscriber1");
  16:  
  17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
  18:             System.Console.ReadLine();
  19:         }
  20:  
  21:         static void MessageNotify(object m)
  22:         {
  23:             Demo.Messages.Message message = m as Demo.Messages.Message;
  24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
  25:         }
  26:     }
  27: }

     其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在訂閱訊息的時候給了一個friendName為Subscriber1,Demo.Subscriber2與Demo.Subscriber1專案的唯一的差別就是此處為Subscriber2.

     我們使用內建的EAS.Messages.DbSubscribeStorager,則不需要修改服務端的程式碼,只需要修改服務端的配置檔案如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--資料庫連線-->
  12:       <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread">
  13:         <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" />
  14:       </object>
  15:       <!--資料訪問器-->
  16:       <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread">
  17:         <property name="DbProvider" type="object" value="DbProvider"/>
  18:         <property name="Language" type="object" value="TSqlLanguage"/>
  19:       </object>
  20:       <!--ORM訪問器-->
  21:       <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread">
  22:         <property name="DataAccessor" type="object" value="DataAccessor"/>
  23:       </object>
  24:       <!--查詢語言-->
  25:       <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/>
  26:       <!--訊息持久化-->
  27:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/>
  28:       <!--日誌管理-->
  29:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  30:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  31:       </object>
  32:     </objects>
  33:   </eas>
  34: </configuration>

     在配置檔案的IOC配置之中我們配置了訊息儲存物件以及其所依賴的資料庫訪問物件、Linq查詢語言表示式,另外需要說明的是,我們需要把配置檔案之中所涉及的EAS.Data.dll、EAS.SOA.BootStrap.dll複製到編譯輸出Publish,這兩個檔案可以從AgileEAS.NET SOA 中介軟體平臺釋出包之中尋找,本案例的下載壓碎包之中會包括這兩個檔案。

     有關於基於Msmq的配置,只需要修改配置檔案如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--訊息持久化-->
  12:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/>
  13:       <!--日誌管理-->
  14:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  15:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  16:       </object>
  17:     </objects>
  18:   </eas>
  19: </configuration>

     到此為止,所有程式碼均已完成,是不是很簡單,接下來,我們跑起來驗證一下效果。

七、驗證效果

     我們在編譯輸入目錄Publish下先啟動Demo.Server.exe,再各啟動Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再啟動一個Demo.Publisher.exe,在Demo.Publisher.exe控制檯按回車鍵:

N_[]_C%GTD_$0[KL}}B~E$A

目前程式三個訂閱者都是線上的,Demo.Publisher釋出了三條訊息,三個訂閱者都收到了三條訊息,那麼我們關閉Demo.Subscriber2之後再由Demo.Publisher釋出兩條訊息:

R(OM90FW6B0WYO6R)0MQ7D4

然後我們再啟動Demo.Subscriber2,看是否還能收到其離線之後由Demo.Publisher釋出的兩條訊息:

ZE8XA`V4PE)5%F~QF}NO]0N

OK,到此為止。

八、原始碼下載

九、問題反饋

     麻煩大家在通過視訊進行學習的時候能及時把問題反饋給樓主,或者有什麼需要改進的一些建議都請向樓主直接反饋,以下是聯絡方式:

樓主QQ:47920381,AgileEAS.NET

QQ群:113723486(AgileEAS SOA 平臺)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平臺交流)/上限1000人

郵件:[email protected],[email protected],

電話:18629261335。