.NET Core微服務之基於MassTransit實現資料最終一致性(Part 1)
一、預備知識:資料一致性
關於資料一致性的文章,園子裡已經有很多了,如果你還不瞭解,那麼可以通過以下的幾篇文章去快速地瞭解瞭解,有個感性認識即可。
必須要了解的點:ACID、CAP、BASE、強一致性、弱一致性、最終一致性。
CAP理論由加州大學伯克利分校的計算機教授Eric Brewer在2000年提出,其核心思想是任何基於網路的資料共享系統最多隻能滿足資料一致性(Consistency)、可用性(Availability)和網路分割槽容忍(Partition Tolerance)三個特性中的兩個(由此我們知道在分散式系統中,同時滿足CAP三個特性是不可能的),三個特性的定義如下:
C:資料一致性(Consistency):如果系統對一個寫操作返回成功,那麼之後的讀請求都必須讀到這個新資料;如果返回失敗,那麼所有讀操作都不能讀到這個資料,對呼叫者而言資料具有強一致性(Strong Consistency)(又叫原子性Atomic或線性一致性Linerizable Consistency)
A:服務可用性(Availability):所有讀寫請求在一定時間內得到響應,可終止、不會一直等待
P:分割槽容錯性(Partition-Tolerance):在網路分割槽的情況下,被分隔的節點仍能正常對外服務
- 強一致性:當更新操作完成之後,任何多個後續程序或者執行緒的訪問都會返回最新的更新過的值。這種是對使用者最友好的,就是使用者上一次寫什麼,下一次就保證能讀到什麼。根據 CAP 理論,這種實現需要犧牲可用性。=> 在傳統單體式應用中,大部分都是強一致性的應用,想想我們寫過多少工作單元模式的Code?
- 弱一致性:系統並不保證續程序或者執行緒的訪問都會返回最新的更新過的值。系統在資料寫入成功之後,不承諾立即可以讀到最新寫入的值,也不會具體的承諾多久之後可以讀到。
- 最終一致性:弱一致性的特定形式。系統保證在沒有後續更新的前提下,系統最終返回上一次更新操作的值。在沒有故障發生的前提下,不一致視窗的時間主要受通訊延遲,系統負載和複製副本的個數影響。
為保證可用性,網際網路分散式架構中經常將強一致性需求轉換成最終一致性的需求,並通過系統執行冪等性的保證,保證資料的最終一致性。
在微服務架構中,各個微服務之間通常會使用事件驅動通訊和釋出訂閱系統實現最終一致性。
更多背景知識,還是得看上面列出的參考文章,這裡不再贅述。
二、MassTransit極簡介紹
MassTransit 是一個自由、開源、輕量級的訊息匯流排, 用於使用. NET 框架建立分散式應用程式。MassTransit 在現有訊息傳輸上提供了一組廣泛的功能, 從而使開發人員能夠友好地使用基於訊息的會話模式非同步連線服務。基於訊息的通訊是實現面向服務的體系結構的可靠和可擴充套件的方式。
類似的國外開源元件還有NServiceBus,沒有用過,據說MassTransit比NServiceBus更加輕量級,並且在開發之初就選用了RabbitMQ作為訊息傳輸元件,當然MassTransit還支援Azure Service Bus。類似的國內開源元件則有園友savorboard(楊曉東)的CAP,這個我會在MassTransit學習結束後去使用使用,CAP在GitHub上已經有了超過1000個Star,是NCC的幾個千星專案之一。另外,張善友大隊長在他的NanoFabric專案中推薦我們使用Rebus和Ray,如下圖所示:
由於時間和精力,以及文件資料的可見性,我在我的POC和這個系列博文的準備中,只會使用到MassTransit和CAP這兩個開源專案。
三、MassTransit Quick Start
這裡以MassTransit + RabbitMQ為例子,首先請確保安裝了RabbitMQ,如果沒有安裝,可以閱讀我的《基於EasyNetQ使用RabbitMQ訊息佇列》去把RabbitMQ先安裝到你的電腦上。另外,RabbitMQ的背景知識也有一堆,有機會也還是要了解下Exchange,Channel、Queue等內容。
3.1 最簡單的傳送/接收例項
(1)準備兩個控制檯程式,一個為Sender(傳送者),一個為Receiver(接收者),並分別通過NuGet安裝MassTransit以及MassTransit.RabbitMQ
NuGet>Install-Package MassTransit
NuGet>Install-Package MassTransit.RabbitMQ
(2)編寫Sender
public class Program { public static void Main(string[] args) { Console.Title = "MassTransit Client"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst => { hst.Username("admin"); hst.Password("edison"); }); }); var uri = new Uri("rabbitmq://192.168.80.71/EDCVHOST/Qka.MassTransitTest"); var message = Console.ReadLine(); while (message != null) { Task.Run(() => SendCommand(bus, uri, message)).Wait(); message = Console.ReadLine(); } Console.ReadKey(); } private static async void SendCommand(IBusControl bus, Uri sendToUri, string message) { var endPoint = await bus.GetSendEndpoint(sendToUri); var command = new Client() { Id = 100001, Name = "Edison Zhou", Birthdate = DateTime.Now.AddYears(-18), Message = message }; await endPoint.Send(command); Console.WriteLine($"You Sended : Id = {command.Id}, Name = {command.Name}, Message = {command.Message}"); } }
這裡首先連線到我的RabbitMQ服務,然後向指定的Queue傳送訊息(這裡是一個Client型別的例項物件)。
(3)編寫Receiver
public class Program { public static void Main(string[] args) { Console.Title = "MassTransit Server"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst => { hst.Username("admin"); hst.Password("edison"); }); cfg.ReceiveEndpoint(host, "Qka.MassTransitTest", e=> { e.Consumer<TestConsumerClient>(); e.Consumer<TestConsumerAgent>(); }); }); bus.Start(); Console.WriteLine("Press any key to exit."); Console.ReadKey(); bus.Stop(); } }
對於Receiver,要做的事就只有兩件:一是連線到RabbitMQ,二是告訴RabbitMQ我要接收哪個訊息佇列的什麼型別的訊息。下面是TestConsumerClient和TestConsumerAgent的定義:
public class TestConsumerClient : IConsumer<Client> { public async Task Consume(ConsumeContext<Client> context) { Console.ForegroundColor = ConsoleColor.Red; await Console.Out.WriteLineAsync($"Receive message: {context.Message.Id}, {context.Message.Name}, {context.Message.Birthdate.ToString()}"); Console.ResetColor(); } } public class Client { public int Id { get; set; } public string Name { get; set; } public DateTime Birthdate { get; set; } public string Message { get; set; } } public class TestConsumerAgent : IConsumer<Agent> { public async Task Consume(ConsumeContext<Agent> context) { Console.ForegroundColor = ConsoleColor.Red; await Console.Out.WriteLineAsync($"Receive message: {context.Message.AgentCode}, {context.Message.AgentName}, {context.Message.AgentRole}"); Console.ResetColor(); } } public class Agent { public int AgentCode { get; set; } public string AgentName { get; set; } public string AgentRole { get; set; } public string Message { get; set; } }View Code
(4)測試一下:
3.2 最簡單的釋出/訂閱例項
除了簡單的傳送/接收模式外,我們用的更多的是釋出/訂閱這種模式。
(1)準備下圖所示的類庫和控制檯專案,並對除Messages類庫之外的其他專案安裝MassTransit以及MassTransit.RabbitMQ。
(2)Messages類庫:準備需要傳輸的Message
public class TestBaseMessage { public string Name { get; set; } public DateTime Time { get; set; } public string Message { get; set; } } public class TestCustomMessage { public string Name { get; set; } public DateTime Time { get; set; } public int Age { get; set; } public string Message { get; set; } }View Code
(3)Publisher:接收我的訊息吧騷年們
public class Program { public static void Main(string[] args) { Console.Title = "MassTransit Publisher"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST/"), hst => { hst.Username("admin"); hst.Password("edison"); }); }); do { Console.WriteLine("Please enter your message, if want to exit please press q."); string message = Console.ReadLine(); if (message.ToLower().Equals("q")) { break; } bus.Publish(new TestBaseMessage() { Name = "Edison Zhou", Time = DateTime.Now, Message = message }); bus.Publish(new TestCustomMessage() { Name = "Leo Dai", Age = 27, Time = DateTime.Now, Message = message }); } while (true); bus.Stop(); }
這裡向RabbitMQ釋出了兩個不同型別的訊息(TestBaseMessage和TestCustomMessage)
(4)SubscriberA:我只接收TestBaseMessage型別的訊息,其他的我不要
public class Program { public static void Main(string[] args) { Console.Title = "MassTransit SubscriberA"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst => { hst.Username("admin"); hst.Password("edison"); }); cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv2.CA", e => { e.Consumer<ConsumerA>(); }); }); bus.Start(); Console.ReadKey(); // press Enter to Stop bus.Stop(); } }
這裡ConsumerA的定義如下:可以看出,它實現了一個泛型介面IConsumer,然後指定了TestBaseMessage為消費的訊息型別。
public class ConsumerA : IConsumer<TestBaseMessage> { public async Task Consume(ConsumeContext<TestBaseMessage> context) { Console.ForegroundColor = ConsoleColor.Red; await Console.Out.WriteLineAsync($"SubscriberA => ConsumerA received message : {context.Message.Name}, {context.Message.Time}, {context.Message.Message}, Type:{context.Message.GetType()}"); Console.ResetColor(); } }View Code
(5)SubscriberA:我只接收TestCustomMessage型別的訊息,其他的我不要
public class Program { public static void Main(string[] args) { Console.Title = "MassTransit SubscriberB"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst => { hst.Username("admin"); hst.Password("edison"); }); cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv2.CB", e => { e.Consumer<ConsumerA>(); }); }); bus.Start(); Console.ReadKey(); // press Enter to Stop bus.Stop(); } }
這裡的ConsumerA的定義如下;它實現的介面是IConsumer<TestCustomMessage>
public class ConsumerA : IConsumer<TestCustomMessage> { public async Task Consume(ConsumeContext<TestCustomMessage> context) { Console.ForegroundColor = ConsoleColor.Red; await Console.Out.WriteLineAsync($"SubscriberB => ConsumerA received message : {context.Message.Name}, {context.Message.Time}, {context.Message.Message}, Age: {context.Message.Age}, Type:{context.Message.GetType()}"); Console.ResetColor(); } }View Code
(6)測試一下:由於Publisher傳送了兩個不同型別的訊息,兩個Subscriber均只接受其中的一個型別的訊息。
3.3 帶返回狀態訊息的示例
之前的例子都是釋出之後,不管訂閱者有沒有收到以及收到後有沒有處理成功(即有沒有返回訊息,類似於HTTP請求和響應),在MassTransit中提供了這樣的一種模式,並且還可以結合GreenPipes的一些擴充套件方法實現重試、限流以及熔斷機制。這一部分詳見官方文件:http://masstransit-project.com/MassTransit/usage/request-response.html
(1)準備下圖所示的三個專案:通過NuGet安裝MassTransit以及MassTransit.RabbitMQ
(2)Messages:準備請求和響應的訊息傳輸型別
public interface IRequestMessage { int MessageId { get; set; } string Content { get; set; } } public class RequestMessage : IRequestMessage { public int MessageId { get; set; } public string Content { get; set; } public int RequestId { get; set; } } public interface IResponseMessage { int MessageCode { get; set; } string Content { get; set; } } public class ResponseMessage : IResponseMessage { public int MessageCode { get; set; } public string Content { get; set; } public int RequestId { get; set; } }View Code
(3)Sender 請求傳送端
public class Program { public static void Main(string[] args) { Console.Title = "Masstransit Request Side"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst => { hst.Username("admin"); hst.Password("edison"); }); // Retry : 重試 cfg.UseRetry(ret => { ret.Interval(3, 10); // 消費失敗後重試3次,每次間隔10s }); // RateLimit : 限流 cfg.UseRateLimit(1000, TimeSpan.FromMinutes(1)); // 1分鐘以內最多1000次呼叫訪問 // CircuitBreaker : 熔斷 cfg.UseCircuitBreaker(cb => { cb.TrackingPeriod = TimeSpan.FromMinutes(1); cb.TripThreshold = 15; // 當失敗的比例至少達到15%才會啟動熔斷 cb.ActiveThreshold = 10; // 當失敗次數至少達到10次才會啟動熔斷 cb.ResetInterval = TimeSpan.FromMinutes(5); }); // 當在1分鐘內消費失敗率達到15%或呼叫了10次還是失敗時,暫停5分鐘的服務訪問 }); bus.Start(); SendMessage(bus); bus.Stop(); } private static void SendMessage(IBusControl bus) { var mqAddress = new Uri($"rabbitmq://192.168.80.71/EDCVHOST/Qka.MassTransitTestv3"); var client = bus.CreateRequestClient<IRequestMessage, IResponseMessage>(mqAddress, TimeSpan.FromHours(10)); // 建立請求客戶端,10s之內木有回饋則認為是超時(Timeout) do { Console.WriteLine("Press q to exit if you want."); string value = Console.ReadLine(); if (value.ToLower().Equals("q")) { break; } Task.Run(async () => { var request = new RequestMessage() { MessageId = 10001, Content = value, RequestId = 10001 }; var response = await client.Request(request); Console.WriteLine($"Request => MessageId={request.MessageId}, Content={request.Content}"); Console.WriteLine($"Response => MessageCode={response.MessageCode}, Content={response.Content}"); }).Wait(); } while (true); } }
這裡不再解釋,請看註釋。
(4)Receiver 接收端
public class Program { public static void Main(string[] args) { Console.Title = "MassTransit Response Side"; var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst => { hst.Username("admin"); hst.Password("edison"); }); cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv3", e => { e.Consumer<RequestConsumer>(); }); }); bus.Start(); Console.WriteLine("Press any key to exit."); Console.ReadKey(); bus.Stop(); } }
其中,RequestConsumer的定義如下:
public class RequestConsumer : IConsumer<IRequestMessage> { public async Task Consume(ConsumeContext<IRequestMessage> context) { Console.ForegroundColor = ConsoleColor.Red; await Console.Out.WriteLineAsync($"Received message: Id={context.Message.MessageId}, Content={context.Message.Content}"); Console.ResetColor(); var response = new ResponseMessage { MessageCode = 200, Content = $"Success", RequestId = context.Message.MessageId }; Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"Response message: Code={response.MessageCode}, Content={response.Content}, RequestId={response.RequestId}"); Console.ResetColor(); await context.RespondAsync(response); } }View Code
(5)測試一下:
可以看出,請求呼叫方收到了來自接收方返回的狀態訊息,我們可以藉助返回值去check一些狀態。這裡不再演示發生異常從而啟用重試、熔斷等的示例,有興趣的園友可以自行測試。
3.4 Observer模式的釋出/訂閱示例
在某些場景中,我們需要針對一個訊息進行類似於AoP(面向切面程式設計)或者監控的操作,比如在傳送訊息之前和結束後記日誌等操作,我們可以藉助MassTransit中的Observer模式來實現。(在MassTransit的訊息接收中,可以通過兩種模式來實現:一種是基於實現IConsumer介面,另一種就是基於實現IObserver介面)關於這一部分,詳見官方文件:http://masstransit-project.com/MassTransit/usage/observers.html
(1)準備以下圖所示的專案:
(2)Messages:定義要使用的Consumer和Observer
Consumer:
public class TestMessageConsumer : IConsumer<TestMessage> { public async Task Consume(ConsumeContext<TestMessage> context) { Console.ForegroundColor = ConsoleColor.Red; await Console.Out.WriteLineAsync($"TestMessageConsumer => Type:{context.Message.GetType()}, ID:{context.Message.MessageId}, Content:{context.Message.Content}"); Console.ResetColor(); } }View Code
Observer:
public class PublishObserver : IPublishObserver { public Task PrePublish<T>(PublishContext<T> context) where T : class { Console.WriteLine("------------------PrePublish--------------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}"); Console.WriteLine("----------------------------------------------------"); return Task.CompletedTask; } public Task PostPublish<T>(PublishContext<T> context) where T : class { Console.WriteLine("------------------PostPublish--------------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}"); Console.WriteLine("----------------------------------------------------"); return Task.CompletedTask; } public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class { Console.WriteLine("------------------PublishFault--------------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}"); Console.WriteLine("------------------------------------------------------"); return Task.CompletedTask; } } public class SendObserver : ISendObserver { public Task PreSend<T>(SendContext<T> context) where T : class { Console.WriteLine("------------------PreSend--------------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}"); Console.WriteLine("-------------------------------------------------"); return Task.CompletedTask; } public Task PostSend<T>(SendContext<T> context) where T : class { Console.WriteLine("------------------PostSend-------------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}"); Console.WriteLine("-------------------------------------------------"); return Task.CompletedTask; } public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class { Console.WriteLine("------------------SendFault-----------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}"); Console.WriteLine("-------------------------------------------------"); return Task.CompletedTask; } } public class ReceiveObserver : IReceiveObserver { public Task PreReceive(ReceiveContext context) { Console.WriteLine("------------------PreReceive--------------------"); Console.WriteLine(Encoding.Default.GetString(context.GetBody())); Console.WriteLine("--------------------------------------"); return Task.CompletedTask; } public Task PostReceive(ReceiveContext context) { Console.WriteLine("------------------PostReceive--------------------"); Console.WriteLine(Encoding.Default.GetString(context.GetBody())); Console.WriteLine("------------------------------------------------------"); return Task.CompletedTask; } public Task ReceiveFault(ReceiveContext context, Exception exception) { Console.WriteLine("------------------ReceiveFault--------------------"); Console.WriteLine(Encoding.Default.GetString(context.GetBody())); Console.WriteLine("-------------------------------------------------------"); return Task.CompletedTask; } public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class { Console.WriteLine("------------------PostConsume--------------------"); var message = context.Message as TestMessage; Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={mes