1. 程式人生 > >.NET Core微服務之基於EasyNetQ使用RabbitMQ訊息佇列

.NET Core微服務之基於EasyNetQ使用RabbitMQ訊息佇列

一、訊息佇列與RabbitMQ

1.1 訊息佇列

  “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器

  訊息佇列(Message Queue),是分散式系統中重要的元件,其通用的使用場景可以簡單地描述為:

當不需要立即獲得結果,但是併發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候。  

  訊息佇列主要解決了應用耦合、非同步處理、流量削鋒等問題。當前使用較多的訊息佇列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分資料庫如Redis、Mysql以及phxsql也可實現訊息佇列的功能。更多詳細內容請參考:《

訊息佇列及其應用場景介紹

  我也在前幾年寫過一篇基於Redis做訊息佇列的文章,對訊息佇列的一個應用場景做了介紹,沒有了解過的童鞋可以看看。

1.2 RabbitMQ

  

  RabbitMQ是一款基於AMQP(高階訊息佇列協議),由Erlang開發的開源訊息佇列元件。是一款優秀的訊息佇列元件,他由兩部分組成:服務端和客戶端,客戶端支援多種語言的驅動,如:.Net、JAVA、Erlang等。

  網上有很多效能比較的文章,例如在1百萬條1k的訊息下,每秒種的收發情況如下圖所示:

  效能比較

  這裡不過多介紹RabbitMQ,有關RabbitMQ的一些需要了解的概念你可以通過下面的文章瞭解:

  如果你想了解RabbitMQ與Kafka的對比,可以閱讀這篇文章:《開源軟體成熟度評測報告-分散式訊息中介軟體

  而EasyNetQ呢,它是一款基於RabbitMQ.Client封裝的API庫,正如其名,使用起來比較Easy,它把原RabbitMQ.Client中的很多操作都進行了再次封裝,讓開發人員減少了很多工作量。

二、RabbitMQ的安裝

2.1 Linux下的安裝

  這裡不演示如何在Linux下安裝,但推薦生產環境使用Linux,下面是一些參考資料:

2.2 Windows下的安裝

  開發環境下,我一般使用Windows Server虛擬機器,所以這裡說明下如何在Windows下安裝:

  (1)下載ErlangRabbitMQ (這裡我選則的並非最新版本,而是etp20.3和rabbitmq3.7.5)

  

  (2)首先安裝Erlang,然後新增環境變數(如果添加了,則skip這一步)並加到PATH中

  

  (3)其次安裝RabbitMQ,一路Next,安裝完成後也為其新增環境變數並新增到PATH中

  

  

  (4)檢查是否安裝成功:rabbitmqctl status

  這裡我碰到了如下的錯誤:

  

  解決方法:

  最終狀態:

  

  檢查Windows服務,發現已經自動註冊了一個服務:

  

  (5)啟用Web管理外掛,然後檢查是否可見(http://127.0.0.1:15672)

  

  

2.3 一些必要的配置

  (1)使用預設賬號:guest/guest登入進去,新增一個新使用者(Administrator許可權),並設定其Permission

  

  (2)新增新的虛擬機器(預設為/,這裡我新增一個名為EDCVHOST的虛擬機器)

  

  (3)繫結新新增的使用者到新的虛擬機器上,接下來在我們的程式中就主要使用admin這個使用者和EDCVHOST這個虛擬機器

  

  *.當然,為了安全考慮,你也可以把guest使用者remove掉

三、Quick Start:第一個訊息佇列

3.1 專案準備

  這裡為了快速的演示如何使用EasyNetQ,我們來一個QuickStart,準備三個專案:兩個Console程式和一個Class Library。

  

  其中,對Publisher和Subscriber專案安裝EasyNetQ:

NuGet>Install-Package EasyNetQ  

  針對Messages類庫,新增一個class如下:

    public class TextMessage
    {
        public string Text { get; set; }
    }

3.2 我是Publisher

  新增以下程式碼:

    public class Program
    {
        public static void Main(string[] args)
        {
            var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";

            using (var bus = RabbitHutch.CreateBus(connStr))
            {
                var input = "";
                Console.WriteLine("Please enter a message. 'Quit' to quit.");
                while ((input = Console.ReadLine()) != "Quit")
                {
                    bus.Publish(new TextMessage
                    {
                        Text = input
                    });
                }
            }
        }
    }

  可以看到,我們在其中使用EasyNetQ高度封裝的介面建立了一個IBus介面的例項,通過這個IBus例項我們可以通過一個超級Easy的Publish介面進行釋出訊息。這裡主要是讀取使用者在控制檯中輸入的訊息字串進行傳送。實際中,傳送的一般都是一個或多個複雜的實體物件。

3.3 我是Subscriber

  新增如下所示程式碼:

    public class Program
    {
        public static void Main(string[] args)
        {
            var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";

            using (var bus = RabbitHutch.CreateBus(connStr))
            {
                bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }

        public static void HandleTextMessage(TextMessage textMessage)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Got message: {0}", textMessage.Text);
            Console.ResetColor();
        }
    }

  這裡主要是通過IBus例項去訂閱訊息(這裡是除非使用者關閉程式否則一直處於監聽狀態),當釋出者釋出了指定型別的訊息之後,這裡就把它打印出來(紅色字型顯示)。

3.4 簡單測試 

  通過控制檯資訊檢視結果:

  

  通過RabbitMQ管理介面檢視:

  (1)通過Connections Tab可以發現我們的兩個客戶端都在Running中

  

  (2)通過Queues Tab檢視目前已有的佇列=>可以看到目前我們只註冊了一個佇列

  

四、在ASP.NET Core中的使用

4.1 案例結構與說明

  這裡假設有這樣一個場景,客戶通過瀏覽器提交了一個保單,這個保單中包含一些客戶資訊,ClientService將這些資訊處理後傳送一個訊息到RabbitMQ中,NoticeService和ZAPEngineService訂閱了這個訊息。NoticeService會將客戶資訊取出來並獲取一些更多資訊為客戶傳送Email,而ZAPEngineService則會根據客戶的一些關鍵資訊(比如:年齡,是否吸菸,學歷,年收入等等)去資料庫讀取一些規則來生成一份Question List並存入資料庫。

4.2 專案準備工作

  建立上面提到的這幾個專案,這裡我選擇ASP.NET Core WebAPI型別。

  分別為這幾個專案通過NuGet安裝EasyNetQ元件,並且通過以下程式碼注入統一的IBus例項物件:

    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
      // IoC - EventBus
      services.AddSingleton(RabbitHutch.CreateBus(Configuration["MQ:Dev"]));
      ......
    }

  這裡我將連線字串寫到了配置檔案中,請參考上面的QuickStart中的內容。

  下面是這個demo用到的一個訊息物件實體:通過標籤宣告佇列名稱。

    [Queue("Qka.Client", ExchangeName = "Qka.Client")]
    public class ClientMessage
    {
        public int ClientId { get; set; }
        public string ClientName { get; set; }
        public string Sex { get; set; }
        public int Age { get; set; }
        // N: Non-Smoker, S: Smoker
        public string SmokerCode { get; set; }
        // Bachelor, Master, Doctor
        public string Education { get; set; }
        public decimal YearIncome { get; set; }
    }

  這裡為了快速的在專案中使用Subscriber,新增一個擴充套件方法,它會從注入的服務中取出IBus例項物件,並自動幫我們進行Subscriber(那些實現了IConsume介面的類)的註冊。具體用法見後面的介紹。

    public static class AppBuilderExtension
    {
        public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
        {
            var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;

            var lifeTime = services.GetService<IApplicationLifetime>();
            var bus = services.GetService<IBus>();
            lifeTime.ApplicationStarted.Register(() =>
            {
                var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                subscriber.Subscribe(assembly);
                subscriber.SubscribeAsync(assembly);
            });

            lifeTime.ApplicationStopped.Register(() => bus.Dispose());

            return appBuilder;
        }
    }

4.3 Publisher:ClientService

  ClientService作為釋出者,這裡假設我們在API中處理完業務程式碼後,將message釋出給RabbitMQ:

    [Produces("application/json")]
    [Route("api/Client")]
    public class ClientController : Controller
    {
        private readonly IClientService clientService;
        private readonly IBus bus;

        public ClientController(IClientService _clientService, IBus _bus)
        {
            clientService = _clientService;
            bus = _bus;
        }

        ......

        [HttpPost]
        public async Task<string> Post([FromBody]ClientDTO clientDto)
        {
            // Business Logic here...
            // eg.Add new client to your service databases via EF
            // Sample Publish
            ClientMessage message = new ClientMessage
            {
                ClientId = clientDto.Id.Value,
                ClientName = clientDto.Name,
                Sex = clientDto.Sex,
                Age = 29,
                SmokerCode = "N",
                Education = "Master",
                YearIncome = 100000
            };
            await bus.PublishAsync(message);

            return "Add Client Success! You will receive some letter later.";
        }
    }

  當然,你可以使用同步方法:bus.Publish(message);

4.4 Subscriber: NoticeService & ZAPEngineService

  (1)NoticeService:新增一個實現IConsume介面的Consumer類

    public class ClientMessageConsumer: IConsumeAsync<ClientMessage>
    {
        [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.Notice")]
        public Task ConsumeAsync(ClientMessage message)
        {
            // Your business logic code here
            // eg.Build one email to client via SMTP service
            // Sample console code
            System.Console.ForegroundColor = System.ConsoleColor.Red;
            System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will send one email to client.", message.ClientName);
            System.Console.ResetColor();

            return Task.CompletedTask;
        }
    }

  這裡為了演示效果,增加了一些輸出資訊的程式碼,下面的ZAPEngineService也是一樣,不再贅述。

  (2)ZAPEngineService:新增一個實現IConsume介面的Consumer類

    public class ClientMessageConsumer : IConsumeAsync<ClientMessage>
    {
        [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")]
        public Task ConsumeAsync(ClientMessage message)
        {
            // Your business logic code here
            // eg.Generate one ZAP question records into database and send to client
            // Sample console code
            System.Console.ForegroundColor = System.ConsoleColor.Red;
            System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName);
            System.Console.ResetColor();

            return Task.CompletedTask;
        }
    }

  注意兩個Consumer的SubscriptionId不能一樣,否則無法接受到訊息。

  (3)為兩個Consumer使用擴充套件方法:UseSubscribe

    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        // easyNetQ
        app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly());
    }

4.5 簡單測試

  (1)藉助Postman向ClientService發起Post請求

  (2)檢視NoticeService的日誌資訊

  (3)檢視ZAPEngineService的日誌資訊

  (4)檢視RabbitMQ的管理控制檯:

五、小結

  本篇超級簡單地介紹了一下訊息佇列與RabbitMQ,通過使用EasyNetQ這個基於RabbitMQ.Client的客戶端做了一個QuickStart演示了在.NET Core環境下如何進行訊息的釋出與訂閱,並通過一個微服務的小案例演示瞭如何在ASP.NET Core環境下如何基於EasyNetQ完成訊息的釋出與訂閱,看起來就像一個類似於簡單的事件匯流排。當然,本篇的內容都十分基礎,如果要應用好RabbitMQ,還得把那些基礎概念(如:Channel,Exchange等)弄清楚,然後去理解一下事件匯流排的概念,實際中還得考慮資料一致性等等,路途漫漫,繼續加油吧!

示例程式碼

  Click Here => 點我下載

參考資料

作者:周旭龍

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連結。

相關推薦

.NET Core服務基於EasyNetQ使用RabbitMQ訊息佇列

一、訊息佇列與RabbitMQ 1.1 訊息佇列   “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。   訊息佇列(Message Queue),是分散式系統中重要

.NET Core服務基於Consul實現服務治理

請求轉發 1.0 asp.net AC port prefix 我們 tle nan 一、Consul基礎介紹   Consul是HashiCorp公司推出的開源工具,用於實現分布式系統的服務發現與配置。與其他分布式服務註冊與發現的方案,比如 Airbnb的Smart

.NET Core服務基於Consul實現服務治理(續)

shell pla code tst 分層 編輯 set req \n 上一篇發布之後,這一篇把上一篇沒有弄到的東西補一下,也算是給各位前來詢問的朋友的一些回復吧。一、Consul服務註冊之配置文件方式1.1 重溫Consul實驗集群  這裏我們有三個Consul Serv

基於Apollo實現.NET Core服務統一配置(測試環境-單機) .NET Core服務基於Apollo實現統一配置中心

一、前言 注:此篇只是為測試環境下的快速入門。後續會給大家帶來生產環境下得實戰開發。 具體的大家可以去看官方推薦。非常的簡單明瞭。以下介紹引用官方內容: Apollo(阿波羅)是攜程框架部門研發的分散式配置中心,能夠集中化管理應用不同環境、不同叢集的配置,配置修改後能夠實時推送到應用端,並且具

.NET Core服務基於Steeltoe使用Eureka實現服務註冊與發現

一、關於Steeltoe與Spring Cloud    Steeltoe is an open source project that enables .NET developers to implement industry standard best practices when b

.NET Core服務基於Steeltoe整合Zuul實現統一API閘道器

一、關於Spring Cloud Zuul   API Gateway(API GW / API 閘道器),顧名思義,是出現在系統邊界上的一個面向API的、序列集中式的強管控服務,這裡的邊界是企業IT系統的邊界。   Zuul 是Netflix 提供的一個開源元件,致力於在雲平臺上提供動態路由,監

.NET Core服務基於Steeltoe使用Hystrix熔斷保護與監控

一、關於Spring Cloud Hystrix      在微服務架構中,我們將系統拆分為很多個服務,各個服務之間通過註冊與訂閱的方式相互依賴,由於各個服務都是在各自的程序中執行,就有可能由於網路原因或者服務自身的問題導致呼叫故障或延遲,隨著服務的積壓,可能會導致服務崩潰。為了解決這一系列的問題

.NET Core服務基於Steeltoe使用Spring Cloud Config統一管理配置

一、關於Spring Cloud Config   在分散式系統中,每一個功能模組都能拆分成一個獨立的服務,一次請求的完成,可能會呼叫很多個服務協調來完成,為了方便服務配置檔案統一管理,更易於部署、維護,所以就需要分散式配置中心元件了,在Spring Cloud中,就有這麼一個分散式配置中心元件 —

.NET Core服務基於Steeltoe使用Zipkin實現分散式追蹤

一、關於Spring Cloud Sleuth與Zipkin   在 SpringCloud 之中提供的 Sleuth 技術可以實現微服務的呼叫跟蹤,也就是說它可以自動的形成一個呼叫連線線,通過這個連線線使得開發者可以輕鬆的找到所有微服務間關係,同時也可以獲取微服務所耗費的時間, 這樣就可以進行微服

.NET Core服務基於Jenkins+Docker實現持續部署(Part 1)

一、CI, CD 與Jenkins   網際網路軟體的開發和釋出,已經形成了一套標準流程,最重要的組成部分就是持續整合(Continuous integration,簡稱 CI) => 持續整合指的是,頻繁地(一天多次)將程式碼整合到主幹。   它的好處主要有兩個: 快速發現錯

.NET Core服務基於App.Metrics+InfluxDB+Grafana實現統一效能監控

一、關於App.Metrics+InfluxDB+Grafana 1.1 App.Metrics      App.Metrics是一款開源的支援.NET Core的監控外掛,它還可以支援跑在.NET Framework上的應用程式(版本 >= 4.5.2)。官方文件地址:https://ww

.NET Core服務基於Apollo實現統一配置中心

一、關於統一配置中心與Apollo   在微服務架構環境中,專案中配置檔案比較繁雜,而且不同環境的不同配置修改相對頻繁,每次釋出都需要對應修改配置,如果配置出現錯誤,需要重新打包釋出,時間成本較高,因此需要做統一的配置中心,能做到自動更新配置檔案資訊,解決以上問題。   Apollo(阿波羅)是攜

.NET Core服務基於Ocelot實現API閘道器服務

一、啥是API閘道器?   API 閘道器一般放到微服務的最前端,並且要讓API 閘道器變成由應用所發起的每個請求的入口。這樣就可以明顯的簡化客戶端實現和微服務應用程式之間的溝通方式。以前的話,客戶端不得不去請求微服務A(假設為Customers),然後再到微服務B(假設為Orders),然後是微服

.NET Core服務基於Exceptionless實現分散式日誌記錄

一、Exceptionless極簡介紹   Exceptionless 是一個開源的實時的日誌收集框架,它可以應用在基於 ASP.NET,ASP.NET Core,Web API,Web Forms,WPF,Console,ASP.NET MVC 等技術開發的應用程式中,並且提供了REST介面可以應

.NET Core服務基於MassTransit實現資料最終一致性(Part 2)

一、案例結構與說明   在上一篇中,我們瞭解了MassTransit這個開源元件的基本用法,這一篇我們結合一個小案例來了解在ASP.NET Core中如何藉助MassTransit+Quartz.Net來實現資料的最終一致性。當然,實現資料的最終一致性有很多方案,這裡只是舉一種我所學到的比較簡單易於學習

.NET Core服務基於MassTransit實現資料最終一致性(Part 1)

一、預備知識:資料一致性   關於資料一致性的文章,園子裡已經有很多了,如果你還不瞭解,那麼可以通過以下的幾篇文章去快速地瞭解瞭解,有個感性認識即可。   必須要了解的點:ACID、CAP、BASE、強一致性、弱一致性、最終一致性。      CAP理論由加州大學伯克利分校的計算機

.NET Core服務基於Polly+AspectCore實現熔斷與降級機制

一、熔斷、降級與AOP 1.1 啥是熔斷?   在廣義的解釋中,熔斷主要是指為控制股票、期貨或其他金融衍生產品的交易風險,為其單日價格波動幅度規定區間限制,一旦成交價觸及區間上下限,交易則自動中斷一段時間(“熔即斷”),或就此“躺平”而不得超過上限或下限(“熔而不斷”)。   而對於微服務來說,

.NET Core服務基於Ocelot+Butterfly實現分散式追蹤

一、什麼是Tracing?   微服務的特點決定了功能模組的部署是分散式的,以往在單應用環境下,所有的業務都在同一個伺服器上,如果伺服器出現錯誤和異常,我們只要盯住一個點,就可以快速定位和處理問題,但是在微服務的架構下,大部分功能模組都是單獨部署執行的,彼此通過匯流排互動,都是無狀態的服務,這種架構下,

.NET Core服務基於IdentityServer建立授權與驗證服務(續)

上一篇我們基於IdentityServer4建立了一個AuthorizationServer,並且繼承了QuickStartUI,能夠成功獲取Token了。這一篇我們瞭解下如何整合API Service和MVC Web Application。 一、整合API Service 1.1 新增ASP.NE

.NET Core服務基於IdentityServer建立授權與驗證服務

一、IdentityServer的預備知識   要學習IdentityServer,事先得了解一下基於Token的驗證體系,這是一個龐大的主題,涉及到Token,OAuth&OpenID,JWT,協議規範等等等等,園子裡已經有很多介紹的文章了,個人覺得solenovex的這一篇文章《學習Id