如何通過本地化事件正確實現微服務內部強一致性,事件匯流排跨微服務間最終一致性
阿新 • • 發佈:2018-11-12
目錄
- 設計重點
- 流程圖
- 虛擬碼
2.1. PublishEvent
2.2. SubscribeEvent
2.3. Publisher
2.4. Subscriber - 微服務 強一致性
3.1 Publisher
3.2 Subscriber - 事件匯流排 - 跨服務 最終一致性
4.1 Publisher & Subscriber 都開啟了本地事務,保證了強一致性
4.2 問題場景一:當 ③ 釋出失敗怎麼辦?
4.3 問題場景二:當 ③ 釋出成功,但 ④ 更新事件狀態失敗怎麼辦?
4.4 問題場景三:Publisher 端Ok,Subscriber 消費出錯
0. 設計重點
- Publisher 本地化 PublishEvent 保證事件釋出可靠性
- Subscriber 本地化 SubscribeEvent 保證事件訂閱可靠性
- SubscribeEvent 通過 EventId & HandlerType 組合約束 保證不重複消費事件
- 事件中央控制檯 處理 Publisher & Subscriber 事件重試
1. 執行流程圖
2. 虛擬碼
2.1 PublishEvent
public abstract class Event { public Event() { Id = Guid.NewGuid(); CreationTime = DateTime.UtcNow; } public Guid Id { get; set; } public DateTime CreationTime { get; set; } } public class PublishEvent : Event { public PublishEvent(Event @event) { Id = @event.Id; CreationTime = @event.CreationTime; Type = @event.GetType().FullName; Data = JsonConvert.SerializeObject(@event); Status = PublishEventStatus.NotPublished; } public String Type { get; set; } public String Data { get; set; } public PublishEventStatus Status { get; set; } } public enum PublishEventStatus { NotPublished = 0, Published = 1, PublishedFailed = 2 }
2.2 SubscribeEvent
public class SubscribeEvent { public SubscribeEvent(Event @event, IEventHandler handler) { EventId = @event.Id; EventCreationTime = @event.CreationTime; EventType = @event.GetType().FullName; EventData = JsonConvert.SerializeObject(@event); HandlerType = handler.GetType().FullName; HandlingStatus = HandlingStatus.HandleSucceeded; HandlingTime = DateTime.Now; } public Guid EventId { get; set; } public String EventType { get; set; } public String EventData { get; set; } public DateTime EventCreationTime { get; set; } public String HandlerType { get; set; } public DateTime HandlingTime { get; set; } public HandlingStatus HandlingStatus { get; set; } } public enum HandlingStatus { HandleSucceeded = 0, HandleFailed = 1 }
2.3 Publisher
try
{
BeginTransaction(); // ①
//Biz Flow
EventRepository.PubilshEvent(@event);// ②
CommitTransaction();
}
catch(Exception ex){
RollbackTransaction();
throw ex;
}
EventBus.Publish(@event); // ③
EventResitory.EventPublished(@event.ToString()); // ④
2.4 Subscriber
try
{
BeginTransaction();
//Biz Flow
EventRepository.SubscribeEvent(@event , eventHandler); // ⑤
CommitTransaction();
}
catch(Exception ex){
RollbackTransaction();
throw ex;
}
3. 微服務 強一致性
3.1 Publisher
- 開啟本地事務達到強一致性
- 執行本地業務程式碼
- 本地事務內部儲存事件 預釋出 狀態 ②
- 釋出事件到事件匯流排 ③
- 修改事件釋出狀態為已釋出 ④
3.2 Subscriber
- 開啟本地事務達到強一致性
- 執行本地業務程式碼
- 儲存訂閱事件到本地倉庫
4 事件匯流排 - 跨服務 最終一致性
4.1 Publisher & Subscriber 都開啟了本地事務,保證了強一致性
4.2 問題場景一:當 ③ 釋出失敗怎麼辦?
- ③ 釋出失敗,意味著丟擲異常,則 ④ 不執行,那麼事件狀態依然保持 預釋出狀態
- 後續 事件重試 重新發布該事件,並更新事件狀態為 已釋出
4.3 問題場景二:當 ③ 釋出成功,但 ④ 更新事件狀態失敗怎麼辦?
4.3.1 場景二·一 Subscriber 訂閱成功
- ③ 釋出成功,但 ④ 更新事件狀態失敗,事件狀態依然是 預釋出狀態
- Subscriber 訂閱到該事件後成功執行完業務程式碼
- Subscriber 將訂閱事件儲存到本地訂閱事件倉庫 ⑤
該場景存在的問題: Publisher 會通過 事件重試 再次釋出 預釋出 狀態的事件,那麼此時Subscriber 將重複消費該事件
方案:該問題我們可以通過將 SubscribeEvent EventId & HandlerType 組合唯一約束,來避免重複消費
4.3.2 場景二·二 Subscriber 訂閱失敗
- ③ 釋出成功,但 ④ 更新事件狀態失敗,事件狀態依然是 預釋出狀態
- Subscriber 執行消費失敗
- Subscriber 回滾本地事務
該場景不存在任何問題,因為 Publisher 會通過 事件重試 再次釋出 預釋出 狀態的事件 。
4.4 問題場景三:Publisher 端Ok,Subscriber 消費出錯
- Publisher 端處理順利
- Subscriber 消費失敗,回滾本地事務,此時 SubscribeEvent 未儲存到本地倉庫
該場景存在的問題:
Publisher 傳送成功,並且本地 PublishEvent 事件為已釋出,那麼意味著從Publisher端是無法知道Subscriber消費失敗需要重新消費
解決方案: - 通過檢測 PublishEvent & SubscribeEvent 獲得需要 事件重試 的 PublishEvent
- 將 PublishEvent 重新發布 到 Subscriber
5. 通過Nuget安裝元件支援以上程式設計模型
Install-Package SmartEventBus.RabbitMQImpl
Install-Package SmartEventBus.Repository
6. ORM:SmartSql 廣而告之
SmartSql = Dapper + MyBatis + Cache(Memory | Redis) + ZooKeeper + R/W Splitting + ......